Skip to content

[FLINK-38278][postgres-pipeline] Support array-type fields. #4108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
Expand Down Expand Up @@ -263,7 +264,7 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
.tableList("inventory.time_types")
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProps)
.serverTimeZone("UTC");
.serverTimeZone("UTC+8");
configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
configFactory.slotName(slotName);
configFactory.decodingPluginName("pgoutput");
Expand Down Expand Up @@ -297,6 +298,9 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {

List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
;

Object[] xx = recordFields(snapshotRecord, COMMON_TYPES);
Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
.isEqualTo(expectedSnapshot);
}
Expand Down Expand Up @@ -703,6 +707,59 @@ public void testJsonTypes() throws Exception {
Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
}

@Test
public void testArrayTypes() throws Exception {
    initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");

    // NOTE(review): host/port are taken from POSTGIS_CONTAINER while the database
    // name comes from POSTGRES_CONTAINER — confirm this mix is intentional and not
    // a copy-paste slip from another test.
    PostgresSourceConfigFactory configFactory =
            (PostgresSourceConfigFactory)
                    new PostgresSourceConfigFactory()
                            .hostname(POSTGIS_CONTAINER.getHost())
                            .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
                            .username(TEST_USER)
                            .password(TEST_PASSWORD)
                            .databaseList(POSTGRES_CONTAINER.getDatabaseName())
                            .tableList("inventory.array_types")
                            .startupOptions(StartupOptions.initial())
                            .serverTimeZone("UTC");
    configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
    configFactory.slotName(slotName);
    configFactory.decodingPluginName("pgoutput");

    FlinkSourceProvider sourceProvider =
            (FlinkSourceProvider)
                    new PostgresDataSource(configFactory).getEventSourceProvider();

    CloseableIterator<Event> events =
            env.fromSource(
                            sourceProvider.getSource(),
                            WatermarkStrategy.noWatermarks(),
                            PostgresDataSourceFactory.IDENTIFIER,
                            new EventTypeInfo())
                    .executeAndCollect();

    List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
    RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();

    Object[] actualSnapshotObjects = recordFields(snapshotRecord, ARRAY_TYPES);

    // id column
    Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1);

    // text_a1 column: TEXT[] -> ARRAY<STRING>
    ArrayData actualTagsArray = (ArrayData) actualSnapshotObjects[1];
    Assertions.assertThat(actualTagsArray.getString(0))
            .isEqualTo(BinaryStringData.fromString("electronics"));
    Assertions.assertThat(actualTagsArray.getString(1))
            .isEqualTo(BinaryStringData.fromString("gadget"));
    Assertions.assertThat(actualTagsArray.getString(2))
            .isEqualTo(BinaryStringData.fromString("sale"));

    // int_a1 column: INTEGER[] -> ARRAY<INT>
    // (ArrayData is already imported; no need for the fully qualified name.)
    ArrayData actualScoresArray = (ArrayData) actualSnapshotObjects[2];
    Assertions.assertThat(actualScoresArray.getInt(0)).isEqualTo(85);
    Assertions.assertThat(actualScoresArray.getInt(1)).isEqualTo(90);
    Assertions.assertThat(actualScoresArray.getInt(2)).isEqualTo(78);
}

private <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
Iterator<T> iter, int size) {
List<T> result = new ArrayList<>(size);
Expand Down Expand Up @@ -838,4 +895,10 @@ private Instant toInstant(String ts) {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.BIGINT());
// Row type of inventory.array_types: id INT, text_a1 TEXT[], int_a1 INTEGER[].
// The fixture table has exactly three columns; a fourth nested-array field here
// would not correspond to any column and would break recordFields().
private static final RowType ARRAY_TYPES =
        RowType.of(
                DataTypes.INT(),
                DataTypes.ARRAY(DataTypes.STRING()),
                DataTypes.ARRAY(DataTypes.INT()));
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,21 @@ VALUES
'{"key1":"value1","key2":"value2"}'::jsonb,
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]'::jsonb,
1
);


-- Fixture table for the array-type deserialization test.
-- Schema-qualify all statements consistently (the ALTER below already uses
-- "inventory.", so CREATE/INSERT should not rely on search_path).
CREATE TABLE inventory.array_types (
    id SERIAL PRIMARY KEY,
    text_a1 TEXT[],
    int_a1 INTEGER[]
);

-- FULL replica identity so UPDATE/DELETE events carry the complete old row.
ALTER TABLE inventory.array_types
    REPLICA IDENTITY FULL;

INSERT INTO inventory.array_types (id, text_a1, int_a1)
VALUES
    (1,
     ARRAY['electronics', 'gadget', 'sale'],
     '{85, 90, 78}'
    );
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
Expand Down Expand Up @@ -235,6 +236,15 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
}
};
case ARRAY:
return new DeserializationRuntimeConverter() {

private static final long serialVersionUID = 1L;

@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
return convertToArray(dbzObj, schema);
}
};
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
Expand Down Expand Up @@ -472,6 +482,57 @@ protected Object convertToMap(Object dbzObj, Schema schema) throws Exception {
return new GenericMapData(convertedMap);
}

/**
 * Converts a Debezium ARRAY value into a {@link GenericArrayData}.
 *
 * <p>Accepts a {@code java.util.List}, any Java array (including primitive arrays
 * such as {@code int[]}), or a single scalar (wrapped into a one-element array).
 * Null elements are preserved as nulls. Nested arrays are handled by the element
 * converter: {@code createConverter} dispatches ARRAY element types back to this
 * method, so no per-element special-casing is needed here.
 *
 * @param dbzObj the Debezium value; may be {@code null}
 * @param schema the Kafka Connect ARRAY schema whose {@code valueSchema()} describes elements
 * @return a {@link GenericArrayData}, or {@code null} if {@code dbzObj} is null
 * @throws Exception if an element conversion fails
 */
protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
    if (dbzObj == null) {
        return null;
    }

    Schema elementSchema = schema.valueSchema();
    DataType elementType = schemaDataTypeInference.infer(null, elementSchema);
    DeserializationRuntimeConverter elementConverter = createConverter(elementType);

    if (dbzObj instanceof java.util.List) {
        java.util.List<?> list = (java.util.List<?>) dbzObj;
        Object[] converted = new Object[list.size()];
        for (int i = 0; i < converted.length; i++) {
            Object element = list.get(i);
            if (element != null) {
                converted[i] = elementConverter.convert(element, elementSchema);
            }
        }
        return new GenericArrayData(converted);
    } else if (dbzObj.getClass().isArray()) {
        // Use reflection so primitive arrays (int[], long[], ...) are supported;
        // a direct cast to Object[] would throw ClassCastException for them.
        int length = java.lang.reflect.Array.getLength(dbzObj);
        Object[] converted = new Object[length];
        for (int i = 0; i < length; i++) {
            Object element = java.lang.reflect.Array.get(dbzObj, i);
            if (element != null) {
                converted[i] = elementConverter.convert(element, elementSchema);
            }
        }
        return new GenericArrayData(converted);
    } else {
        // Single scalar value: wrap into a one-element array.
        return new GenericArrayData(
                new Object[] {elementConverter.convert(dbzObj, elementSchema)});
    }
}

private static DeserializationRuntimeConverter wrapIntoNullableConverter(
DeserializationRuntimeConverter converter) {
return new DeserializationRuntimeConverter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,13 @@ protected DataType inferStruct(Object value, Schema schema) {
}

/**
 * Infers the CDC {@link DataType} for an ARRAY schema from its element schema.
 * Falls back to {@code ARRAY<STRING>} when no element schema is available.
 */
protected DataType inferArray(Object value, Schema schema) {
    Schema elementSchema = schema.valueSchema();
    return elementSchema == null
            ? DataTypes.ARRAY(DataTypes.STRING())
            : DataTypes.ARRAY(infer(null, elementSchema));
}

protected DataType inferMap(Object value, Schema schema) {
Expand Down
Loading