diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java index 74d1839977d..7b38d5b2ced 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java @@ -17,8 +17,11 @@ package org.apache.flink.cdc.connectors.postgres.table; +import java.util.List; +import java.util.Objects; import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter; import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory; +import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.LogicalType; @@ -52,6 +55,8 @@ public Optional createUserDefinedConverter( switch (logicalType.getTypeRoot()) { case VARCHAR: return createStringConverter(); + case ARRAY: + return createArrayConverter(); default: // fallback to default converter return Optional.empty(); @@ -96,4 +101,36 @@ public Object convert(Object dbzObj, Schema schema) throws Exception { } }); } + + private static Optional createArrayConverter() { + return Optional.of( + new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + if (dbzObj == null) { + return null; + } + if (Schema.Type.ARRAY.equals(schema.type()) && dbzObj instanceof List && schema.valueSchema() != null) { + switch (schema.valueSchema().type()) { + case STRING: return new GenericArrayData( + ((List) dbzObj).stream().map(i -> StringData.fromString(i.toString())).toArray()); + case INT8: + case INT16: + case INT32: return new GenericArrayData( + ((List) dbzObj).stream().mapToInt(i -> ((Integer) i)).toArray()); + case INT64: return new GenericArrayData( + ((List) dbzObj).stream().mapToLong(i -> ((Long) i)).toArray()); + } + } else if (dbzObj instanceof List) { + return new GenericArrayData(((List) dbzObj).stream().filter(Objects::nonNull) + .map(i -> StringData.fromString(i.toString())) + .toArray()); + } + return new GenericArrayData(new StringData[]{StringData.fromString(dbzObj.toString())}); + } + }); + } }