Skip to content

Commit fb20587

Browse files
committed
support all pg types
[FLINK-38141][pipeline-connector/iceberg] Fix iceberg connector incorrect type mapping (#4070) --------- Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com> [minor] Fix potential sql connection statement issue (#4069) [FLINK-38185][pipeline-connector][iceberg] Correctly handle the type conversion of TIMESTAMP_TITH_TIME_ZONE. (#4074) [tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (#4057) * [ci][fluss] Add MySQL to Fluss E2e IT case Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> * add: comments Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --------- Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> [FLINK-38184] one time of GetCopyOfBuffer is enough When serializing split. (#4073) [FLINK-38164][pipeline-connector/mysql] support mysql long and long varchar type (#4076) --------- Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com> [FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by default for improved stability (#4082) [FLINK-38194][pipeline-connector/iceberg] Iceberg connector supports auto-creating namespace (#4080) --------- Co-authored-by: zhangchao.doovvv <zhangchao.doovvv@bytedance.com> [FLINK-37905][runtime] Fix transform failure with non-ascii string literals (#4038) * [FLINK-37905] Fix transform failure with non-ascii string literals * address comments Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --------- Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> [FLINK-38142] Upgrading the Paimon version to 1.2.0 (#4066) [FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely (#4044) fixed decimal add test fixed decimal mode test add all types [FLINK-38059][doc] Add fluss pipeline connector documentation (#4088) Co-authored-by: wangjunbo <wangjunbo@qiyi.com> [FLINK-37835] Fix NoPointException when start with latest-offset. (#4091) --------- Co-authored-by: wuzexian <shanqing.wzx@alibaba-inc.com> [FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075) --------- Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com> [hotfix][doc] Update uses of Upload documentation in build_docs.yml. (#4093) [FLINK-38204][pipeline-connector][maxcompute] Use getLatestEvolvedSchema to get Schema in SessionManageOperator in case of using route. #4094 Co-authored-by: wuzexian <shanqing.wzx@alibaba-inc.com> back
1 parent fba682c commit fb20587

File tree

10 files changed

+921
-186
lines changed

10 files changed

+921
-186
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,6 @@ limitations under the License.
3939
<version>${project.version}</version>
4040
</dependency>
4141

42-
<!-- geometry dependencies -->
43-
<dependency>
44-
<groupId>com.esri.geometry</groupId>
45-
<artifactId>esri-geometry-api</artifactId>
46-
<version>${geometry.version}</version>
47-
<exclusions>
48-
<exclusion>
49-
<groupId>com.fasterxml.jackson.core</groupId>
50-
<artifactId>jackson-core</artifactId>
51-
</exclusion>
52-
</exclusions>
53-
</dependency>
54-
5542
<dependency>
5643
<groupId>org.apache.flink</groupId>
5744
<artifactId>flink-connector-postgres-cdc</artifactId>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
2727
import org.apache.flink.table.data.TimestampData;
2828

29-
import com.esri.core.geometry.ogc.OGCGeometry;
30-
import com.fasterxml.jackson.databind.JsonNode;
3129
import com.fasterxml.jackson.databind.ObjectMapper;
3230
import io.debezium.data.Envelope;
3331
import io.debezium.data.geometry.Geography;
@@ -36,8 +34,8 @@
3634
import org.apache.kafka.connect.data.Schema;
3735
import org.apache.kafka.connect.data.Struct;
3836
import org.apache.kafka.connect.source.SourceRecord;
37+
import org.locationtech.jts.io.WKBReader;
3938

40-
import java.nio.ByteBuffer;
4139
import java.util.Collections;
4240
import java.util.HashMap;
4341
import java.util.List;
@@ -51,8 +49,6 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem
5149
private static final long serialVersionUID = 1L;
5250
private List<PostgreSQLReadableMetadata> readableMetadataList;
5351

54-
public static final String SRID = "srid";
55-
public static final String HEXEWKB = "hexewkb";
5652
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
5753

5854
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
@@ -120,16 +116,19 @@ protected Object convertToString(Object dbzObj, Schema schema) {
120116
try {
121117
Struct geometryStruct = (Struct) dbzObj;
122118
byte[] wkb = geometryStruct.getBytes("wkb");
123-
String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
124-
JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
119+
120+
WKBReader wkbReader = new WKBReader();
121+
org.locationtech.jts.geom.Geometry jtsGeom = wkbReader.read(wkb);
122+
125123
Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
126124
Map<String, Object> geometryInfo = new HashMap<>();
127-
String geometryType = originGeoNode.get("type").asText();
125+
String geometryType = jtsGeom.getGeometryType();
128126
geometryInfo.put("type", geometryType);
127+
129128
if (geometryType.equals("GeometryCollection")) {
130-
geometryInfo.put("geometries", originGeoNode.get("geometries"));
129+
geometryInfo.put("geometries", jtsGeom.toText());
131130
} else {
132-
geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
131+
geometryInfo.put("coordinates", jtsGeom.getCoordinates());
133132
}
134133
geometryInfo.put("srid", srid.orElse(0));
135134
return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import io.debezium.data.geometry.Geography;
2626
import io.debezium.data.geometry.Geometry;
27+
import io.debezium.data.geometry.Point;
2728
import org.apache.kafka.connect.data.Schema;
2829

2930
/** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
@@ -35,7 +36,8 @@ public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInfer
3536
protected DataType inferStruct(Object value, Schema schema) {
3637
// the Geometry datatype in PostgresSQL will be converted to
3738
// a String with Json format
38-
if (Geography.LOGICAL_NAME.equals(schema.name())
39+
if (Point.LOGICAL_NAME.equals(schema.name())
40+
|| Geography.LOGICAL_NAME.equals(schema.name())
3941
|| Geometry.LOGICAL_NAME.equals(schema.name())) {
4042
return DataTypes.STRING();
4143
} else {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
2424
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
2525

26+
import io.debezium.connector.postgresql.PostgresConnectorConfig;
2627
import io.debezium.connector.postgresql.PostgresObjectUtils;
2728
import io.debezium.connector.postgresql.PostgresSchema;
2829
import io.debezium.connector.postgresql.PostgresTopicSelector;
30+
import io.debezium.connector.postgresql.TypeRegistry;
2931
import io.debezium.connector.postgresql.connection.PostgresConnection;
3032
import io.debezium.jdbc.JdbcConnection;
3133
import io.debezium.relational.Table;
@@ -165,16 +167,18 @@ public static Schema getTableSchema(
165167
topicSelector,
166168
valueConverterBuilder.build(jdbc.getTypeRegistry()));
167169
Table tableSchema = postgresSchema.tableFor(tableId);
168-
return toSchema(tableSchema);
170+
return toSchema(
171+
tableSchema, sourceConfig.getDbzConnectorConfig(), jdbc.getTypeRegistry());
169172
} catch (SQLException e) {
170173
throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e);
171174
}
172175
}
173176

174-
public static Schema toSchema(Table table) {
177+
public static Schema toSchema(
178+
Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
175179
List<Column> columns =
176180
table.columns().stream()
177-
.map(PostgresSchemaUtils::toColumn)
181+
.map(column -> toColumn(column, dbzConfig, typeRegistry))
178182
.collect(Collectors.toList());
179183

180184
return Schema.newBuilder()
@@ -184,16 +188,21 @@ public static Schema toSchema(Table table) {
184188
.build();
185189
}
186190

187-
public static Column toColumn(io.debezium.relational.Column column) {
191+
public static Column toColumn(
192+
io.debezium.relational.Column column,
193+
PostgresConnectorConfig dbzConfig,
194+
TypeRegistry typeRegistry) {
188195
if (column.defaultValueExpression().isPresent()) {
189196
return Column.physicalColumn(
190197
column.name(),
191-
PostgresTypeUtils.fromDbzColumn(column),
198+
PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry),
192199
column.comment(),
193200
column.defaultValueExpression().get());
194201
} else {
195202
return Column.physicalColumn(
196-
column.name(), PostgresTypeUtils.fromDbzColumn(column), column.comment());
203+
column.name(),
204+
PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry),
205+
column.comment());
197206
}
198207
}
199208

0 commit comments

Comments
 (0)