From c76813d81b9886a36736f7a6b8427ab047799317 Mon Sep 17 00:00:00 2001
From: ouyangwulin
Date: Thu, 7 Aug 2025 14:10:22 +0800
Subject: [PATCH 1/6] [FLINK-37959] Support all PostgreSQL 14 field types.

---
 .../pom.xml                                   | 13 +++++
 .../source/PostgresEventDeserializer.java     | 24 ++++++---
 .../postgres/utils/PostgresTypeUtils.java     | 53 ++++++++++++++++++-
 .../source/PostgresFullTypesITCase.java       | 31 ++++++++++-
 .../test/resources/ddl/column_type_test.sql   | 19 ++++++-
 5 files changed, 130 insertions(+), 10 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
index 528d8586084..1c19d07886e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
@@ -39,6 +39,19 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
 
+
+        <dependency>
+            <groupId>com.esri.geometry</groupId>
+            <artifactId>esri-geometry-api</artifactId>
+            <version>${geometry.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-postgres-cdc</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
index 560ef72d26d..d6863e421e5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
@@ -26,15 +26,18 @@
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 import org.apache.flink.table.data.TimestampData;
 
+import com.esri.core.geometry.ogc.OGCGeometry;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.debezium.data.Envelope;
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
-import io.debezium.util.HexConverter;
+import io.debezium.data.geometry.Point;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -111,15 +114,24 @@ protected Map<String, String> getMetadata(SourceRecord record) {
     protected Object convertToString(Object dbzObj, Schema schema) {
         // the Geometry datatype in PostgreSQL will be converted to
         // a String with Json format
-        if (Geometry.LOGICAL_NAME.equals(schema.name())
+        if (Point.LOGICAL_NAME.equals(schema.name())
+                || Geometry.LOGICAL_NAME.equals(schema.name())
                 || Geography.LOGICAL_NAME.equals(schema.name())) {
             try {
                 Struct geometryStruct = (Struct) dbzObj;
                 byte[] wkb = geometryStruct.getBytes("wkb");
-                Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32(SRID));
-                Map<String, Object> geometryInfo = new HashMap<>(2);
-                geometryInfo.put(HEXEWKB, HexConverter.convertToHexString(wkb));
-                geometryInfo.put(SRID, srid.orElse(0));
+                String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+                JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
+                Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
+                Map<String, Object> geometryInfo = new HashMap<>();
+                String geometryType = originGeoNode.get("type").asText();
+                geometryInfo.put("type", geometryType);
+                if (geometryType.equals("GeometryCollection")) {
+                    geometryInfo.put("geometries", originGeoNode.get("geometries"));
+                } else {
+                    geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
+                }
+                geometryInfo.put("srid", srid.orElse(0));
                 return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));
             } catch (Exception e) {
                 throw new IllegalArgumentException(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 8af4057f9f0..86ec56febc2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -26,6 +26,37 @@
 
 /** A utility class for converting Postgres types to Flink types. */
 public class PostgresTypeUtils {
+    private static final String PG_BIT = "bit";
+    private static final String PG_BIT_ARRAY = "_bit";
+
+    private static final String PG_VARBIT = "varbit";
+    private static final String PG_VARBIT_ARRAY = "_varbit";
+
+    private static final String PG_OID = "OID";
+
+    private static final String PG_CHAR = "char";
+    private static final String PG_CHAR_ARRAY = "_char";
+
+    private static final String PG_TIMETZ = "timetz";
+    private static final String PG_TIMETZ_ARRAY = "_timetz";
+
+    private static final String PG_INTERVAL = "interval";
+    private static final String PG_INTERVAL_ARRAY = "_interval";
+
+    private static final String PG_JSON = "json";
+    private static final String PG_JSONB = "jsonb";
+    private static final String PG_XML = "xml";
+    private static final String PG_POINT = "point";
+    private static final String PG_LTREE = "ltree";
+    private static final String PG_CITEXT = "citext";
+    private static final String PG_INET = "inet";
+    private static final String PG_INT4RANGE = "int4range";
+    private static final String PG_INT8RANGE = "int8range";
+    private static final String PG_NUMRANGE = "numrange";
+    private static final String PG_TSTZRANGE = "tstzrange";
+    private static final String PG_DATERANGE = "daterange";
+    private static final String PG_ENUM = "enum";
+
     private static final String PG_SMALLSERIAL = "smallserial";
     private static final String PG_SERIAL = "serial";
     private static final String PG_BIGSERIAL = "bigserial";
@@ -55,8 +86,8 @@ public class PostgresTypeUtils {
     private static final String PG_TIME_ARRAY = "_time";
     private static final String PG_TEXT = "text";
     private static final String PG_TEXT_ARRAY = "_text";
-    private static final String PG_CHAR = "bpchar";
-    private static final String PG_CHAR_ARRAY = "_bpchar";
+    private static final String PG_BPCHAR = "bpchar";
+    private static final String PG_BPCHAR_ARRAY = "_bpchar";
     private static final String PG_CHARACTER = "character";
     private static final String PG_CHARACTER_ARRAY = "_character";
     private static final String PG_CHARACTER_VARYING = "varchar";
@@ -88,6 +119,13 @@ private static DataType convertFromColumn(Column column) {
         switch (typeName) {
             case PG_BOOLEAN:
                 return DataTypes.BOOLEAN();
+            case PG_BIT:
+            case PG_VARBIT:
+                if (precision == 1) {
+                    return DataTypes.BOOLEAN();
+                } else {
+                    return DataTypes.BINARY(precision);
+                }
             case PG_BOOLEAN_ARRAY:
                 return DataTypes.ARRAY(DataTypes.BOOLEAN());
             case PG_BYTEA:
@@ -106,8 +144,11 @@ private static DataType convertFromColumn(Column column) {
                 return DataTypes.ARRAY(DataTypes.INT());
             case PG_BIGINT:
             case PG_BIGSERIAL:
+            case PG_OID:
+            case PG_INTERVAL:
                 return DataTypes.BIGINT();
             case PG_BIGINT_ARRAY:
+            case PG_INTERVAL_ARRAY:
                 return DataTypes.ARRAY(DataTypes.BIGINT());
             case PG_REAL:
                 return DataTypes.FLOAT();
@@ -130,9 +171,11 @@ private static DataType convertFromColumn(Column column) {
                 }
                 return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0));
             case PG_CHAR:
+            case PG_BPCHAR:
             case PG_CHARACTER:
                 return DataTypes.CHAR(precision);
             case PG_CHAR_ARRAY:
+            case PG_BPCHAR_ARRAY:
             case PG_CHARACTER_ARRAY:
                 return DataTypes.ARRAY(DataTypes.CHAR(precision));
             case PG_CHARACTER_VARYING:
@@ -143,6 +186,10 @@ private static DataType convertFromColumn(Column column) {
             case PG_GEOMETRY:
             case PG_GEOGRAPHY:
             case PG_UUID:
+            case PG_JSON:
+            case PG_JSONB:
+            case PG_XML:
+            case PG_POINT:
                 return DataTypes.STRING();
             case PG_TEXT_ARRAY:
                 return DataTypes.ARRAY(DataTypes.STRING());
@@ -155,8 +202,10 @@ private static DataType convertFromColumn(Column column) {
             case PG_TIMESTAMPTZ_ARRAY:
                 return DataTypes.ARRAY(new ZonedTimestampType(scale));
             case PG_TIME:
+            case PG_TIMETZ:
                 return DataTypes.TIME(scale);
             case PG_TIME_ARRAY:
+            case PG_TIMETZ_ARRAY:
                 return DataTypes.ARRAY(DataTypes.TIME(scale));
             case PG_DATE:
                 return DataTypes.DATE();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 3dc04f59dc5..0785f71c1e1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -195,7 +195,27 @@ public void testFullTypes() throws Exception {
                     BinaryStringData.fromString(
                             "{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187}"),
                     BinaryStringData.fromString(
-                            "{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}")
+                            "{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}"),
+                    true,
+                    new byte[] {10},
+                    new byte[] {42},
+                    BinaryStringData.fromString("abc"),
+                    1209600000000L,
+                    BinaryStringData.fromString(
+                            "{\"order_id\": 10248, \"product\": \"Notebook\", \"quantity\": 5}"),
+                    BinaryStringData.fromString(
+                            "{\"product\": \"Pen\", \"order_id\": 10249, \"quantity\": 10}"),
+                    BinaryStringData.fromString(
+                            "\n"
+                                    + " 123\n"
+                                    + " Alice\n"
+                                    + " alice@example.com\n"
+                                    + " \n"
+                                    + " dark\n"
+                                    + " true\n"
+                                    + " \n"
+                                    + " "),
+                    BinaryStringData.fromString("(3.456,7.890)")
                 };
 
         List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -259,5 +279,14 @@ private Object[] recordFields(RecordData record, RowType rowType) {
                     DataTypes.TIME(0),
                     DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE),
                     DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.BOOLEAN(),
+                    DataTypes.BINARY(8),
+                    DataTypes.BINARY(20),
+                    DataTypes.CHAR(3),
+                    DataTypes.BIGINT(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
                     DataTypes.STRING());
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index 4790b0f4918..4967e191196 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -46,6 +46,15 @@ CREATE TABLE full_types
     default_numeric_c NUMERIC,
     geometry_c GEOMETRY(POINT, 3187),
     geography_c GEOGRAPHY(MULTILINESTRING),
+    bit_c BIT(1),
+    bit_fixed_c BIT(8),
+    bit_varying_c BIT VARYING(20),
+    bpchar_c BPCHAR(3),
+    duration_c INTERVAL,
+    json_c JSON,
+    jsonb_c JSONB,
+    xml_C XML,
+    location POINT,
     PRIMARY KEY (id)
 );
 
@@ -56,4 +65,12 @@
 INSERT INTO inventory.full_types
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz',
        '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17', '18:00:22', 500,'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
-       'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
\ No newline at end of file
+       'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography,B'1',B'00001010',B'00101010','abc','2 weeks','{"order_id": 10248, "product": "Notebook", "quantity": 5}','{"order_id": 10249, "product": "Pen", "quantity": 10}'::jsonb,'
+ 123
+ Alice
+ alice@example.com
+
+ dark
+ true
+
+ ','(3.456,7.890)'::point);
\ No newline at end of file
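A minimal sketch (not part of the patch series) of the bit/varbit rule patch 1 adds, assuming Debezium's io.debezium.relational.Column editor API; patch 2 below replaces fromDbzColumn(Column) with a config-aware overload, so this exercises the patch-1 signature only.

// Sketch only: BIT(1)/VARBIT(1) collapse to BOOLEAN, wider bit strings
// keep their declared length as BINARY(n), per the switch cases above.
import io.debezium.relational.Column;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;

public class BitMappingSketch {
    public static void main(String[] args) {
        Column bit1 =
                Column.editor().name("bit_c").type("bit").length(1).optional(true).create();
        Column bit8 =
                Column.editor().name("bit_fixed_c").type("bit").length(8).optional(true).create();
        System.out.println(PostgresTypeUtils.fromDbzColumn(bit1)); // BOOLEAN
        System.out.println(PostgresTypeUtils.fromDbzColumn(bit8)); // BINARY(8)
    }
}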
From be9540d4c25b51f305110cc9fb821f2815db7277 Mon Sep 17 00:00:00 2001
From: ouyangwulin
Date: Fri, 8 Aug 2025 11:32:28 +0800
Subject: [PATCH 2/6] Support all PostgreSQL types

[FLINK-38141][pipeline-connector/iceberg] Fix iceberg connector incorrect type mapping (#4070)

---------

Co-authored-by: zhangchao.doovvv

[minor] Fix potential sql connection statement issue (#4069)

[FLINK-38185][pipeline-connector][iceberg] Correctly handle the type conversion of TIMESTAMP_WITH_TIME_ZONE. (#4074)

[tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (#4057)

* [ci][fluss] Add MySQL to Fluss E2e IT case

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

* add: comments

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

---------

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

[FLINK-38184] One call to getCopyOfBuffer is enough when serializing a split. (#4073)

[FLINK-38164][pipeline-connector/mysql] Support MySQL LONG and LONG VARCHAR types (#4076)

---------

Co-authored-by: zhangchao.doovvv

[FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by default for improved stability (#4082)

[FLINK-38194][pipeline-connector/iceberg] Iceberg connector supports auto-creating namespace (#4080)

---------

Co-authored-by: zhangchao.doovvv

[FLINK-37905][runtime] Fix transform failure with non-ascii string literals (#4038)

* [FLINK-37905] Fix transform failure with non-ascii string literals

* address comments

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

---------

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

[FLINK-38142] Upgrading the Paimon version to 1.2.0 (#4066)

[FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely (#4044)

Fixed decimal add test

Fixed decimal mode test

Add all types

[FLINK-38059][doc] Add fluss pipeline connector documentation (#4088)

Co-authored-by: wangjunbo

[FLINK-37835] Fix NullPointerException when starting with latest-offset. (#4091)

---------

Co-authored-by: wuzexian

[FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075)

---------

Co-authored-by: lvyanquan

[hotfix][doc] Update uses of Upload documentation in build_docs.yml. (#4093)

[FLINK-38204][pipeline-connector][maxcompute] Use getLatestEvolvedSchema to get Schema in SessionManageOperator in case of using route. #4094

Co-authored-by: wuzexian

back
---
 .../pom.xml                                   |  13 -
 .../source/PostgresEventDeserializer.java     |  19 +-
 .../PostgresSchemaDataTypeInference.java      |   4 +-
 .../postgres/utils/PostgresSchemaUtils.java   |  21 +-
 .../postgres/utils/PostgresTypeUtils.java     | 352 +++++++-----
 .../source/PostgresFullTypesITCase.java       | 513 +++++++++++++++++-
 .../test/resources/ddl/column_type_test.sql   |  59 +-
 .../test/resources/ddl/decimal_mode_test.sql  |  70 +++
 .../DebeziumEventDeserializationSchema.java   |  48 +-
 .../DebeziumSchemaDataTypeInference.java      |   8 +-
 10 files changed, 921 insertions(+), 186 deletions(-)
 create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql
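A minimal, self-contained sketch of the JTS round-trip the deserializer diff below switches to (replacing the ESRI GeoJSON detour), assuming only the org.locationtech.jts dependency implied by the new WKBReader import; the WKT literal stands in for Debezium's "wkb" field.

import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.WKTReader;

public class WkbReadSketch {
    public static void main(String[] args) throws Exception {
        // Build a WKB payload of the same shape as Debezium's geometry "wkb" field.
        Geometry source = new WKTReader().read("POINT (174.9479 -36.7208)");
        byte[] wkb = new WKBWriter().write(source);

        // What the reworked convertToString does: parse the WKB, then expose the
        // JTS type name and coordinates instead of the old hexewkb string.
        Geometry parsed = new WKBReader().read(wkb);
        System.out.println(parsed.getGeometryType()); // Point
        for (Coordinate c : parsed.getCoordinates()) {
            System.out.println(c.x + "," + c.y); // 174.9479,-36.7208
        }
    }
}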
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
index 1c19d07886e..528d8586084 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml
@@ -39,19 +39,6 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
 
-
-        <dependency>
-            <groupId>com.esri.geometry</groupId>
-            <artifactId>esri-geometry-api</artifactId>
-            <version>${geometry.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-postgres-cdc</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
index d6863e421e5..dd4f3b4bbe5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
@@ -26,8 +26,6 @@
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 import org.apache.flink.table.data.TimestampData;
 
-import com.esri.core.geometry.ogc.OGCGeometry;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.debezium.data.Envelope;
 import io.debezium.data.geometry.Geography;
@@ -36,8 +34,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.locationtech.jts.io.WKBReader;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,8 +49,6 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema
     private static final long serialVersionUID = 1L;
     private List<PostgresReadableMetadata> readableMetadataList;
 
-    public static final String SRID = "srid";
-    public static final String HEXEWKB = "hexewkb";
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
@@ -120,16 +116,19 @@ protected Object convertToString(Object dbzObj, Schema schema) {
             try {
                 Struct geometryStruct = (Struct) dbzObj;
                 byte[] wkb = geometryStruct.getBytes("wkb");
-                String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
-                JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
+
+                WKBReader wkbReader = new WKBReader();
+                org.locationtech.jts.geom.Geometry jtsGeom = wkbReader.read(wkb);
+
                 Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
                 Map<String, Object> geometryInfo = new HashMap<>();
-                String geometryType = originGeoNode.get("type").asText();
+                String geometryType = jtsGeom.getGeometryType();
                 geometryInfo.put("type", geometryType);
+
                 if (geometryType.equals("GeometryCollection")) {
-                    geometryInfo.put("geometries", originGeoNode.get("geometries"));
+                    geometryInfo.put("geometries", jtsGeom.toText());
                 } else {
-                    geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
+                    geometryInfo.put("coordinates", jtsGeom.getCoordinates());
                 }
                 geometryInfo.put("srid", srid.orElse(0));
                 return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
index e82e5e8efa7..90a66978029 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
@@ -24,6 +24,7 @@
 
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
 import org.apache.kafka.connect.data.Schema;
 
 /** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
@@ -35,7 +36,8 @@ public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInference
     protected DataType inferStruct(Object value, Schema schema) {
         // the Geometry datatype in PostgresSQL will be converted to
         // a String with Json format
-        if (Geography.LOGICAL_NAME.equals(schema.name())
+        if (Point.LOGICAL_NAME.equals(schema.name())
+                || Geography.LOGICAL_NAME.equals(schema.name())
                 || Geometry.LOGICAL_NAME.equals(schema.name())) {
             return DataTypes.STRING();
         } else {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
index d2200531c49..0f3b0580644 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
@@ -23,9 +23,11 @@
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresObjectUtils;
 import io.debezium.connector.postgresql.PostgresSchema;
 import io.debezium.connector.postgresql.PostgresTopicSelector;
+import io.debezium.connector.postgresql.TypeRegistry;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Table;
@@ -165,16 +167,18 @@ public static Schema getTableSchema(
                             topicSelector,
                             valueConverterBuilder.build(jdbc.getTypeRegistry()));
             Table tableSchema = postgresSchema.tableFor(tableId);
-            return toSchema(tableSchema);
+            return toSchema(
+                    tableSchema, sourceConfig.getDbzConnectorConfig(), jdbc.getTypeRegistry());
         } catch (SQLException e) {
             throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e);
         }
     }
 
-    public static Schema toSchema(Table table) {
+    public static Schema toSchema(
+            Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
         List<Column> columns =
                 table.columns().stream()
-                        .map(PostgresSchemaUtils::toColumn)
+                        .map(column -> toColumn(column, dbzConfig, typeRegistry))
                         .collect(Collectors.toList());
 
         return Schema.newBuilder()
@@ -184,16 +188,21 @@ public static
Schema toSchema(Table table) { .build(); } - public static Column toColumn(io.debezium.relational.Column column) { + public static Column toColumn( + io.debezium.relational.Column column, + PostgresConnectorConfig dbzConfig, + TypeRegistry typeRegistry) { if (column.defaultValueExpression().isPresent()) { return Column.physicalColumn( column.name(), - PostgresTypeUtils.fromDbzColumn(column), + PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry), column.comment(), column.defaultValueExpression().get()); } else { return Column.physicalColumn( - column.name(), PostgresTypeUtils.fromDbzColumn(column), column.comment()); + column.name(), + PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry), + column.comment()); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java index 86ec56febc2..404a7b4d10f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java @@ -22,83 +22,23 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.table.types.logical.DecimalType; +import io.debezium.connector.postgresql.PgOid; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.MONEY_FRACTION_DIGITS; + /** A utility class for converting Postgres types to Flink types. 
*/ public class PostgresTypeUtils { - private static final String PG_BIT = "bit"; - private static final String PG_BIT_ARRAY = "_bit"; - - private static final String PG_VARBIT = "varbit"; - private static final String PG_VARBIT_ARRAY = "_varbit"; - - private static final String PG_OID = "OID"; - - private static final String PG_CHAR = "char"; - private static final String PG_CHAR_ARRAY = "_char"; - - private static final String PG_TIMETZ = "timetz"; - private static final String PG_TIMETZ_ARRAY = "_timetz"; - - private static final String PG_INTERVAL = "interval"; - private static final String PG_INTERVAL_ARRAY = "_interval"; - - private static final String PG_JSON = "json"; - private static final String PG_JSONB = "jsonb"; - private static final String PG_XML = "xml"; - private static final String PG_POINT = "point"; - private static final String PG_LTREE = "ltree"; - private static final String PG_CITEXT = "citext"; - private static final String PG_INET = "inet"; - private static final String PG_INT4RANGE = "int4range"; - private static final String PG_INT8RANGE = "int8range"; - private static final String PG_NUMRANGE = "numrange"; - private static final String PG_TSTZRANGE = "tstzrange"; - private static final String PG_DATERANGE = "daterange"; - private static final String PG_ENUM = "enum"; - - private static final String PG_SMALLSERIAL = "smallserial"; - private static final String PG_SERIAL = "serial"; - private static final String PG_BIGSERIAL = "bigserial"; - private static final String PG_BYTEA = "bytea"; - private static final String PG_BYTEA_ARRAY = "_bytea"; - private static final String PG_SMALLINT = "int2"; - private static final String PG_SMALLINT_ARRAY = "_int2"; - private static final String PG_INTEGER = "int4"; - private static final String PG_INTEGER_ARRAY = "_int4"; - private static final String PG_BIGINT = "int8"; - private static final String PG_BIGINT_ARRAY = "_int8"; - private static final String PG_REAL = "float4"; - private static final String PG_REAL_ARRAY = "_float4"; - private static final String PG_DOUBLE_PRECISION = "float8"; - private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; - private static final String PG_NUMERIC = "numeric"; - private static final String PG_NUMERIC_ARRAY = "_numeric"; - private static final String PG_BOOLEAN = "bool"; - private static final String PG_BOOLEAN_ARRAY = "_bool"; - private static final String PG_TIMESTAMP = "timestamp"; - private static final String PG_TIMESTAMP_ARRAY = "_timestamp"; - private static final String PG_TIMESTAMPTZ = "timestamptz"; - private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; - private static final String PG_DATE = "date"; - private static final String PG_DATE_ARRAY = "_date"; - private static final String PG_TIME = "time"; - private static final String PG_TIME_ARRAY = "_time"; - private static final String PG_TEXT = "text"; - private static final String PG_TEXT_ARRAY = "_text"; - private static final String PG_BPCHAR = "bpchar"; - private static final String PG_BPCHAR_ARRAY = "_bpchar"; - private static final String PG_CHARACTER = "character"; - private static final String PG_CHARACTER_ARRAY = "_character"; - private static final String PG_CHARACTER_VARYING = "varchar"; - private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; - private static final String PG_UUID = "uuid"; - private static final String PG_GEOMETRY = "geometry"; - private static final String PG_GEOGRAPHY = "geography"; /** Returns a corresponding Flink data type from a debezium {@link 
Column}. */ - public static DataType fromDbzColumn(Column column) { - DataType dataType = convertFromColumn(column); + public static DataType fromDbzColumn( + Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) { + DataType dataType = convertFromColumn(column, dbzConfig, typeRegistry); if (column.isOptional()) { return dataType; } else { @@ -110,110 +50,242 @@ public static DataType fromDbzColumn(Column column) { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column) { - String typeName = column.typeName(); + private static DataType convertFromColumn( + Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) { + int nativeType = column.nativeType(); int precision = column.length(); int scale = column.scale().orElse(0); - switch (typeName) { - case PG_BOOLEAN: + PostgresConnectorConfig.IntervalHandlingMode intervalHandlingMode = + PostgresConnectorConfig.IntervalHandlingMode.parse( + dbzConfig + .getConfig() + .getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE)); + + TemporalPrecisionMode temporalPrecisionMode = dbzConfig.getTemporalPrecisionMode(); + + JdbcValueConverters.DecimalMode decimalMode = + dbzConfig.getDecimalMode() != null + ? dbzConfig.getDecimalMode() + : JdbcValueConverters.DecimalMode.PRECISE; + + PostgresConnectorConfig.HStoreHandlingMode hStoreHandlingMode = + PostgresConnectorConfig.HStoreHandlingMode.parse( + dbzConfig + .getConfig() + .getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE)); + + switch (nativeType) { + case PgOid.BOOL: return DataTypes.BOOLEAN(); - case PG_BIT: - case PG_VARBIT: + case PgOid.BIT: + case PgOid.VARBIT: if (precision == 1) { return DataTypes.BOOLEAN(); } else { return DataTypes.BINARY(precision); } - case PG_BOOLEAN_ARRAY: + case PgOid.BOOL_ARRAY: return DataTypes.ARRAY(DataTypes.BOOLEAN()); - case PG_BYTEA: + case PgOid.BYTEA: return DataTypes.BYTES(); - case PG_BYTEA_ARRAY: + case PgOid.BYTEA_ARRAY: return DataTypes.ARRAY(DataTypes.BYTES()); - case PG_SMALLINT: - case PG_SMALLSERIAL: + case PgOid.INT2: return DataTypes.SMALLINT(); - case PG_SMALLINT_ARRAY: + case PgOid.INT2_ARRAY: return DataTypes.ARRAY(DataTypes.SMALLINT()); - case PG_INTEGER: - case PG_SERIAL: + case PgOid.INT4: return DataTypes.INT(); - case PG_INTEGER_ARRAY: + case PgOid.INT4_ARRAY: return DataTypes.ARRAY(DataTypes.INT()); - case PG_BIGINT: - case PG_BIGSERIAL: - case PG_OID: - case PG_INTERVAL: + case PgOid.INT8: + case PgOid.OID: return DataTypes.BIGINT(); - case PG_BIGINT_ARRAY: - case PG_INTERVAL_ARRAY: + case PgOid.INTERVAL: + return handleIntervalWithIntervalHandlingMode(intervalHandlingMode); + case PgOid.INTERVAL_ARRAY: + return DataTypes.ARRAY( + handleIntervalWithIntervalHandlingMode(intervalHandlingMode)); + case PgOid.INT8_ARRAY: return DataTypes.ARRAY(DataTypes.BIGINT()); - case PG_REAL: + case PgOid.FLOAT4: return DataTypes.FLOAT(); - case PG_REAL_ARRAY: + case PgOid.FLOAT4_ARRAY: return DataTypes.ARRAY(DataTypes.FLOAT()); - case PG_DOUBLE_PRECISION: + case PgOid.FLOAT8: return DataTypes.DOUBLE(); - case PG_DOUBLE_PRECISION_ARRAY: + case PgOid.FLOAT8_ARRAY: return DataTypes.ARRAY(DataTypes.DOUBLE()); - case PG_NUMERIC: + case PgOid.NUMERIC: // see SPARK-26538: handle numeric without explicit precision and scale. 
- if (precision > 0) { - return DataTypes.DECIMAL(precision, scale); - } - return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0); - case PG_NUMERIC_ARRAY: + return handleNumericWithDecimalMode(precision, scale, decimalMode); + case PgOid.NUMERIC_ARRAY: // see SPARK-26538: handle numeric without explicit precision and scale. - if (precision > 0) { - return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale)); - } - return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0)); - case PG_CHAR: - case PG_BPCHAR: - case PG_CHARACTER: + return DataTypes.ARRAY(handleNumericWithDecimalMode(precision, scale, decimalMode)); + case PgOid.MONEY: + return handleMoneyWithDecimalMode( + dbzConfig.getConfig().getInteger(MONEY_FRACTION_DIGITS), decimalMode); + case PgOid.CHAR: + case PgOid.BPCHAR: return DataTypes.CHAR(precision); - case PG_CHAR_ARRAY: - case PG_BPCHAR_ARRAY: - case PG_CHARACTER_ARRAY: + case PgOid.CHAR_ARRAY: + case PgOid.BPCHAR_ARRAY: return DataTypes.ARRAY(DataTypes.CHAR(precision)); - case PG_CHARACTER_VARYING: + case PgOid.VARCHAR: return DataTypes.VARCHAR(precision); - case PG_CHARACTER_VARYING_ARRAY: + case PgOid.VARCHAR_ARRAY: return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); - case PG_TEXT: - case PG_GEOMETRY: - case PG_GEOGRAPHY: - case PG_UUID: - case PG_JSON: - case PG_JSONB: - case PG_XML: - case PG_POINT: + case PgOid.TEXT: + case PgOid.POINT: + case PgOid.UUID: + case PgOid.JSON: + case PgOid.JSONB: + case PgOid.XML: + case PgOid.INET_OID: + case PgOid.CIDR_OID: + case PgOid.MACADDR_OID: + case PgOid.MACADDR8_OID: + case PgOid.INT4RANGE_OID: + case PgOid.NUM_RANGE_OID: + case PgOid.INT8RANGE_OID: + case PgOid.TSRANGE_OID: + case PgOid.TSTZRANGE_OID: + case PgOid.DATERANGE_OID: + case PgOid.TIMETZ: return DataTypes.STRING(); - case PG_TEXT_ARRAY: + case PgOid.TEXT_ARRAY: + case PgOid.TIMETZ_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); - case PG_TIMESTAMP: - return DataTypes.TIMESTAMP(scale); - case PG_TIMESTAMP_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); - case PG_TIMESTAMPTZ: + case PgOid.TIMESTAMP: + return handleTimestampWithTemporalMode(temporalPrecisionMode); + case PgOid.TIMESTAMP_ARRAY: + return DataTypes.ARRAY(handleTimestampWithTemporalMode(temporalPrecisionMode)); + case PgOid.TIMESTAMPTZ: return new ZonedTimestampType(scale); - case PG_TIMESTAMPTZ_ARRAY: + case PgOid.TIMESTAMPTZ_ARRAY: return DataTypes.ARRAY(new ZonedTimestampType(scale)); - case PG_TIME: - case PG_TIMETZ: - return DataTypes.TIME(scale); - case PG_TIME_ARRAY: - case PG_TIMETZ_ARRAY: - return DataTypes.ARRAY(DataTypes.TIME(scale)); - case PG_DATE: - return DataTypes.DATE(); - case PG_DATE_ARRAY: - return DataTypes.ARRAY(DataTypes.DATE()); + case PgOid.TIME: + return handleTimeWithTemporalMode(temporalPrecisionMode, scale); + case PgOid.TIME_ARRAY: + return DataTypes.ARRAY(handleTimeWithTemporalMode(temporalPrecisionMode, scale)); + case PgOid.DATE: + return handleDateWithTemporalMode(temporalPrecisionMode); + case PgOid.DATE_ARRAY: + return DataTypes.ARRAY(handleDateWithTemporalMode(temporalPrecisionMode)); default: + if (nativeType == typeRegistry.ltreeOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.geometryOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.geographyOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.citextOid()) { + return DataTypes.STRING(); + } else if (nativeType == typeRegistry.hstoreOid()) { + return handleHstoreWithHstoreMode(hStoreHandlingMode); + } else 
if (nativeType == typeRegistry.ltreeArrayOid()) { + return DataTypes.ARRAY(DataTypes.STRING()); + } else if (nativeType == typeRegistry.geometryArrayOid()) { + return DataTypes.ARRAY(DataTypes.STRING()); + } + final PostgresType resolvedType = typeRegistry.get(nativeType); + if (resolvedType.isEnumType()) { + return DataTypes.STRING(); + } throw new UnsupportedOperationException( - String.format("Doesn't support Postgres type '%s' yet", typeName)); + String.format( + "Doesn't support Postgres type '%s', Postgres oid '%d' yet", + column.typeName(), column.nativeType())); + } + } + + public static DataType handleNumericWithDecimalMode( + int precision, int scale, JdbcValueConverters.DecimalMode mode) { + switch (mode) { + case PRECISE: + if (precision > 0 && precision <= 38) { + return DataTypes.DECIMAL(precision, scale); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown decimal mode: " + mode); + } + } + + public static DataType handleMoneyWithDecimalMode( + int moneyFractionDigits, JdbcValueConverters.DecimalMode mode) { + switch (mode) { + case PRECISE: + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, moneyFractionDigits); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown decimal mode: " + mode); + } + } + + public static DataType handleIntervalWithIntervalHandlingMode( + PostgresConnectorConfig.IntervalHandlingMode mode) { + switch (mode) { + case NUMERIC: + return DataTypes.BIGINT(); + case STRING: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown interval mode: " + mode); + } + } + + public static DataType handleDateWithTemporalMode(TemporalPrecisionMode mode) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.DATE(); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static DataType handleTimeWithTemporalMode(TemporalPrecisionMode mode, int scale) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.TIME(scale); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static DataType handleTimestampWithTemporalMode(TemporalPrecisionMode mode) { + switch (mode) { + case ADAPTIVE: + case ADAPTIVE_TIME_MICROSECONDS: + case CONNECT: + return DataTypes.BIGINT(); + default: + throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); + } + } + + public static DataType handleHstoreWithHstoreMode( + PostgresConnectorConfig.HStoreHandlingMode mode) { + switch (mode) { + case JSON: + return DataTypes.STRING(); + case MAP: + return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + default: + throw new IllegalArgumentException("Unknown hstore mode: " + mode); } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java index 0785f71c1e1..9942a213b92 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -21,8 +21,10 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -59,10 +61,16 @@
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Stream;
 
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -193,9 +201,9 @@ public void testFullTypes() throws Exception {
                     64822000,
                     DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0),
                     BinaryStringData.fromString(
-                            "{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187}"),
+                            "{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":3187}"),
                     BinaryStringData.fromString(
-                            "{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}"),
+                            "{\"coordinates\":[{\"x\":169.1321,\"y\":-44.7032,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true},{\"x\":167.8974,\"y\":-44.6414,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"MultiLineString\",\"srid\":4326}"),
                     true,
                     new byte[] {10},
                     new byte[] {42},
@@ -207,21 +215,440 @@ public void testFullTypes() throws Exception {
                             "{\"product\": \"Pen\", \"order_id\": 10249, \"quantity\": 10}"),
                     BinaryStringData.fromString(
                             "\n"
-                                    + " 123\n"
-                                    + " Alice\n"
-                                    + " alice@example.com\n"
-                                    + " \n"
-                                    + " dark\n"
-                                    + " true\n"
-                                    + " \n"
-                                    + " "),
-                    BinaryStringData.fromString("(3.456,7.890)")
+                                    + "123\n"
+                                    + "Alice\n"
+                                    + "alice@example.com\n"
+                                    + "\n"
+                                    + "dark\n"
+                                    + "true\n"
+                                    + "\n"
+                                    + ""),
+                    BinaryStringData.fromString(
+                            "{\"coordinates\":[{\"x\":3.456,\"y\":7.89,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":0}"),
+                    BinaryStringData.fromString("foo.bar.baz"),
+                    BinaryStringData.fromString("JohnDoe"),
+                    BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"),
+                    BinaryStringData.fromString("192.168.1.1"),
+                    BinaryStringData.fromString("[1,10)"),
+                    BinaryStringData.fromString("[1000000000,5000000000)"),
+                    BinaryStringData.fromString("[5.5,20.75)"),
+                    BinaryStringData.fromString(
+                            "[\"2023-08-01 08:00:00\",\"2023-08-01 12:00:00\")"),
+                    BinaryStringData.fromString(
+                            "[\"2023-08-01 16:00:00+08\",\"2023-08-01 20:00:00+08\")"),
+                    BinaryStringData.fromString("[2023-08-01,2023-08-15)"),
+                    BinaryStringData.fromString("pending"),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+        Assertions.assertThat(recordFields(snapshotRecord, COMMON_TYPES))
+                .isEqualTo(expectedSnapshot);
+    }
+
+    @Test
+    public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "adaptive");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.time_types")
+                                .startupOptions(StartupOptions.initial())
+                                .debeziumProperties(debeziumProps)
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    2,
+                    18460,
+                    64822000,
+                    64822123,
+                    64822123,
+                    BinaryStringData.fromString("18:00:22Z"),
+                    481036337152L,
+                    515396075520L,
+                    549756269888L,
+                    584115552256L,
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Object[] ob = recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE);
+        Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+                .isEqualTo(expectedSnapshot);
+    }
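A minimal sketch of how the new handleNumericWithDecimalMode helper resolves Flink types for the three decimal.handling.mode values the tests below exercise, assuming Debezium's JdbcValueConverters.DecimalMode enum that the patch imports:

import io.debezium.jdbc.JdbcValueConverters;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;

public class DecimalModeSketch {
    public static void main(String[] args) {
        // precise keeps the declared precision/scale: DECIMAL(10, 2)
        System.out.println(PostgresTypeUtils.handleNumericWithDecimalMode(
                10, 2, JdbcValueConverters.DecimalMode.PRECISE));
        // NUMERIC without a declared precision falls back to DECIMAL(38, 0)
        System.out.println(PostgresTypeUtils.handleNumericWithDecimalMode(
                0, 0, JdbcValueConverters.DecimalMode.PRECISE));
        // double and string ignore precision entirely
        System.out.println(PostgresTypeUtils.handleNumericWithDecimalMode(
                10, 2, JdbcValueConverters.DecimalMode.DOUBLE)); // DOUBLE
        System.out.println(PostgresTypeUtils.handleNumericWithDecimalMode(
                10, 2, JdbcValueConverters.DecimalMode.STRING)); // STRING
    }
}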
DecimalData.fromBigDecimal(new BigDecimal("12.3"), 3, 1), + DecimalData.fromBigDecimal(new BigDecimal("100.50"), 38, 2), + }; + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHandlingDecimalModeDouble() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "double"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_table") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, 123.45, 67.8912, 987.65, 12.3, 100.50, }; List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); - Assertions.assertThat(recordFields(snapshotRecord, PG_TYPES)).isEqualTo(expectedSnapshot); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHandlingDecimalModeString() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "string"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_table") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, + BinaryStringData.fromString("123.45"), + BinaryStringData.fromString("67.8912"), + BinaryStringData.fromString("987.65"), + BinaryStringData.fromString("12.3"), + BinaryStringData.fromString("100.5"), + }; + + List snapshotResults = 
fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testZeroHandlingDecimalModePrecise() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "precise"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, + DecimalData.fromBigDecimal(new BigDecimal("99999999.99"), 10, 2), + DecimalData.fromBigDecimal(new BigDecimal("9999.9999"), 8, 4), + null, + null, + null, + }; + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testZeroHandlingDecimalModeDouble() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "double"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, 99999999.99, 9999.9999, null, null, null, + }; + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void 
testZeroHandlingDecimalModeString() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("decimal.handling.mode", "string"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("test_decimal.decimal_test_zero") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 2, + BinaryStringData.fromString("99999999.99"), + BinaryStringData.fromString("9999.9999"), + null, + null, + null, + }; + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING)) + .isEqualTo(expectedSnapshot); + } + + @Test + public void testHstoreHandlingModeMap() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "column_type_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("hstore.handling.mode", "map"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("inventory.hstore_types") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Map expectedMap = new HashMap<>(); + expectedMap.put("a", "1"); + expectedMap.put("b", "2"); + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Object[] snapshotObjects = recordFields(snapshotRecord, HSTORE_TYPES_WITH_ADAPTIVE); + Map snapshotMap = + (Map) + ((BinaryMapData) snapshotObjects[1]) + .toJavaMap(DataTypes.STRING(), DataTypes.STRING()); + Assertions.assertThat(expectedMap).isEqualTo(snapshotMap); } private Tuple2, List> fetchResultsAndCreateTableEvent( @@ -257,7 +684,11 @@ private Object[] recordFields(RecordData record, RowType rowType) { return fields; } - private static final RowType PG_TYPES = + 
private Instant toInstant(String ts) { + return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC+8")).toInstant(); + } + + private static final RowType COMMON_TYPES = RowType.of( DataTypes.INT(), DataTypes.BYTES(), @@ -288,5 +719,61 @@ private Object[] recordFields(RecordData record, RowType rowType) { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), DataTypes.STRING()); + + private static final RowType TYPES_WITH_PRECISE = + RowType.of( + DataTypes.INT(), + DataTypes.DECIMAL(10, 2), + DataTypes.DECIMAL(8, 4), + DataTypes.DECIMAL(5, 2), + DataTypes.DECIMAL(3, 1), + DataTypes.DECIMAL(38, 2)); + + private static final RowType TYPES_WITH_DOUBLE = + RowType.of( + DataTypes.INT(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE()); + + private static final RowType TYPES_WITH_STRING = + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING()); + + private static final RowType TIME_TYPES_WITH_ADAPTIVE = + RowType.of( + DataTypes.INT(), + DataTypes.DATE(), + DataTypes.TIME(0), + DataTypes.TIME(3), + DataTypes.TIME(6), + DataTypes.STRING(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.TIMESTAMP_LTZ(0)); + + private static final RowType HSTORE_TYPES_WITH_ADAPTIVE = + RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql index 4967e191196..1e9979d19a5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql @@ -21,7 +21,11 @@ DROP SCHEMA IF EXISTS inventory CASCADE; CREATE SCHEMA inventory; -- postgis is installed into public schema SET search_path TO inventory, public; +CREATE EXTENSION IF NOT EXISTS ltree; +CREATE EXTENSION IF NOT EXISTS citext; +CREATE EXTENSION IF NOT EXISTS hstore; +CREATE TYPE status AS ENUM ('pending', 'approved', 'rejected'); CREATE TABLE full_types ( @@ -55,6 +59,17 @@ CREATE TABLE full_types jsonb_c JSONB, xml_C XML, location POINT, + ltree_c LTREE, + username CITEXT NOT NULL, + attributes HSTORE, + inet_c INET, + int4range_c INT4RANGE, + int8range_c INT8RANGE, + numrange_c NUMRANGE, + tsrange_c TSRANGE, + tsTZrange_c TSTZRANGE, + daterange_c DATERANGE, + status status NOT NULL, PRIMARY KEY (id) ); @@ -73,4 +88,46 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, dark true - ','(3.456,7.890)'::point); \ No newline at end of file + ','(3.456,7.890)'::point,'foo.bar.baz','JohnDoe','color => "blue", size => "L"','192.168.1.1'::inet,'[1, 10)'::int4range,'[1000000000, 5000000000)'::int8range,'[5.5, 20.75)'::numrange, + '["2023-08-01 08:00:00", "2023-08-01 12:00:00")','["2023-08-01 08:00:00+00", "2023-08-01 12:00:00+00")','["2023-08-01", 
"2023-08-15")','pending'); + + +CREATE TABLE time_types ( + id SERIAL PRIMARY KEY, + date_c DATE, + time_c TIME(0) WITHOUT TIME ZONE, + time_3_c TIME(3) WITHOUT TIME ZONE, + time_6_c TIME(6) WITHOUT TIME ZONE, + time_tz_c TIME(6) WITH TIME ZONE, + datetime_c TIMESTAMP(0) WITHOUT TIME ZONE, + datetime3_c TIMESTAMP(3) WITHOUT TIME ZONE, + datetime6_c TIMESTAMP(6) WITHOUT TIME ZONE, + timestamp_c TIMESTAMP WITHOUT TIME ZONE, + timestamp_tz_c TIMESTAMP WITH TIME ZONE +); +ALTER TABLE inventory.time_types + REPLICA IDENTITY FULL; + +INSERT INTO time_types +VALUES (2, + '2020-07-17', + '18:00:22', + '18:00:22.123', + '18:00:22.123456', + '18:00:22+08:00', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22.123', + '2020-07-17 18:00:22.123456', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22+08:00'); + +CREATE TABLE hstore_types ( + id SERIAL PRIMARY KEY, + hstore_c HSTORE +); + +ALTER TABLE inventory.hstore_types + REPLICA IDENTITY FULL; + +INSERT INTO hstore_types +VALUES (1, 'a => 1, b => 2'); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql new file mode 100644 index 00000000000..b2f18d5899f --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql @@ -0,0 +1,70 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: decimal_mode_test +-- ---------------------------------------------------------------------------------------------------------------- +-- Generate a number of tables to cover as many of the PG types as possible + + +DROP SCHEMA IF EXISTS test_decimal CASCADE; +CREATE SCHEMA IF NOT EXISTS test_decimal; + +SET search_path TO test_decimal; + +DROP TABLE IF EXISTS decimal_test_table; +CREATE TABLE decimal_test_table ( + id SERIAL PRIMARY KEY, + fixed_numeric NUMERIC(10,2), + fixed_decimal DECIMAL(8,4), + variable_numeric NUMERIC, + variable_decimal DECIMAL, + amount_money MONEY +); + +ALTER TABLE decimal_test_table REPLICA IDENTITY FULL; + +INSERT INTO decimal_test_table ( + id, + fixed_numeric, + fixed_decimal, + variable_numeric, + variable_decimal, + amount_money +) VALUES +(1, 123.45, 67.8912, 987.65, 12.3, '100.50'::money); + + +DROP TABLE IF EXISTS decimal_test_zero; +CREATE TABLE decimal_test_zero ( + id SERIAL PRIMARY KEY, + fixed_numeric NUMERIC(10,2), + fixed_decimal DECIMAL(8,4), + variable_numeric NUMERIC, + variable_decimal DECIMAL, + amount_money MONEY +); + +ALTER TABLE decimal_test_zero REPLICA IDENTITY FULL; + +INSERT INTO decimal_test_zero ( + id, + fixed_numeric, + fixed_decimal, + variable_numeric, + variable_decimal, + amount_money +) VALUES + (2, 99999999.99, 9999.9999, null, null, null); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index 338f03d2356..538e8ceef24 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericMapData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; @@ -31,6 +32,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.types.DataField; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; @@ -222,8 +224,17 @@ public Object convert(Object dbzObj, Schema schema) throws Exception { return convertToRecord((RowType) type, dbzObj, schema); } }; - case ARRAY: case MAP: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + return convertToMap(dbzObj, schema); + } + }; + case ARRAY: default: throw new UnsupportedOperationException("Unsupported type: " + type); } @@ -426,6 +437,41 @@ private 
static Object convertField(
         }
     }
 
+    protected Object convertToMap(Object dbzObj, Schema schema) throws Exception {
+        if (dbzObj == null) {
+            return null;
+        }
+
+        // Obtain the schemas for the keys and values of the map.
+        Schema keySchema = schema.keySchema();
+        Schema valueSchema = schema.valueSchema();
+
+        // Infer the data types of keys and values
+        DataType keyType =
+                keySchema != null
+                        ? schemaDataTypeInference.infer(null, keySchema)
+                        : DataTypes.STRING();
+
+        DataType valueType =
+                valueSchema != null
+                        ? schemaDataTypeInference.infer(null, valueSchema)
+                        : DataTypes.STRING();
+
+        DeserializationRuntimeConverter keyConverter = createConverter(keyType);
+        DeserializationRuntimeConverter valueConverter = createConverter(valueType);
+
+        Map<Object, Object> map = (Map<Object, Object>) dbzObj;
+        Map<Object, Object> convertedMap = new java.util.HashMap<>(map.size());
+
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+            Object convertedKey = convertField(keyConverter, entry.getKey(), keySchema);
+            Object convertedValue = convertField(valueConverter, entry.getValue(), valueSchema);
+            convertedMap.put(convertedKey, convertedValue);
+        }
+
+        return new GenericMapData(convertedMap);
+    }
+
     private static DeserializationRuntimeConverter wrapIntoNullableConverter(
             DeserializationRuntimeConverter converter) {
         return new DeserializationRuntimeConverter() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 3afa7128d13..06f4c9d5500 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -210,6 +210,12 @@ protected DataType inferArray(Object value, Schema schema) {
     }
 
     protected DataType inferMap(Object value, Schema schema) {
-        throw new UnsupportedOperationException("Unsupported type MAP");
+        Schema keySchema = schema.keySchema();
+        Schema valueSchema = schema.valueSchema();
+
+        DataType keyType = keySchema != null ? infer(null, keySchema) : DataTypes.STRING();
+        DataType valueType = valueSchema != null ? infer(null, valueSchema) : DataTypes.STRING();
+
+        return DataTypes.MAP(keyType, valueType);
     }
 }

From 1ed10c3ca910dd59158d448644c05293588d02fa Mon Sep 17 00:00:00 2001
From: ouyangwulin
Date: Thu, 14 Aug 2025 21:41:33 +0800
Subject: [PATCH 3/6] add docs

---
 .../pipeline-connectors/postgres.md           | 316 +++++++++++++++---
 .../pipeline-connectors/postgres.md           | 304 ++++++++++++++---
 .../postgres/utils/PostgresTypeUtils.java     |  35 +-
 .../source/PostgresFullTypesITCase.java       |  96 +++++-
 .../test/resources/ddl/column_type_test.sql   |  33 +-
 5 files changed, 665 insertions(+), 119 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 2ed47cfea92..fcd80ea6c07 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -290,7 +290,6 @@ pipeline:
 
 ## 数据类型映射
-
@@ -300,6 +299,17 @@ pipeline: + + + + + + - - - - - - - - + BIGSERIAL
+ OID
+ + FLOAT4 + - - - - - - - + + + - - + + + + + + - - + + - - + + - - + + - - + + + JSON
+ JSONB
+ XML
+ UUID
+ POINT
+ LTREE
+ CITEXT
+ INET
+ INT4RANGE
+ INT8RANGE
+ NUMRANGE
+ TSRANGE
+ DATERANGE
+ ENUM + - - - -
+ BOOLEAN
+ BIT(1)
+
BOOLEAN
+ BIT( > 1) + BYTES
SMALLINT
@@ -317,79 +327,289 @@ pipeline:
BIGINT
- BIGSERIAL
BIGINT
NUMERICDECIMAL(20, 0)
BIGINT BIGINT
REAL
- FLOAT4
FLOAT
- FLOAT8
- DOUBLE PRECISION
DOUBLE
- NUMERIC(p, s)
- DECIMAL(p, s)
DECIMAL(p, s)
NUMERICDECIMAL(38, 0)
BOOLEANBOOLEANDOUBLE PRECISION
+ FLOAT8 +
DOUBLE
CHAR[(M)]
+ VARCHAR[(M)]
+ CHARACTER[(M)]
+ BPCHAR[(M)]
+ CHARACTER VARYING[(M)] +
STRING
DATEDATETIMESTAMPTZ
+ TIMESTAMP WITH TIME ZONE
ZonedTimestampType
TIME [(p)] [WITHOUT TIMEZONE]TIME [(p)] [WITHOUT TIMEZONE]INTERVAL [P]BIGINT
TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]INTERVAL [P]STRING(when interval.handling.mode is set to string)
- CHAR(n)
- CHARACTER(n)
- VARCHAR(n)
- CHARACTER VARYING(n)
CHAR(n)BYTEABYTES or STRING (when binary.handling.mode is set to base64 or base64-url-safe or hex)
- TEXT STRING
BYTEABYTES
-### 空间数据类型映射 -PostgreSQL通过PostGIS扩展支持空间数据类型: +### Temporal types Mapping +除了包含时区信息的 PostgreSQL 的 TIMESTAMPTZ 数据类型之外,其他时间类型如何映射取决于连接器配置属性 time.precision.mode 的值。以下各节将描述这些映射关系: +time.precision.mode=adaptive + +time.precision.mode=adaptive_time_microseconds + +time.precision.mode=connect + + +当 time.precision.mode 属性设置为默认的 adaptive(自适应)时,连接器会根据列的数据类型定义来确定字面类型和语义类型。这可以确保事件能够精确地表示数据库中的值。 +
+ + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ DATE + DATE
+ TIME([P]) + TIME([P])
+ TIMESTAMP([P]) + TIMESTAMP([P])
+
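+A minimal sketch of selecting this mode through a Debezium pass-through property, the way the connector's integration tests do (the surrounding source setup is assumed):
+
+```java
+import java.util.Properties;
+
+public class TemporalModeSketch {
+    public static void main(String[] args) {
+        // Debezium pass-through property that controls temporal mapping.
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "adaptive");
+        // Under adaptive mode a TIME(3) or TIMESTAMP(6) column keeps its
+        // declared precision on the Flink side: TIME(3) / TIMESTAMP(6).
+        System.out.println(debeziumProps);
+    }
+}
+```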
+ +### Decimal types Mapping +PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接器如何映射十进制类型。 + +当 decimal.handling.mode 属性设置为 precise(精确)时,连接器会对所有 DECIMAL、NUMERIC 和 MONEY 列使用 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。 +
+ + + + + + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + DECIMAL[(M[,D])]
+ NUMERIC + DECIMAL(38,0)
+ DECIMAL[(M[,D])] + DECIMAL[(M[,D])]
+ DECIMAL + DECIMAL(38,0)
+ MONEY[(M[,D])] + DECIMAL(38,digits)(schema 参数 scale 包含一个整数,表示小数点移动了多少位。scale schema 参数由 money.fraction.digits 连接器配置属性决定。)
+
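+A short sketch, with values borrowed from this connector's decimal tests: in precise mode a NUMERIC(10,2) column surfaces as DecimalData carrying the declared precision and scale.
+
+```java
+import java.math.BigDecimal;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+
+public class PreciseDecimalSketch {
+    public static void main(String[] args) {
+        // NUMERIC(10,2) value 123.45 keeps precision 10 and scale 2.
+        DecimalData value = DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2);
+        System.out.println(value);
+    }
+}
+```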
+ +当 decimal.handling.mode 属性设置为 double 时,连接器将所有 DECIMAL、NUMERIC 和 MONEY 值表示为 Java 的 double 值,并按照下表所示进行编码。 + +
+ + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + DOUBLE
+ DECIMAL[(M[,D])] + DOUBLE
+ MONEY[(M[,D])] + DOUBLE
+
- GEOMETRY(POINT, xx):表示使用笛卡尔坐标系的点,EPSG:xxx定义其坐标系统,适用于局部平面计算。 - GEOGRAPHY(MULTILINESTRING):以经纬度存储多条线串,基于球面模型,适合全球范围的空间分析。 +decimal.handling.mode 配置属性的最后一个可选设置是 string(字符串)。在这种情况下,连接器将 DECIMAL、NUMERIC 和 MONEY 值表示为其格式化的字符串形式,并按照下表所示进行编码。 +
+ + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + STRING
+ DECIMAL[(M[,D])] + STRING
+ MONEY[(M[,D])] + STRING
+
-前者用于小范围平面数据,后者用于大范围、需考虑地球曲率的地理数据。 +当 decimal.handling.mode 的设置为 string 或 double 时,PostgreSQL 支持将 NaN(非数字)作为一个特殊值存储在 DECIMAL/NUMERIC 值中。在这种情况下,连接器会将 NaN 编码为 Double.NaN 或字符串常量 NAN。 + +### HSTORE type Mapping +PostgreSQL 连接器配置属性 hstore.handling.mode 的设置决定了连接器如何映射 HSTORE 值。 + +当 hstore.handling.mode 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串形式,并按照下表所示进行编码。当 hstore.handling.mode 属性设置为 map 时,连接器对 HSTORE 值使用 MAP 模式类型。 +
+ + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+        HSTORE
+        STRING(hstore.handling.mode=json)
+ HSTORE + MAP(hstore.handling.mode=map)
+
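+A sketch of the default json representation: the HSTORE value 'a => 1, b => 2' arrives as a single JSON-formatted string (the literal below is illustrative; key order is not guaranteed).
+
+```java
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+
+public class HstoreJsonSketch {
+    public static void main(String[] args) {
+        // hstore.handling.mode=json renders the whole HSTORE as one JSON string.
+        BinaryStringData value = BinaryStringData.fromString("{\"a\":\"1\",\"b\":\"2\"}");
+        System.out.println(value);
+    }
+}
+```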
+ +### Network address types Mapping +PostgreSQL 拥有可以存储 IPv4、IPv6 和 MAC 地址的数据类型。使用这些类型来存储网络地址比使用纯文本类型更为合适。网络地址类型提供了输入错误检查以及专用的操作符和函数。 +
+ + + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ INET + STRING
+ CIDR + STRING
+ MACADDR + STRING
+ MACADDR8 + STRING
+
+ +### PostGIS Types Mapping +PostgreSQL 通过 PostGIS 扩展支持空间数据类型: +``` + GEOMETRY(POINT, xx): 在笛卡尔坐标系中表示一个点,其中 EPSG:xx 定义了坐标系。它适用于局部平面计算。 + GEOGRAPHY(MULTILINESTRING): 在基于球面模型的纬度和经度上存储多条线串。它适用于全球范围的空间分析。 +``` +前者适用于小范围的平面数据,而后者适用于需要考虑地球曲率的大范围数据。
@@ -401,11 +621,11 @@ PostgreSQL通过PostGIS扩展支持空间数据类型: - + - +
        GEOMETRY(POINT, xx)
-       {"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}
+       {"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}
        GEOGRAPHY(MULTILINESTRING)
-       {"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}
+       {"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}
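+The coordinate values above come from decoding the WKB payload with JTS. A minimal sketch of that decoding step (the hex literal is a hypothetical little-endian POINT(1 2), not a value taken from these docs):
+
+```java
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.io.WKBReader;
+
+public class WkbDecodeSketch {
+    public static void main(String[] args) throws Exception {
+        // Hypothetical WKB for POINT(1 2): little-endian, geometry type 1, two doubles.
+        byte[] wkb = WKBReader.hexToBytes("0101000000000000000000F03F0000000000000040");
+        Geometry geometry = new WKBReader().read(wkb);
+        for (Coordinate c : geometry.getCoordinates()) {
+            System.out.println("[" + c.x + "," + c.y + "]"); // prints [1.0,2.0]
+        }
+    }
+}
+```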
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md index 7580028787c..ed83d0654d3 100644 --- a/docs/content/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md @@ -285,7 +285,6 @@ Notice: ## Data Type Mapping -
@@ -295,6 +294,17 @@ Notice: + + + + + + + BIGSERIAL
+ OID
+ - - - - + FLOAT4 + - - - - - - - + + + - - + + + + + + - - + + - - + + - - + + - - + + + JSON
+ JSONB
+ XML
+ UUID
+ POINT
+ LTREE
+ CITEXT
+ INET
+ INT4RANGE
+ INT8RANGE
+ NUMRANGE
+ TSRANGE
+ DATERANGE
+ ENUM + - - - -
+ BOOLEAN
+ BIT(1)
+
BOOLEAN
+ BIT( > 1) + BYTES
SMALLINT
@@ -312,69 +322,283 @@ Notice:
BIGINT
- BIGSERIAL
BIGINT
NUMERICDECIMAL(20, 0)
REAL
- FLOAT4
FLOAT
- FLOAT8
- DOUBLE PRECISION
DOUBLE
- NUMERIC(p, s)
- DECIMAL(p, s)
DECIMAL(p, s)
NUMERICDECIMAL(38, 0)
BOOLEANBOOLEANDOUBLE PRECISION
+ FLOAT8 +
DOUBLE
CHAR[(M)]
+ VARCHAR[(M)]
+ CHARACTER[(M)]
+ BPCHAR[(M)]
+ CHARACTER VARYING[(M)] +
STRING
DATEDATETIMESTAMPTZ
+ TIMESTAMP WITH TIME ZONE
ZonedTimestampType
TIME [(p)] [WITHOUT TIMEZONE]TIME [(p)] [WITHOUT TIMEZONE]INTERVAL [P]BIGINT
TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]INTERVAL [P]STRING(when interval.handling.mode is set to string)
- CHAR(n)
- CHARACTER(n)
- VARCHAR(n)
- CHARACTER VARYING(n)
CHAR(n)BYTEABYTES or STRING (when binary.handling.mode is set to base64 or base64-url-safe or hex)
- TEXT STRING
BYTEABYTES
-### Postgres Spatial Data Types Mapping
+### Temporal types Mapping
+Other than PostgreSQL’s TIMESTAMPTZ data types, which contain time zone information, how temporal types are mapped depends on the value of the time.precision.mode connector configuration property. The following sections describe these mappings:
+time.precision.mode=adaptive
+
+time.precision.mode=adaptive_time_microseconds
+
+time.precision.mode=connect
+
+When the time.precision.mode property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
+
+ + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ DATE + DATE
+ TIME([P]) + TIME([P])
+ TIMESTAMP([P]) + TIMESTAMP([P])
+
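+As a sketch of what this yields on the Flink side, the row type below loosely mirrors the adaptive-mode expectations in the connector's integration tests (the column comments describe a hypothetical table):
+
+```java
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+public class AdaptiveTemporalSchemaSketch {
+    // Under time.precision.mode=adaptive each column keeps its declared precision.
+    public static final RowType SCHEMA =
+            RowType.of(
+                    DataTypes.INT(), // id
+                    DataTypes.DATE(), // DATE column
+                    DataTypes.TIME(3), // TIME(3) column
+                    DataTypes.TIMESTAMP(6), // TIMESTAMP(6) column
+                    DataTypes.TIMESTAMP_LTZ(0)); // TIMESTAMP WITH TIME ZONE column
+
+    public static void main(String[] args) {
+        System.out.println(SCHEMA);
+    }
+}
+```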
+ +### Decimal types Mapping +The setting of the PostgreSQL connector configuration property decimal.handling.mode determines how the connector maps decimal types. + +When the decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode. +
+ + + + + + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + DECIMAL[(M[,D])]
+ NUMERIC + DECIMAL(38,0)
+ DECIMAL[(M[,D])] + DECIMAL[(M[,D])]
+ DECIMAL + DECIMAL(38,0)
+ MONEY[(M[,D])] + DECIMAL(38,digits)(The scale schema parameter contains an integer representing how many digits the decimal point was shifted. The scale schema parameter is determined by the money.fraction.digits connector configuration property.)
+
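+One consequence worth noting, sketched below with values from this connector's test fixtures: a NUMERIC column declared without precision falls back to DECIMAL(38, 0), so fractional digits are not preserved in that case (the unbounded value here is illustrative).
+
+```java
+import java.math.BigDecimal;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+
+public class NumericFallbackSketch {
+    public static void main(String[] args) {
+        // NUMERIC(10,2): precision and scale carry over unchanged.
+        DecimalData bounded =
+                DecimalData.fromBigDecimal(new BigDecimal("99999999.99"), 10, 2);
+        // Unconstrained NUMERIC: mapped to DECIMAL(38, 0), i.e. scale 0.
+        DecimalData unbounded = DecimalData.fromBigDecimal(new BigDecimal("987"), 38, 0);
+        System.out.println(bounded + " / " + unbounded);
+    }
+}
+```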
+ +When the decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table. + +
+ + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + DOUBLE
+ DECIMAL[(M[,D])] + DOUBLE
+ MONEY[(M[,D])] + DOUBLE
+
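+For comparison, a hedged sketch of the double-mode output for the decimal_mode_test fixture row used by this connector's tests:
+
+```java
+public class DoubleModeSketch {
+    public static void main(String[] args) {
+        // decimal.handling.mode=double: NUMERIC(10,2) and DECIMAL(8,4) become doubles.
+        Object[] expectedSnapshot = {2, 99999999.99, 9999.9999, null, null, null};
+        for (Object field : expectedSnapshot) {
+            System.out.println(field);
+        }
+    }
+}
+```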
+ +The last possible setting for the decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table. +
+ + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ NUMERIC[(M[,D])] + STRING
+ DECIMAL[(M[,D])] + STRING
+ MONEY[(M[,D])] + STRING
+
+ +PostgreSQL supports NaN (not a number) as a special value to be stored in DECIMAL/NUMERIC values when the setting of decimal.handling.mode is string or double. In this case, the connector encodes NaN as either Double.NaN or the string constant NAN. + +### HSTORE type Mapping +The setting of the PostgreSQL connector configuration property hstore.handling.mode determines how the connector maps HSTORE values. + +When the hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table. When the hstore.handling.mode property is set to map, the connector uses the MAP schema type for HSTORE values. +
+ + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+        HSTORE
+        STRING(hstore.handling.mode=json)
+ HSTORE + MAP(hstore.handling.mode=map)
+
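+A sketch of the map-mode shape, built with the CDC runtime's map data classes the way the integration tests do: the HSTORE value 'a => 1, b => 2' surfaces as MAP(STRING, STRING) data rather than a JSON string.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+
+public class HstoreMapSketch {
+    public static void main(String[] args) {
+        Map<Object, Object> entries = new HashMap<>();
+        entries.put(BinaryStringData.fromString("a"), BinaryStringData.fromString("1"));
+        entries.put(BinaryStringData.fromString("b"), BinaryStringData.fromString("2"));
+        // hstore.handling.mode=map: the value arrives as map data, not a JSON string.
+        GenericMapData mapData = new GenericMapData(entries);
+        System.out.println(mapData);
+    }
+}
+```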
+ +### Network address types Mapping +PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. It is better to use these types instead of plain text types to store network addresses. Network address types offer input error checking and specialized operators and functions. +
+ + + + + + + + + + + + + + + + + + + + + +
PostgreSQL typeCDC type
+ INET + STRING
+ CIDR + STRING
+ MACADDR + STRING
+ MACADDR8 + STRING
+
+ +### PostGIS Types Mapping PostgreSQL supports spatial data types through the PostGIS extension: ``` GEOMETRY(POINT, xx): Represents a point in a Cartesian coordinate system, with EPSG:xx defining the coordinate system. It is suitable for local planar calculations. @@ -392,11 +616,11 @@ The former is used for small-area planar data, while the latter is used for larg GEOMETRY(POINT, xx) - {"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187} + {"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}" GEOGRAPHY(MULTILINESTRING) - {"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326} + {"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java index 404a7b4d10f..3a7a19be668 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.table.types.logical.DecimalType; +import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.postgresql.PgOid; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresType; @@ -63,6 +64,9 @@ private static DataType convertFromColumn( .getConfig() .getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE)); + PostgresConnectorConfig.BinaryHandlingMode binaryHandlingMode = + dbzConfig.binaryHandlingMode(); + TemporalPrecisionMode temporalPrecisionMode = dbzConfig.getTemporalPrecisionMode(); JdbcValueConverters.DecimalMode decimalMode = @@ -89,9 +93,9 @@ private static DataType convertFromColumn( case PgOid.BOOL_ARRAY: return DataTypes.ARRAY(DataTypes.BOOLEAN()); case PgOid.BYTEA: - return DataTypes.BYTES(); + return handleBinaryWithBinaryMode(binaryHandlingMode); case PgOid.BYTEA_ARRAY: - return DataTypes.ARRAY(DataTypes.BYTES()); + return DataTypes.ARRAY(handleBinaryWithBinaryMode(binaryHandlingMode)); case PgOid.INT2: return DataTypes.SMALLINT(); case PgOid.INT2_ARRAY: @@ -153,15 +157,14 @@ private static DataType convertFromColumn( case PgOid.TSRANGE_OID: case PgOid.TSTZRANGE_OID: case PgOid.DATERANGE_OID: - case PgOid.TIMETZ: return DataTypes.STRING(); case PgOid.TEXT_ARRAY: - case PgOid.TIMETZ_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); case PgOid.TIMESTAMP: - return handleTimestampWithTemporalMode(temporalPrecisionMode); + return handleTimestampWithTemporalMode(temporalPrecisionMode, scale); case PgOid.TIMESTAMP_ARRAY: - return DataTypes.ARRAY(handleTimestampWithTemporalMode(temporalPrecisionMode)); + return DataTypes.ARRAY( + handleTimestampWithTemporalMode(temporalPrecisionMode, scale)); case PgOid.TIMESTAMPTZ: return new ZonedTimestampType(scale); case 
PgOid.TIMESTAMPTZ_ARRAY: @@ -205,7 +208,8 @@ public static DataType handleNumericWithDecimalMode( int precision, int scale, JdbcValueConverters.DecimalMode mode) { switch (mode) { case PRECISE: - if (precision > 0 && precision <= 38) { + if (precision > DecimalType.DEFAULT_SCALE + && precision <= DecimalType.MAX_PRECISION) { return DataTypes.DECIMAL(precision, scale); } return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE); @@ -218,6 +222,19 @@ public static DataType handleNumericWithDecimalMode( } } + public static DataType handleBinaryWithBinaryMode( + CommonConnectorConfig.BinaryHandlingMode mode) { + switch (mode) { + case BYTES: + return DataTypes.BYTES(); + case BASE64: + case HEX: + return DataTypes.STRING(); + default: + throw new IllegalArgumentException("Unknown binary mode: " + mode); + } + } + public static DataType handleMoneyWithDecimalMode( int moneyFractionDigits, JdbcValueConverters.DecimalMode mode) { switch (mode) { @@ -266,12 +283,12 @@ public static DataType handleTimeWithTemporalMode(TemporalPrecisionMode mode, in } } - public static DataType handleTimestampWithTemporalMode(TemporalPrecisionMode mode) { + public static DataType handleTimestampWithTemporalMode(TemporalPrecisionMode mode, int scale) { switch (mode) { case ADAPTIVE: case ADAPTIVE_TIME_MICROSECONDS: case CONNECT: - return DataTypes.BIGINT(); + return DataTypes.TIMESTAMP(scale); default: throw new IllegalArgumentException("Unknown temporal precision mode: " + mode); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java index 9942a213b92..faa8deffc90 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java @@ -234,8 +234,6 @@ public void testFullTypes() throws Exception { BinaryStringData.fromString("[5.5,20.75)"), BinaryStringData.fromString( "[\"2023-08-01 08:00:00\",\"2023-08-01 12:00:00\")"), - BinaryStringData.fromString( - "[\"2023-08-01 16:00:00+08\",\"2023-08-01 20:00:00+08\")"), BinaryStringData.fromString("[2023-08-01,2023-08-15)"), BinaryStringData.fromString("pending"), }; @@ -289,17 +287,16 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception { 64822000, 64822123, 64822123, - BinaryStringData.fromString("18:00:22Z"), - 481036337152L, - 515396075520L, - 549756269888L, - 584115552256L, + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")), + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")), + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2020-07-17T18:00:22.123456")), + TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")), LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), }; List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); - Object[] ob = recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE); 
Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE)) .isEqualTo(expectedSnapshot); } @@ -636,10 +633,9 @@ public void testHstoreHandlingModeMap() throws Exception { PostgresDataSourceFactory.IDENTIFIER, new EventTypeInfo()) .executeAndCollect(); - - Map expectedMap = new HashMap<>(); - expectedMap.put("a", "1"); - expectedMap.put("b", "2"); + Map expectedMap = new HashMap<>(); + expectedMap.put(BinaryStringData.fromString("a"), BinaryStringData.fromString("1")); + expectedMap.put(BinaryStringData.fromString("b"), BinaryStringData.fromString("2")); List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); @@ -651,6 +647,62 @@ public void testHstoreHandlingModeMap() throws Exception { Assertions.assertThat(expectedMap).isEqualTo(snapshotMap); } + @Test + public void testJsonTypes() throws Exception { + initializePostgresTable(POSTGIS_CONTAINER, "column_type_test"); + + Properties debeziumProps = new Properties(); + debeziumProps.setProperty("hstore.handling.mode", "map"); + + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGIS_CONTAINER.getHost()) + .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(POSTGRES_CONTAINER.getDatabaseName()) + .tableList("inventory.json_types") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProps) + .serverTimeZone("UTC"); + configFactory.database(POSTGRES_CONTAINER.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + 1, + BinaryStringData.fromString("{\"key1\":\"value1\"}"), + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"), + BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), + BinaryStringData.fromBytes("{\"key1\": \"value1\"}".getBytes()), + BinaryStringData.fromBytes( + "{\"key1\": \"value1\", \"key2\": \"value2\"}".getBytes()), + BinaryStringData.fromBytes( + "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]" + .getBytes()), + 1L + }; + + List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot); + } + private Tuple2, List> fetchResultsAndCreateTableEvent( Iterator iter, int size) { List result = new ArrayList<>(size); @@ -767,13 +819,23 @@ private Instant toInstant(String ts) { DataTypes.TIME(0), DataTypes.TIME(3), DataTypes.TIME(6), - DataTypes.STRING(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP(), 
DataTypes.TIMESTAMP_LTZ(0)); private static final RowType HSTORE_TYPES_WITH_ADAPTIVE = RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); + + private static final RowType JSON_TYPES = + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.BIGINT()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql index 1e9979d19a5..9ac9d95bbb2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql @@ -67,7 +67,6 @@ CREATE TABLE full_types int8range_c INT8RANGE, numrange_c NUMRANGE, tsrange_c TSRANGE, - tsTZrange_c TSTZRANGE, daterange_c DATERANGE, status status NOT NULL, PRIMARY KEY (id) @@ -89,7 +88,7 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, true ','(3.456,7.890)'::point,'foo.bar.baz','JohnDoe','color => "blue", size => "L"','192.168.1.1'::inet,'[1, 10)'::int4range,'[1000000000, 5000000000)'::int8range,'[5.5, 20.75)'::numrange, - '["2023-08-01 08:00:00", "2023-08-01 12:00:00")','["2023-08-01 08:00:00+00", "2023-08-01 12:00:00+00")','["2023-08-01", "2023-08-15")','pending'); + '["2023-08-01 08:00:00", "2023-08-01 12:00:00")','["2023-08-01", "2023-08-15")','pending'); CREATE TABLE time_types ( @@ -98,7 +97,6 @@ CREATE TABLE time_types ( time_c TIME(0) WITHOUT TIME ZONE, time_3_c TIME(3) WITHOUT TIME ZONE, time_6_c TIME(6) WITHOUT TIME ZONE, - time_tz_c TIME(6) WITH TIME ZONE, datetime_c TIMESTAMP(0) WITHOUT TIME ZONE, datetime3_c TIMESTAMP(3) WITHOUT TIME ZONE, datetime6_c TIMESTAMP(6) WITHOUT TIME ZONE, @@ -114,7 +112,6 @@ VALUES (2, '18:00:22', '18:00:22.123', '18:00:22.123456', - '18:00:22+08:00', '2020-07-17 18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', @@ -130,4 +127,30 @@ ALTER TABLE inventory.hstore_types REPLICA IDENTITY FULL; INSERT INTO hstore_types -VALUES (1, 'a => 1, b => 2'); \ No newline at end of file +VALUES (1, 'a => 1, b => 2'); + +CREATE TABLE json_types ( + id SERIAL PRIMARY KEY, + json_c0 JSON, + json_c1 JSON, + json_c2 JSON, + jsonb_c0 JSONB, + jsonb_c1 JSONB, + jsonb_c2 JSONB, + int_c INTEGER +); + +ALTER TABLE inventory.json_types + REPLICA IDENTITY FULL; + +INSERT INTO json_types (id,json_c0, json_c1, json_c2, jsonb_c0, jsonb_c1, jsonb_c2, int_c) +VALUES + (1, + '{"key1":"value1"}', + '{"key1":"value1","key2":"value2"}', + '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]', + '{"key1":"value1"}'::jsonb, + '{"key1":"value1","key2":"value2"}'::jsonb, + '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]'::jsonb, + 1 + ); \ No newline at end of file From a54acb9c16f1b27115f0285625c413036da7861b Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Tue, 19 Aug 2025 09:19:28 +0800 Subject: [PATCH 4/6] [FLINK-38244][hotfix] Fix case-insensitive error when adding column After an existed column. 
(#4100) fixed docs --- .../pipeline-connectors/postgres.md | 22 +++++++++---------- .../pipeline-connectors/postgres.md | 22 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md index fcd80ea6c07..f0213fabc3f 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md @@ -369,11 +369,11 @@ pipeline: INTERVAL [P] - STRING(when interval.handling.mode is set to string) + STRING(when debezium.interval.handling.mode is set to string) BYTEA - BYTES or STRING (when binary.handling.mode is set to base64 or base64-url-safe or hex) + BYTES or STRING (when debezium.binary.handling.mode is set to base64 or base64-url-safe or hex) @@ -439,9 +439,9 @@ time.precision.mode=connect
### Decimal types Mapping -PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接器如何映射十进制类型。 +PostgreSQL 连接器配置属性 debezium.decimal.handling.mode 的设置决定了连接器如何映射十进制类型。 -当 decimal.handling.mode 属性设置为 precise(精确)时,连接器会对所有 DECIMAL、NUMERIC 和 MONEY 列使用 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。 +当 debezium.decimal.handling.mode 属性设置为 precise(精确)时,连接器会对所有 DECIMAL、NUMERIC 和 MONEY 列使用 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。
@@ -480,7 +480,7 @@ PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接
-当 decimal.handling.mode 属性设置为 double 时,连接器将所有 DECIMAL、NUMERIC 和 MONEY 值表示为 Java 的 double 值,并按照下表所示进行编码。 +当 debezium.decimal.handling.mode 属性设置为 double 时,连接器将所有 DECIMAL、NUMERIC 和 MONEY 值表示为 Java 的 double 值,并按照下表所示进行编码。
@@ -510,7 +510,7 @@ PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接
-decimal.handling.mode 配置属性的最后一个可选设置是 string(字符串)。在这种情况下,连接器将 DECIMAL、NUMERIC 和 MONEY 值表示为其格式化的字符串形式,并按照下表所示进行编码。 +debezium.decimal.handling.mode 配置属性的最后一个可选设置是 string(字符串)。在这种情况下,连接器将 DECIMAL、NUMERIC 和 MONEY 值表示为其格式化的字符串形式,并按照下表所示进行编码。
@@ -539,12 +539,12 @@ decimal.handling.mode 配置属性的最后一个可选设置是 string(字符
-当 decimal.handling.mode 的设置为 string 或 double 时,PostgreSQL 支持将 NaN(非数字)作为一个特殊值存储在 DECIMAL/NUMERIC 值中。在这种情况下,连接器会将 NaN 编码为 Double.NaN 或字符串常量 NAN。 +当 debezium.decimal.handling.mode 的设置为 string 或 double 时,PostgreSQL 支持将 NaN(非数字)作为一个特殊值存储在 DECIMAL/NUMERIC 值中。在这种情况下,连接器会将 NaN 编码为 Double.NaN 或字符串常量 NAN。 ### HSTORE type Mapping -PostgreSQL 连接器配置属性 hstore.handling.mode 的设置决定了连接器如何映射 HSTORE 值。 +PostgreSQL 连接器配置属性 debezium.hstore.handling.mode 的设置决定了连接器如何映射 HSTORE 值。 -当 hstore.handling.mode 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串形式,并按照下表所示进行编码。当 hstore.handling.mode 属性设置为 map 时,连接器对 HSTORE 值使用 MAP 模式类型。 +当 debezium.hstore.handling.mode 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串形式,并按照下表所示进行编码。当 debezium.hstore.handling.mode 属性设置为 map 时,连接器对 HSTORE 值使用 MAP 模式类型。
@@ -557,12 +557,12 @@ PostgreSQL 连接器配置属性 hstore.handling.mode 的设置决定了连接 + +
        HSTORE
-       STRING(hstore.handling.mode=json)
+       STRING(`debezium.hstore.handling.mode`=`json`)
        HSTORE
-       MAP(hstore.handling.mode=map)
+       MAP(`debezium.hstore.handling.mode`=`map`)
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md index ed83d0654d3..eff95b73928 100644 --- a/docs/content/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md @@ -364,11 +364,11 @@ Notice: INTERVAL [P] - STRING(when interval.handling.mode is set to string) + STRING(when debezium.interval.handling.mode is set to string) BYTEA - BYTES or STRING (when binary.handling.mode is set to base64 or base64-url-safe or hex) + BYTES or STRING (when debezium.binary.handling.mode is set to base64 or base64-url-safe or hex) @@ -434,9 +434,9 @@ When the time.precision.mode property is set to adaptive, the default, the conne
### Decimal types Mapping -The setting of the PostgreSQL connector configuration property decimal.handling.mode determines how the connector maps decimal types. +The setting of the PostgreSQL connector configuration property debezium.decimal.handling.mode determines how the connector maps decimal types. -When the decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode. +When the debezium.decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode.
@@ -475,7 +475,7 @@ When the decimal.handling.mode property is set to precise, the connector uses th
-When the decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table. +When the debezium.decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table.
@@ -505,7 +505,7 @@ When the decimal.handling.mode property is set to double, the connector represen
-The last possible setting for the decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table. +The last possible setting for the debezium.decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table.
@@ -534,12 +534,12 @@ The last possible setting for the decimal.handling.mode configuration property i
-PostgreSQL supports NaN (not a number) as a special value to be stored in DECIMAL/NUMERIC values when the setting of decimal.handling.mode is string or double. In this case, the connector encodes NaN as either Double.NaN or the string constant NAN. +PostgreSQL supports NaN (not a number) as a special value to be stored in DECIMAL/NUMERIC values when the setting of debezium.decimal.handling.mode is string or double. In this case, the connector encodes NaN as either Double.NaN or the string constant NAN. ### HSTORE type Mapping -The setting of the PostgreSQL connector configuration property hstore.handling.mode determines how the connector maps HSTORE values. +The setting of the PostgreSQL connector configuration property debezium.hstore.handling.mode determines how the connector maps HSTORE values. -When the hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table. When the hstore.handling.mode property is set to map, the connector uses the MAP schema type for HSTORE values. +When the debezium.hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table. When the debezium.hstore.handling.mode property is set to map, the connector uses the MAP schema type for HSTORE values.
@@ -552,12 +552,12 @@ When the hstore.handling.mode property is set to json (the default), the connect + +
        HSTORE
-       STRING(hstore.handling.mode=json)
+       STRING(`debezium.hstore.handling.mode`=`json`)
        HSTORE
-       MAP(hstore.handling.mode=map)
+       MAP(`debezium.hstore.handling.mode`=`map`)
From 3c31c5b9819b340922c40e24dbfaffc34017a633 Mon Sep 17 00:00:00 2001
From: ouyangwulin
Date: Tue, 19 Aug 2025 16:36:29 +0800
Subject: [PATCH 5/6] fixed test order

---
 .../cdc/connectors/postgres/source/PostgresFullTypesITCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index faa8deffc90..838442b3d1f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -201,7 +201,7 @@ public void testFullTypes() throws Exception {
                     64822000,
                     DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0),
                     BinaryStringData.fromString(
-                            "{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":3187}"),
+                            "{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"valid\":true,\"m\":\"NaN\"}],\"type\":\"Point\",\"srid\":3187}"),
                     BinaryStringData.fromString(
                             "{\"coordinates\":[{\"x\":169.1321,\"y\":-44.7032,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true},{\"x\":167.8974,\"y\":-44.6414,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"MultiLineString\",\"srid\":4326}"),
                     true,

From 596a78b74dead7687cf743ee44cafdb840fc76c5 Mon Sep 17 00:00:00 2001
From: ouyangwulin
Date: Tue, 19 Aug 2025 17:39:49 +0800
Subject: [PATCH 6/6] fixed it gis deserializer

---
 .../connectors/pipeline-connectors/postgres.md     |  4 ++--
 .../connectors/pipeline-connectors/postgres.md     |  4 ++--
 .../postgres/source/PostgresEventDeserializer.java | 14 +++++++++++++-
 .../postgres/source/PostgresFullTypesITCase.java   |  6 +++---
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index f0213fabc3f..cb56ee42467 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -621,11 +621,11 @@ PostgreSQL 通过 PostGIS 扩展支持空间数据类型:
        GEOMETRY(POINT, xx)
-       {"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}
+       {"coordinates":"[[174.9479,-36.7208]]","type":"Point","srid":3187}
        GEOGRAPHY(MULTILINESTRING)
-       {"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326}
+       {"coordinates":"[[169.1321,-44.7032],[167.8974,-44.6414]]","type":"MultiLineString","srid":4326}

diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index eff95b73928..d6d077ff25d 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -616,11 +616,11 @@ The former is used for small-area planar data, while the latter is used for larg
        GEOMETRY(POINT, xx)
{"coordinates":[{"x":174.9479,"y":-36.7208,"z":"NaN","m":"NaN","valid":true}],"type":"Point","srid":3187}" + {"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}" GEOGRAPHY(MULTILINESTRING) - {"coordinates":[{"x":169.1321,"y":-44.7032,"z":"NaN","m":"NaN","valid":true},{"x":167.8974,"y":-44.6414,"z":"NaN","m":"NaN","valid":true}],"type":"MultiLineString","srid":4326} + {"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java index dd4f3b4bbe5..a37c35c5216 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -34,8 +34,10 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.io.WKBReader; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -128,7 +130,17 @@ protected Object convertToString(Object dbzObj, Schema schema) { if (geometryType.equals("GeometryCollection")) { geometryInfo.put("geometries", jtsGeom.toText()); } else { - geometryInfo.put("coordinates", jtsGeom.getCoordinates()); + Coordinate[] coordinates = jtsGeom.getCoordinates(); + List coordinateList = new ArrayList<>(); + if (coordinates != null) { + for (Coordinate coordinate : coordinates) { + coordinateList.add(new double[] {coordinate.x, coordinate.y}); + geometryInfo.put( + "coordinates", new double[] {coordinate.x, coordinate.y}); + } + } + geometryInfo.put( + "coordinates", OBJECT_MAPPER.writeValueAsString(coordinateList)); } geometryInfo.put("srid", srid.orElse(0)); return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java index 838442b3d1f..677e071616e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java @@ -201,9 +201,9 @@ public void testFullTypes() throws Exception { 64822000, DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0), BinaryStringData.fromString( - "{\"coordinates\":[{\"x\":174.9479,\"y\":-36.7208,\"z\":\"NaN\",\"valid\":true,\"m\":\"NaN\"}],\"type\":\"Point\",\"srid\":3187}"), + 
"{\"coordinates\":\"[[174.9479,-36.7208]]\",\"type\":\"Point\",\"srid\":3187}"), BinaryStringData.fromString( - "{\"coordinates\":[{\"x\":169.1321,\"y\":-44.7032,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true},{\"x\":167.8974,\"y\":-44.6414,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"MultiLineString\",\"srid\":4326}"), + "{\"coordinates\":\"[[169.1321,-44.7032],[167.8974,-44.6414]]\",\"type\":\"MultiLineString\",\"srid\":4326}"), true, new byte[] {10}, new byte[] {42}, @@ -224,7 +224,7 @@ public void testFullTypes() throws Exception { + "\n" + ""), BinaryStringData.fromString( - "{\"coordinates\":[{\"x\":3.456,\"y\":7.89,\"z\":\"NaN\",\"m\":\"NaN\",\"valid\":true}],\"type\":\"Point\",\"srid\":0}"), + "{\"coordinates\":\"[[3.456,7.89]]\",\"type\":\"Point\",\"srid\":0}"), BinaryStringData.fromString("foo.bar.baz"), BinaryStringData.fromString("JohnDoe"), BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"),