diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java index f2488a27344..68e2d17eba7 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; import javax.annotation.Nullable; @@ -125,4 +126,13 @@ public org.apache.flink.table.api.DataTypes.Field toFlinkDataTypeField() { : org.apache.flink.table.api.DataTypes.FIELD( name, DataTypeUtils.toFlinkDataType(type), description); } + + public static DataField fromFlinkDataTypeField( + org.apache.flink.table.types.logical.RowType.RowField rowField) { + return DataTypes.FIELD( + rowField.getName(), + DataTypeUtils.fromFlinkDataType( + LogicalTypeDataTypeConverter.toDataType(rowField.getType())), + rowField.getDescription().orElse(null)); + } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java index 4db049baf34..663a52a5369 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java @@ -29,11 +29,16 @@ import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.util.CollectionUtil; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + /** Utilities for handling {@link DataType}s. */ public class DataTypeUtils { /** @@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ throw new IllegalArgumentException("Illegal type: " + type); } } + + /** + * Convert Flink's internal {@link org.apache.flink.table.types.DataType} to CDC's {@link + * DataType}. 
+ */ + public static DataType fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) { + LogicalType logicalType = flinkType.getLogicalType(); + List children = flinkType.getChildren(); + DataType dataType; + switch (logicalType.getTypeRoot()) { + case CHAR: + dataType = DataTypes.CHAR(getLength(logicalType)); + break; + case VARCHAR: + dataType = DataTypes.VARCHAR(getLength(logicalType)); + break; + case BOOLEAN: + dataType = DataTypes.BOOLEAN(); + break; + case BINARY: + dataType = DataTypes.BINARY(getLength(logicalType)); + break; + case VARBINARY: + dataType = DataTypes.VARBINARY(getLength(logicalType)); + break; + case DECIMAL: + dataType = DataTypes.DECIMAL(getPrecision(logicalType), getScale(logicalType)); + break; + case TINYINT: + dataType = DataTypes.TINYINT(); + break; + case SMALLINT: + dataType = DataTypes.SMALLINT(); + break; + case INTEGER: + dataType = DataTypes.INT(); + break; + case BIGINT: + dataType = DataTypes.BIGINT(); + break; + case FLOAT: + dataType = DataTypes.FLOAT(); + break; + case DOUBLE: + dataType = DataTypes.DOUBLE(); + break; + case DATE: + dataType = DataTypes.DATE(); + break; + case TIME_WITHOUT_TIME_ZONE: + dataType = DataTypes.TIME(getPrecision(logicalType)); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + dataType = DataTypes.TIMESTAMP(getPrecision(logicalType)); + break; + case TIMESTAMP_WITH_TIME_ZONE: + dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType)); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType)); + break; + case ARRAY: + Preconditions.checkState(children != null && !children.isEmpty()); + dataType = DataTypes.ARRAY(fromFlinkDataType(children.get(0))); + break; + case MAP: + Preconditions.checkState(children != null && children.size() > 1); + dataType = + DataTypes.MAP( + fromFlinkDataType(children.get(0)), + fromFlinkDataType(children.get(1))); + break; + case ROW: + Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children)); + org.apache.flink.table.types.logical.RowType rowType = + (org.apache.flink.table.types.logical.RowType) flinkType.getLogicalType(); + DataField[] fields = + rowType.getFields().stream() + .map(DataField::fromFlinkDataTypeField) + .toArray(DataField[]::new); + dataType = DataTypes.ROW(fields); + break; + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case NULL: + case MULTISET: + case DISTINCT_TYPE: + case STRUCTURED_TYPE: + case RAW: + case SYMBOL: + case UNRESOLVED: + throw new IllegalArgumentException("Unsupported type: " + flinkType); + default: + throw new IllegalArgumentException("Illegal type: " + flinkType); + } + return logicalType.isNullable() ? 
dataType : dataType.notNull(); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 06639ecaf7a..51e0fb0ed0e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -294,9 +294,9 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException event.getDroppedColumnNames() .forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column))); catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); - } catch (Catalog.TableNotExistException - | Catalog.ColumnAlreadyExistException - | Catalog.ColumnNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.ColumnNotExistException e) { + LOG.warn("Failed to apply DropColumnEvent, skip it.", e); + } catch (Catalog.ColumnAlreadyExistException e) { throw new SchemaEvolveException(event, e.getMessage(), e); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java index 36bb9fbbc5b..9d51aeec422 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java @@ -25,30 +25,42 @@ import org.apache.flink.cdc.common.data.binary.BinaryMapData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypeChecks; import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.table.Table; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; import java.nio.ByteBuffer; import java.time.ZoneId; import 
java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; -/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */ +/** + * A helper class that deduces the {@link Schema} of a Paimon table for {@link BucketAssignOperator} and creates + * FieldGetter and GenericRow instances for {@link PaimonWriter}. + */ public class PaimonWriterHelper { /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ @@ -61,6 +73,31 @@ public static List createFieldGetters(Schema schema, Zon return fieldGetters; } + /** + * Check whether the columns of the upstream schema are the same as those of the physical schema. + * + *

Note: Default value of column was ignored as it has no influence in {@link + * #createFieldGetter(DataType, int, ZoneId)}. + */ + public static Boolean sameColumnsIgnoreCommentAndDefaultValue( + Schema upstreamSchema, Schema physicalSchema) { + List upstreamColumns = upstreamSchema.getColumns(); + List physicalColumns = physicalSchema.getColumns(); + if (upstreamColumns.size() != physicalColumns.size()) { + return false; + } + for (int i = 0; i < physicalColumns.size(); i++) { + Column upstreamColumn = upstreamColumns.get(i); + Column physicalColumn = physicalColumns.get(i); + // Case sensitive. + if (!upstreamColumn.getName().equals(physicalColumn.getName()) + || !upstreamColumn.getType().equals(physicalColumn.getType())) { + return false; + } + } + return true; + } + private static RecordData.FieldGetter createFieldGetter( DataType fieldType, int fieldPos, ZoneId zoneId) { final RecordData.FieldGetter fieldGetter; @@ -215,6 +252,32 @@ public static List convertEventToFullGenericRows( return fullGenericRows; } + /** + * Deduce {@link Schema} for a {@link Table}. + * + *

Note: default value was not included in the result. + */ + public static Schema deduceSchemaForPaimonTable(Table table) { + RowType rowType = table.rowType(); + Schema.Builder builder = Schema.newBuilder(); + builder.setColumns( + rowType.getFields().stream() + .map( + column -> + Column.physicalColumn( + column.name(), + DataTypeUtils.fromFlinkDataType( + TypeConversions.fromLogicalToDataType( + LogicalTypeConversion.toLogicalType( + column.type()))), + column.description())) + .collect(Collectors.toList())); + builder.primaryKey(table.primaryKeys()); + table.comment().ifPresent(builder::comment); + builder.options(table.options()); + return builder.build(); + } + private static GenericRow convertRecordDataToGenericRow( RecordData recordData, List fieldGetters, RowKind rowKind) { GenericRow genericRow = new GenericRow(rowKind, recordData.getArity()); @@ -340,4 +403,8 @@ public InternalRow readRowData( return row; } } + + public static Identifier identifierFromTableId(TableId tableId) { + return Identifier.fromString(tableId.identifier()); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index 730e706e2bc..1e9faff3db0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -17,19 +17,24 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator; import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper; import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -75,8 +80,7 @@ public class BucketAssignOperator extends AbstractStreamOperator Map> bucketAssignerMap; - // maintain the latest schema of tableId. 
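+    // Maps each TableId to its MixedSchemaInfo: the latest upstream schema together with the
+    // schema deduced from the physical Paimon table, so the operator can detect and reconcile
+    // mismatches between the two.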
- private Map schemaMaps; + private Map schemaMaps; private int totalTasksNumber; @@ -88,6 +92,8 @@ public class BucketAssignOperator extends AbstractStreamOperator private final ZoneId zoneId; + protected SchemaDerivator schemaDerivator; + public BucketAssignOperator( Options catalogOptions, String schemaOperatorUid, ZoneId zoneId, String commitUser) { this.catalogOptions = catalogOptions; @@ -100,11 +106,17 @@ public BucketAssignOperator( @Override public void open() throws Exception { super.open(); + open(getRuntimeContext().getTaskInfo()); + } + + @VisibleForTesting + public void open(TaskInfo taskInfo) { this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); this.bucketAssignerMap = new HashMap<>(); - this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); - this.currentTaskNumber = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.totalTasksNumber = taskInfo.getNumberOfParallelSubtasks(); + this.currentTaskNumber = taskInfo.getIndexOfThisSubtask(); this.schemaMaps = new HashMap<>(); + this.schemaDerivator = new SchemaDerivator(); } @Override @@ -120,6 +132,11 @@ public void setup( toCoordinator, new OperatorIDGenerator(schemaOperatorUid).generate()); } + @VisibleForTesting + public void setSchemaEvolutionClient(SchemaEvolutionClient schemaEvolutionClient) { + this.schemaEvolutionClient = schemaEvolutionClient; + } + @Override public void processElement(StreamRecord streamRecord) throws Exception { Event event = streamRecord.getValue(); @@ -134,23 +151,8 @@ public void processElement(StreamRecord streamRecord) throws Exception { ((FlushEvent) event).getTableIds(), ((FlushEvent) event).getSchemaChangeEventType()))); } - return; - } - - if (event instanceof DataChangeEvent) { - DataChangeEvent dataChangeEvent = (DataChangeEvent) event; - if (!schemaMaps.containsKey(dataChangeEvent.tableId())) { - Optional schema = - schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId()); - if (schema.isPresent()) { - schemaMaps.put( - dataChangeEvent.tableId(), new TableSchemaInfo(schema.get(), zoneId)); - } else { - throw new RuntimeException( - "Could not find schema message from SchemaRegistry for " - + dataChangeEvent.tableId()); - } - } + } else if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = convertDataChangeEvent((DataChangeEvent) event); Tuple4 tuple4 = bucketAssignerMap.computeIfAbsent( dataChangeEvent.tableId(), this::getTableInfo); @@ -158,7 +160,10 @@ public void processElement(StreamRecord streamRecord) throws Exception { GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow( dataChangeEvent, - schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); + schemaMaps + .get(dataChangeEvent.tableId()) + .getPaimonSchemaInfo() + .getFieldGetters()); switch (tuple4.f0) { case HASH_DYNAMIC: { @@ -186,25 +191,132 @@ public void processElement(StreamRecord streamRecord) throws Exception { } } output.collect( - new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event))); - } else if (event instanceof SchemaChangeEvent) { - SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; - Schema schema = - SchemaUtils.applySchemaChangeEvent( - Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId())) - .map(TableSchemaInfo::getSchema) - .orElse(null), - schemaChangeEvent); - schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId)); + new StreamRecord<>(new BucketWrapperChangeEvent(bucket, dataChangeEvent))); + } else { // 
Broadcast SchemachangeEvent. for (int index = 0; index < totalTasksNumber; index++) { output.collect( new StreamRecord<>( - new BucketWrapperChangeEvent(index, (ChangeEvent) event))); + new BucketWrapperChangeEvent( + index, + convertSchemaChangeEvent((SchemaChangeEvent) event)))); } } } + @VisibleForTesting + public SchemaChangeEvent convertSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) + throws Exception { + if (schemaChangeEvent instanceof DropTableEvent + || schemaChangeEvent instanceof TruncateTableEvent) { + return schemaChangeEvent; + } + TableId tableId = schemaChangeEvent.tableId(); + Schema upstreamSchema; + try { + upstreamSchema = + schemaMaps.containsKey(tableId) + ? schemaMaps.get(tableId).getUpstreamSchemaInfo().getSchema() + : schemaEvolutionClient.getLatestEvolvedSchema(tableId).orElse(null); + } catch (Exception e) { + // In batch mode, we can't get schema from registry. + upstreamSchema = null; + } + if (!SchemaUtils.isSchemaChangeEventRedundant(upstreamSchema, schemaChangeEvent)) { + upstreamSchema = SchemaUtils.applySchemaChangeEvent(upstreamSchema, schemaChangeEvent); + } + Schema physicalSchema = + PaimonWriterHelper.deduceSchemaForPaimonTable( + catalog.getTable(PaimonWriterHelper.identifierFromTableId(tableId))); + MixedSchemaInfo mixedSchemaInfo = + new MixedSchemaInfo( + new TableSchemaInfo(upstreamSchema, zoneId), + new TableSchemaInfo(physicalSchema, zoneId)); + if (!mixedSchemaInfo.isSameColumnsIgnoreCommentAndDefaultValue()) { + LOGGER.warn( + "Upstream schema of {} is {}, which is different with paimon physical table schema {}.", + tableId, + upstreamSchema, + physicalSchema); + } + schemaMaps.put(tableId, mixedSchemaInfo); + return new CreateTableEvent(tableId, physicalSchema); + } + + @VisibleForTesting + public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) + throws Exception { + TableId tableId = dataChangeEvent.tableId(); + if (!schemaMaps.containsKey(dataChangeEvent.tableId())) { + Optional schema; + try { + schema = schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId()); + } catch (Exception e) { + // In batch mode, we can't get schema from registry. + schema = Optional.empty(); + } + if (schema.isPresent()) { + MixedSchemaInfo mixedSchemaInfo = + new MixedSchemaInfo( + new TableSchemaInfo(schema.get(), zoneId), + new TableSchemaInfo( + PaimonWriterHelper.deduceSchemaForPaimonTable( + catalog.getTable( + PaimonWriterHelper.identifierFromTableId( + tableId))), + zoneId)); + if (!mixedSchemaInfo.isSameColumnsIgnoreCommentAndDefaultValue()) { + LOGGER.warn( + "Upstream schema of {} is {}, which is different with paimon physical table schema {}.", + tableId, + mixedSchemaInfo.getUpstreamSchemaInfo().getSchema(), + mixedSchemaInfo.getPaimonSchemaInfo().getSchema()); + } + // Broadcast the CreateTableEvent with physical schema after job restarted. + // This is necessary because the DataSinkOperator would emit the upstream schema. 
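+                // Emit one copy per downstream subtask index so that the wrapped index routes
+                // the CreateTableEvent with the physical schema to every writer subtask, not
+                // only to the subtasks that receive data for this table.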
+ for (int index = 0; index < totalTasksNumber; index++) { + output.collect( + new StreamRecord<>( + new BucketWrapperChangeEvent( + index, + new CreateTableEvent( + tableId, + mixedSchemaInfo.paimonSchemaInfo + .getSchema())))); + } + schemaMaps.put(tableId, mixedSchemaInfo); + } else { + throw new RuntimeException( + "Could not find schema message from SchemaRegistry for " + + tableId + + ", this may because of this table was dropped."); + } + } + MixedSchemaInfo mixedSchemaInfo = schemaMaps.get(tableId); + if (!mixedSchemaInfo.isSameColumnsIgnoreCommentAndDefaultValue()) { + dataChangeEvent = + schemaDerivator + .coerceDataRecord( + zoneId.getId(), + dataChangeEvent, + mixedSchemaInfo.getUpstreamSchemaInfo().getSchema(), + mixedSchemaInfo.getPaimonSchemaInfo().getSchema()) + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "Unable to coerce data record of %s from (schema: %s) to (schema: %s)", + tableId, + mixedSchemaInfo + .getUpstreamSchemaInfo() + .getSchema(), + mixedSchemaInfo + .getPaimonSchemaInfo() + .getSchema()))); + } + return dataChangeEvent; + } + private Tuple4 getTableInfo(TableId tableId) { Preconditions.checkNotNull(tableId, "Invalid tableId in given event."); @@ -217,7 +329,7 @@ public void processElement(StreamRecord streamRecord) throws Exception { long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets(); Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets(); - + LOGGER.debug("Succeed to get table info " + table); return new Tuple4<>( table.bucketMode(), table.createRowKeyExtractor(), @@ -232,4 +344,34 @@ public void processElement(StreamRecord streamRecord) throws Exception { maxBucketsNum), new RowPartitionKeyExtractor(table.schema())); } + + /** MixedSchemaInfo is used to store the mixed schema info of upstream and paimon table. 
*/ + private static class MixedSchemaInfo { + private final TableSchemaInfo upstreamSchemaInfo; + + private final TableSchemaInfo paimonSchemaInfo; + + private final boolean sameColumnsIgnoreCommentAndDefaultValue; + + public MixedSchemaInfo( + TableSchemaInfo upstreamSchemaInfo, TableSchemaInfo paimonSchemaInfo) { + this.upstreamSchemaInfo = upstreamSchemaInfo; + this.paimonSchemaInfo = paimonSchemaInfo; + this.sameColumnsIgnoreCommentAndDefaultValue = + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + upstreamSchemaInfo.getSchema(), paimonSchemaInfo.getSchema()); + } + + public TableSchemaInfo getUpstreamSchemaInfo() { + return upstreamSchemaInfo; + } + + public TableSchemaInfo getPaimonSchemaInfo() { + return paimonSchemaInfo; + } + + public boolean isSameColumnsIgnoreCommentAndDefaultValue() { + return sameColumnsIgnoreCommentAndDefaultValue; + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index f28fd97c2c2..c44ec3851c8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobInfo; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.TaskInfoImpl; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -38,6 +39,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; @@ -61,6 +63,8 @@ import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory; import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions; import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator; +import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -86,6 +90,8 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -97,13 +103,16 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.UUID; import 
java.util.stream.Collectors; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; +import static org.apache.flink.cdc.common.types.DataTypes.INT; import static org.apache.flink.cdc.common.types.DataTypes.STRING; +import static org.apache.flink.cdc.common.types.DataTypes.VARCHAR; import static org.apache.flink.configuration.ConfigConstants.DEFAULT_PARALLELISM; /** An ITCase for {@link PaimonWriter} and {@link PaimonCommitter}. */ @@ -182,6 +191,15 @@ private List createTestEvents(boolean enableDeleteVectors) throws SchemaE private List createTestEvents( boolean enableDeleteVectors, boolean appendOnly, boolean enabledBucketKey) throws SchemaEvolveException { + return createTestEvents(enableDeleteVectors, appendOnly, enabledBucketKey, null); + } + + private List createTestEvents( + boolean enableDeleteVectors, + boolean appendOnly, + boolean enabledBucketKey, + RandomSchemaCase randomSchemaCase) + throws SchemaEvolveException { List testEvents = new ArrayList<>(); Schema.Builder builder = Schema.newBuilder(); if (!appendOnly) { @@ -192,14 +210,19 @@ private List createTestEvents( } // create table Schema schema = - builder.physicalColumn("col1", STRING()) + builder.physicalColumn("col1", STRING().notNull()) .physicalColumn("col2", STRING()) .option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors)) .build(); CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema); testEvents.add(createTableEvent); PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); - metadataApplier.applySchemaChange(createTableEvent); + if (randomSchemaCase != null) { + metadataApplier.applySchemaChange( + new CreateTableEvent(table1, generateRandomSchema(schema, randomSchemaCase))); + } else { + metadataApplier.applySchemaChange(createTableEvent); + } // insert testEvents.add( @@ -211,6 +234,37 @@ private List createTestEvents( return testEvents; } + private Schema generateRandomSchema(Schema schema, RandomSchemaCase randomSchemaCase) { + Schema.Builder builder = new Schema.Builder(); + switch (randomSchemaCase) { + case ADD_COLUMN: + { + builder.physicalColumn("col1", STRING().notNull()) + .physicalColumn("col2", STRING()) + .physicalColumn("op_ts", INT()); + break; + } + case REMOVE_COLUMN: + { + builder.physicalColumn("col1", STRING().notNull()); + break; + } + case REORDER_COLUMN: + { + builder.physicalColumn("col2", STRING()) + .physicalColumn("col1", STRING().notNull()); + break; + } + case MODIFY_COLUMN: + { + builder.physicalColumn("col1", STRING().notNull()) + .physicalColumn("col2", VARCHAR(10)); + break; + } + } + return schema.copy(builder.build().getColumns()); + } + @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) @@ -413,6 +467,220 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'"); } + enum RandomSchemaCase { + ADD_COLUMN, + REMOVE_COLUMN, + REORDER_COLUMN, + MODIFY_COLUMN; + } + + @ParameterizedTest + @EnumSource(RandomSchemaCase.class) + public void testSinkWithSchemaChangeForExistedTable(RandomSchemaCase randomSchemaCase) + throws Exception { + initialize("filesystem"); + PaimonSink paimonSink = + new PaimonSink<>( + catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + PaimonWriter writer = paimonSink.createWriter(new 
MockInitContext()); + Committer committer = paimonSink.createCommitter(); + BucketAssignOperator bucketAssignOperator = + new BucketAssignOperator(catalogOptions, null, ZoneId.systemDefault(), null); + SchemaEvolutionClient schemaEvolutionClient = Mockito.mock(SchemaEvolutionClient.class); + Mockito.when(schemaEvolutionClient.getLatestEvolvedSchema(Mockito.any())) + .thenReturn(Optional.empty()); + bucketAssignOperator.setSchemaEvolutionClient(schemaEvolutionClient); + bucketAssignOperator.open(new TaskInfoImpl("test_TaskInfo", 1, 0, 1, 0)); + + // 1. receive only DataChangeEvents during one checkpoint + writeAndCommit( + bucketAssignOperator, + writer, + committer, + createTestEvents(false, false, true, randomSchemaCase).toArray(new Event[0])); + switch (randomSchemaCase) { + case ADD_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1", null), + Row.ofKind(RowKind.INSERT, "2", "2", null)); + break; + } + case REMOVE_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1"), + Row.ofKind(RowKind.INSERT, "2")); + break; + } + case REORDER_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2")); + break; + } + case MODIFY_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2")); + } + } + + // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint + writeAndCommit( + bucketAssignOperator, + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3")))); + + // add column + AddColumnEvent.ColumnWithPosition columnWithPosition = + new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("col3", STRING())); + AddColumnEvent addColumnEvent = + new AddColumnEvent(table1, Collections.singletonList(columnWithPosition)); + PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); + metadataApplier.applySchemaChange(addColumnEvent); + writer.write(bucketAssignOperator.convertSchemaChangeEvent(addColumnEvent), null); + + writeAndCommit( + bucketAssignOperator, + writer, + committer, + generateInsert( + table1, + Arrays.asList( + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4")))); + switch (randomSchemaCase) { + case ADD_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1", null, null), + Row.ofKind(RowKind.INSERT, "2", "2", null, null), + Row.ofKind(RowKind.INSERT, "3", "3", null, null), + Row.ofKind(RowKind.INSERT, "4", "4", null, "4")); + break; + } + case REMOVE_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4")); + break; + } + case REORDER_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1", null), + Row.ofKind(RowKind.INSERT, "2", "2", null), + Row.ofKind(RowKind.INSERT, "3", "3", null), + Row.ofKind(RowKind.INSERT, "4", "4", "4")); + break; + } + case MODIFY_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + 
Row.ofKind(RowKind.INSERT, "1", "1", null), + Row.ofKind(RowKind.INSERT, "2", "2", null), + Row.ofKind(RowKind.INSERT, "3", "3", null), + Row.ofKind(RowKind.INSERT, "4", "4", "4")); + } + } + + // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint + writeAndCommit( + bucketAssignOperator, + writer, + committer, + generateInsert( + table1, + Arrays.asList( + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5")))); + + // drop column + DropColumnEvent dropColumnEvent = + new DropColumnEvent(table1, Collections.singletonList("col2")); + metadataApplier.applySchemaChange(dropColumnEvent); + writer.write(bucketAssignOperator.convertSchemaChangeEvent(dropColumnEvent), null); + + writeAndCommit( + bucketAssignOperator, + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "6"), Tuple2.of(STRING(), "6")))); + + List result = fetchResults(TableId.tableId("test", "`table1$files`")); + Set deduplicated = new HashSet<>(result); + Assertions.assertThat(result).hasSameSizeAs(deduplicated); + + switch (randomSchemaCase) { + case ADD_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null, null), + Row.ofKind(RowKind.INSERT, "2", null, null), + Row.ofKind(RowKind.INSERT, "3", null, null), + Row.ofKind(RowKind.INSERT, "4", null, "4"), + Row.ofKind(RowKind.INSERT, "5", null, "5"), + Row.ofKind(RowKind.INSERT, "6", null, "6")); + break; + } + case REMOVE_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6")); + break; + } + case REORDER_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6")); + break; + } + case MODIFY_COLUMN: + { + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6")); + } + } + } + @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector) @@ -473,6 +741,24 @@ private static void writeAndCommit( commit(writer, committer); } + private static void writeAndCommit( + BucketAssignOperator bucketAssignOperator, + PaimonWriter writer, + Committer committer, + Event... events) + throws Exception { + for (Event event : events) { + if (event instanceof DataChangeEvent) { + event = bucketAssignOperator.convertDataChangeEvent((DataChangeEvent) event); + } else { + event = bucketAssignOperator.convertSchemaChangeEvent((SchemaChangeEvent) event); + } + writer.write(event, null); + } + writer.flush(false); + commit(writer, committer); + } + private List fetchResults(TableId tableId) { List results = new ArrayList<>(); tEnv.sqlQuery("select * from paimon_catalog." 
+ tableId.toString()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java index 3e3ebdb9d34..02efe00f855 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java @@ -25,34 +25,46 @@ import org.apache.flink.cdc.common.data.binary.BinaryMapData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.NestedRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; import org.apache.paimon.types.RowKind; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.math.BigDecimal; import java.time.Instant; import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** Tests for {@link PaimonWriterHelper}. */ class PaimonWriterHelperTest { + @TempDir public static java.nio.file.Path temporaryFolder; + @Test void testConvertEventToGenericRowOfAllDataTypes() { RowType rowType = @@ -241,6 +253,133 @@ void testConvertEventToGenericRowWithNestedRow() { Assertions.assertThat(extractedMap).containsAllEntriesOf(expectedMap); } + @Test + void testSameColumnsWithOutCommentAndDefaultValue() { + Schema schema1 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING(), "col1", "default") + .physicalColumn("col2", DataTypes.STRING(), "col2", "default") + .physicalColumn("col3", DataTypes.STRING(), "col3", "default") + .primaryKey("col1") + .partitionKey("col2") + .comment("schema1") + .option("key", "value") + .build(); + Schema schema2 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build(); + Assertions.assertThat( + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + schema1, schema2)) + .isTrue(); + + // Not null. 
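+        // Nullability is part of the CDC DataType, so a NOT NULL column on only one side must
+        // make sameColumnsIgnoreCommentAndDefaultValue return false even though the column
+        // names and root types match.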
+ schema1 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING(), "col1", "default") + .physicalColumn("col2", DataTypes.STRING(), "col2", "default") + .physicalColumn("col3", DataTypes.STRING().notNull(), "col3", "default") + .primaryKey("col1") + .partitionKey("col2") + .comment("schema1") + .option("key", "value") + .build(); + schema2 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build(); + Assertions.assertThat( + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + schema1, schema2)) + .isFalse(); + + schema1 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .physicalColumn("col4", DataTypes.STRING()) + .build(); + schema2 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build(); + Assertions.assertThat( + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + schema1, schema2)) + .isFalse(); + + // Different order. + schema1 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .build(); + schema2 = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build(); + Assertions.assertThat( + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + schema1, schema2)) + .isFalse(); + } + + @Test + void testDeduceSchemaForPaimonTable() throws Catalog.TableNotExistException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); + Schema allTypeSchema = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("boolean", DataTypes.BOOLEAN()) + .physicalColumn("binary", DataTypes.BINARY(3)) + .physicalColumn("varbinary", DataTypes.VARBINARY(10)) + .physicalColumn("bytes", DataTypes.BYTES()) + .physicalColumn("tinyint", DataTypes.TINYINT()) + .physicalColumn("smallint", DataTypes.SMALLINT()) + .physicalColumn("int", DataTypes.INT()) + .physicalColumn("float", DataTypes.FLOAT()) + .physicalColumn("double", DataTypes.DOUBLE()) + .physicalColumn("decimal", DataTypes.DECIMAL(6, 3)) + .physicalColumn("char", DataTypes.CHAR(5)) + .physicalColumn("varchar", DataTypes.VARCHAR(10)) + .physicalColumn("string", DataTypes.STRING()) + .physicalColumn("date", DataTypes.DATE()) + .physicalColumn("time", DataTypes.TIME()) + .physicalColumn("time_with_precision", DataTypes.TIME(6)) + .physicalColumn("timestamp", DataTypes.TIMESTAMP()) + .physicalColumn("timestamp_with_precision", DataTypes.TIMESTAMP(3)) + .physicalColumn("timestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .physicalColumn("timestamp_ltz_with_precision", DataTypes.TIMESTAMP_LTZ(3)) + .primaryKey("col1") + .build(); + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.table1"), allTypeSchema); + metadataApplier.applySchemaChange(createTableEvent); + Catalog catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.fromString("test.table1")); + Schema deducedSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table); + Assertions.assertThat( + PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue( + deducedSchema, allTypeSchema)) + .isTrue(); + } + private static Map extractMap(InternalMap internalMap) { Map resultMap = new HashMap<>(); for (int i = 0; i < internalMap.size(); i++) {
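
Reviewer aid, not part of the patch: a minimal, hypothetical sketch of the new DataTypeUtils.fromFlinkDataType conversion added above. The class name FromFlinkDataTypeSketch, the main method, and the field names id/name are illustrative only; the expected behavior stated in the comments follows the switch in fromFlinkDataType and the per-field handling in DataField.fromFlinkDataTypeField.

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;

/** Hypothetical usage sketch for the fromFlinkDataType conversion introduced in this change. */
public class FromFlinkDataTypeSketch {

    public static void main(String[] args) {
        // Build a Flink SQL row type with one non-nullable and one nullable field.
        org.apache.flink.table.types.DataType flinkRow =
                org.apache.flink.table.api.DataTypes.ROW(
                        org.apache.flink.table.api.DataTypes.FIELD(
                                "id", org.apache.flink.table.api.DataTypes.BIGINT().notNull()),
                        org.apache.flink.table.api.DataTypes.FIELD(
                                "name", org.apache.flink.table.api.DataTypes.VARCHAR(32)));

        // Convert to the CDC type system; per-field nullability should be preserved
        // (id NOT NULL, name nullable), and the top-level row type stays nullable.
        DataType cdcRow = DataTypeUtils.fromFlinkDataType(flinkRow);
        System.out.println(cdcRow);
    }
}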