
Commit fe4e9e6

[FLINK-38278][postgres-pipeline] Support array type fields.
1 parent 0842fee

File tree

4 files changed: +145 -1 lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 60 additions & 0 deletions
@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.data.ArrayData;
 import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
@@ -703,6 +704,59 @@ public void testJsonTypes() throws Exception {
         Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
     }

+    @Test
+    public void testArrayTypes() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.array_types")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+        Object[] actualSnapshotObjects = recordFields(snapshotRecord, ARRAY_TYPES);
+
+        Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id field
+
+        ArrayData actualTagsArray = (ArrayData) actualSnapshotObjects[1];
+        Assertions.assertThat(actualTagsArray.getString(0))
+                .isEqualTo(BinaryStringData.fromString("electronics"));
+        Assertions.assertThat(actualTagsArray.getString(1))
+                .isEqualTo(BinaryStringData.fromString("gadget"));
+        Assertions.assertThat(actualTagsArray.getString(2))
+                .isEqualTo(BinaryStringData.fromString("sale"));
+
+        org.apache.flink.cdc.common.data.ArrayData actualScoresArray =
+                (org.apache.flink.cdc.common.data.ArrayData) actualSnapshotObjects[2];
+        Assertions.assertThat(actualScoresArray.getInt(0)).isEqualTo(85);
+        Assertions.assertThat(actualScoresArray.getInt(1)).isEqualTo(90);
+        Assertions.assertThat(actualScoresArray.getInt(2)).isEqualTo(78);
+    }
+
     private <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
             Iterator<T> iter, int size) {
         List<T> result = new ArrayList<>(size);
@@ -838,4 +892,10 @@ private Instant toInstant(String ts) {
                     DataTypes.STRING(),
                     DataTypes.STRING(),
                     DataTypes.BIGINT());
+    private static final RowType ARRAY_TYPES =
+            RowType.of(
+                    DataTypes.INT(),
+                    DataTypes.ARRAY(DataTypes.STRING()),
+                    DataTypes.ARRAY(DataTypes.INT()),
+                    DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
 }
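For context, here is a minimal standalone sketch (not part of the test class; the class name is illustrative) of the values the new assertions expect for the single snapshot row, built directly with the CDC data classes and read back through the same ArrayData accessors the test uses:

import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;

// Expected snapshot row: an INT id plus a TEXT[] field and an INTEGER[] field.
public class ExpectedArrayRowSketch {
    public static void main(String[] args) {
        GenericArrayData tags =
                new GenericArrayData(
                        new Object[] {
                            BinaryStringData.fromString("electronics"),
                            BinaryStringData.fromString("gadget"),
                            BinaryStringData.fromString("sale")
                        });
        GenericArrayData scores = new GenericArrayData(new Object[] {85, 90, 78});

        // Mirrors the assertions above: strings via getString(pos), ints via getInt(pos).
        System.out.println(tags.getString(0)); // electronics
        System.out.println(scores.getInt(2));  // 78
    }
}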

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql

Lines changed: 17 additions & 0 deletions
@@ -153,4 +153,21 @@ VALUES
     '{"key1":"value1","key2":"value2"}'::jsonb,
     '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]'::jsonb,
     1
+);
+
+
+CREATE TABLE array_types (
+    id SERIAL PRIMARY KEY,
+    text_a1 TEXT[],
+    int_a1 INTEGER[]
+);
+
+ALTER TABLE inventory.array_types
+    REPLICA IDENTITY FULL;
+
+INSERT INTO array_types (id, text_a1, int_a1)
+VALUES
+(1,
+    ARRAY['electronics', 'gadget', 'sale'],
+    '{85, 90, 78}'
 );
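As an independent sanity check on the new DDL outside the CDC pipeline, a hedged JDBC sketch; the class name, JDBC URL, and credentials are placeholders, not values from this commit. PostgreSQL TEXT[] and INTEGER[] columns come back as java.sql.Array, whose getArray() yields String[] and Integer[] with the pgjdbc driver:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;

public class ReadArrayTypesSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement();
                ResultSet rs =
                        stmt.executeQuery("SELECT id, text_a1, int_a1 FROM inventory.array_types")) {
            while (rs.next()) {
                // pgjdbc materializes the SQL arrays as Java arrays.
                String[] tags = (String[]) rs.getArray("text_a1").getArray();
                Integer[] scores = (Integer[]) rs.getArray("int_a1").getArray();
                System.out.println(
                        rs.getInt("id") + " " + Arrays.toString(tags) + " " + Arrays.toString(scores));
            }
        }
    }
}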

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 61 additions & 0 deletions
@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
 import org.apache.flink.cdc.common.data.GenericMapData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
@@ -235,6 +236,15 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
                    }
                };
            case ARRAY:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) throws Exception {
+                        return convertToArray(dbzObj, schema);
+                    }
+                };
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
@@ -472,6 +482,57 @@ protected Object convertToMap(Object dbzObj, Schema schema) throws Exception {
         return new GenericMapData(convertedMap);
     }

+    protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
+        if (dbzObj == null) {
+            return null;
+        }
+
+        Schema elementSchema = schema.valueSchema();
+        DataType elementType = schemaDataTypeInference.infer(null, elementSchema);
+        DeserializationRuntimeConverter elementConverter = createConverter(elementType);
+
+        if (dbzObj instanceof java.util.List) {
+            java.util.List<?> list = (java.util.List<?>) dbzObj;
+            Object[] array = new Object[list.size()];
+
+            for (int i = 0; i < list.size(); i++) {
+                Object element = list.get(i);
+                if (element != null) {
+                    if (elementSchema.type() == Schema.Type.ARRAY) {
+                        array[i] = convertToArray(element, elementSchema);
+                    } else {
+                        array[i] = elementConverter.convert(element, elementSchema);
+                    }
+                }
+            }
+
+            return new GenericArrayData(array);
+        } else if (dbzObj.getClass().isArray()) {
+            Object[] inputArray = (Object[]) dbzObj;
+            Object[] convertedArray = new Object[inputArray.length];
+
+            for (int i = 0; i < inputArray.length; i++) {
+                if (inputArray[i] != null) {
+                    if (elementSchema.type() == Schema.Type.ARRAY) {
+                        convertedArray[i] = convertToArray(inputArray[i], elementSchema);
+                    } else {
+                        convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
+                    }
+                }
+            }
+
+            return new GenericArrayData(convertedArray);
+        } else {
+            Object[] array = new Object[1];
+            if (elementSchema.type() == Schema.Type.ARRAY) {
+                array[0] = convertToArray(dbzObj, elementSchema);
+            } else {
+                array[0] = elementConverter.convert(dbzObj, elementSchema);
+            }
+            return new GenericArrayData(array);
+        }
+    }
+
     private static DeserializationRuntimeConverter wrapIntoNullableConverter(
             DeserializationRuntimeConverter converter) {
         return new DeserializationRuntimeConverter() {
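To make the control flow of convertToArray easier to follow, a standalone mimic of its java.util.List branch (plain JDK only; the class and method names are illustrative, and it deliberately skips the per-element converter that, in the real code, turns leaf values into CDC data types):

import java.util.Arrays;
import java.util.List;

public class NestedListConversionSketch {

    // Nested lists recurse (matching the Schema.Type.ARRAY check above),
    // null elements stay null, and leaf elements are copied through unchanged.
    static Object[] toObjectArray(List<?> list) {
        Object[] out = new Object[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object element = list.get(i);
            if (element instanceof List) {
                out[i] = toObjectArray((List<?>) element);
            } else {
                out[i] = element;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> matrix = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, null));
        System.out.println(Arrays.deepToString(toObjectArray(matrix))); // [[1, 2], [3, null]]
    }
}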

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java

Lines changed: 7 additions & 1 deletion
@@ -206,7 +206,13 @@ protected DataType inferStruct(Object value, Schema schema) {
     }

     protected DataType inferArray(Object value, Schema schema) {
-        throw new UnsupportedOperationException("Unsupported type ARRAY");
+        Schema elementSchema = schema.valueSchema();
+        if (elementSchema != null) {
+            DataType elementType = infer(null, elementSchema);
+            return DataTypes.ARRAY(elementType);
+        } else {
+            return DataTypes.ARRAY(DataTypes.STRING());
+        }
     }

     protected DataType inferMap(Object value, Schema schema) {
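A short sketch of the type shapes the new inferArray branch yields (class name illustrative; only DataTypes from flink-cdc-common is used): an ARRAY schema with an INT32 element schema maps to ARRAY<INT>, and because infer recurses into the element schema, a nested array maps to ARRAY<ARRAY<INT>>:

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

public class InferredArrayTypeSketch {
    public static void main(String[] args) {
        // What inferArray produces for INTEGER[] and INTEGER[][] columns.
        DataType intArray = DataTypes.ARRAY(DataTypes.INT());
        DataType intMatrix = DataTypes.ARRAY(intArray);
        System.out.println(intArray);  // e.g. ARRAY<INT>
        System.out.println(intMatrix); // e.g. ARRAY<ARRAY<INT>>
    }
}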
