From 76a5a259184f830d211f52dd332cd205fcf670ce Mon Sep 17 00:00:00 2001 From: Junbo wang Date: Tue, 13 May 2025 15:54:53 +0800 Subject: [PATCH 1/2] [FLINK-37677][cdc-common][cdc-runtime] Sink supports skipping create table event --- .../cdc/common/sink/MetadataApplier.java | 2 +- .../schema/common/SchemaDerivator.java | 5 +- .../schema/distributed/SchemaCoordinator.java | 15 +++-- .../schema/regular/SchemaCoordinator.java | 7 ++- .../schema/regular/SchemaEvolveTest.java | 60 +++++++++++++++++++ .../RegularEventOperatorTestHarness.java | 16 +++++ 6 files changed, 98 insertions(+), 7 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java index 22424db997e..4d946f8a923 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java @@ -41,7 +41,7 @@ default MetadataApplier setAcceptedSchemaEvolutionTypes( return this; } - /** Checks if this metadata applier should this event type. */ + /** Checks if this metadata applier should accept this event type. 
*/ default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { return true; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java index a2659561e58..b42f7f03f7f 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; @@ -171,7 +172,9 @@ public static List normalizeSchemaChangeEvents( List finalSchemaChangeEvents = new ArrayList<>(); for (SchemaChangeEvent schemaChangeEvent : rewrittenSchemaChangeEvents) { - if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) { + // always accept create.table event + if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType()) + || schemaChangeEvent.getType().equals(SchemaChangeEventType.CREATE_TABLE)) { finalSchemaChangeEvents.add(schemaChangeEvent); } else { LOG.info("Ignored schema change {}.", schemaChangeEvent); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java index 8626eafbcbf..4670fa1ab29 100755 --- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java @@ -462,11 +462,18 @@ private Tuple2, List> deduceEvolvedSchemaChanges private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) { try { - metadataApplier.applySchemaChange(schemaChangeEvent); + // apply only accepted event types; others (e.g. create.table) are skipped + if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) { + metadataApplier.applySchemaChange(schemaChangeEvent); + LOG.info( + "Successfully applied schema change event {} to external system.", + schemaChangeEvent); + } else { + LOG.info( + "Skip apply schema change event {} to external system.", + schemaChangeEvent); + } schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); - LOG.info( - "Successfully applied schema change event {} to external system.", - schemaChangeEvent); return true; } catch (Throwable t) { handleUnrecoverableError( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java index 53344dc87ff..36bf1a82b45 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java @@ -434,7 +434,12 @@ private void applySchemaChange(int sourceSubTaskId) { private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) { try { - metadataApplier.applySchemaChange(schemaChangeEvent); + // apply only accepted event types; others (e.g. create.table) are skipped + if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) { + metadataApplier.applySchemaChange(schemaChangeEvent); + } else { + 
LOG.info("Skip apply schema change {}.", schemaChangeEvent); + } schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); LOG.info( "Successfully applied schema change event {} to external system.", diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index 740c77bd49d..503097da1d5 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -2561,6 +2561,66 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), } } + /** Tests lenient schema change behavior when the create.table event is excluded. */ + @Test + void testLenientSchemaEvolvesExcludeCreate() throws Exception { + TableId tableId = CUSTOMERS_TABLE_ID; + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("name", STRING.notNull()) + .physicalColumn("age", SMALLINT) + .primaryKey("id") + .build(); + + SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; + + SchemaOperator schemaOperator = + new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + RegularEventOperatorTestHarness harness = + RegularEventOperatorTestHarness.withDurationAndExcludeCreateTableBehavior( + schemaOperator, 5, Duration.ofSeconds(3), behavior); + harness.open(); + + // Test CreateTableEvent + { + List createAndInsertDataEvents = + Arrays.asList( + new CreateTableEvent(tableId, schemaV1), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 1, STRING, "Alice", SMALLINT, (short) 17)), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 3, STRING, "Carol", SMALLINT, (short) 19))); + + 
processEvent(schemaOperator, createAndInsertDataEvents); + + FlushEvent result; + result = + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE); + + Assertions.assertThat( + harness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList())) + .isEqualTo( + ListUtils.union( + Collections.singletonList(result), createAndInsertDataEvents)); + + Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); + Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1); + + harness.clearOutputRecords(); + } + } + private RecordData buildRecord(final Object... args) { List dataTypes = new ArrayList<>(); List objects = new ArrayList<>(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java index ea4de7a14e7..86eea4bdb7f 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java @@ -167,6 +167,22 @@ RegularEventOperatorTestHarness withDurationAndBehavior( Collections.emptySet()); } + public static , E extends Event> + RegularEventOperatorTestHarness withDurationAndExcludeCreateTableBehavior( + OP operator, + int numOutputs, + Duration evolveDuration, + SchemaChangeBehavior behavior) { + return new RegularEventOperatorTestHarness<>( + operator, + numOutputs, + evolveDuration, + DEFAULT_RPC_TIMEOUT, + behavior, + Arrays.stream(SchemaChangeEventTypeFamily.COLUMN).collect(Collectors.toSet()), + Arrays.stream(SchemaChangeEventTypeFamily.TABLE).collect(Collectors.toSet())); + } + public static , E extends Event> RegularEventOperatorTestHarness 
withDurationAndFineGrainedBehavior( OP operator, From 26e672e423582846657d21efd0c42194f47cd897 Mon Sep 17 00:00:00 2001 From: Junbo wang Date: Tue, 13 May 2025 17:08:13 +0800 Subject: [PATCH 2/2] fix spotless-check --- .../operators/schema/distributed/SchemaCoordinator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java index 4670fa1ab29..52251a680bd 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java @@ -470,8 +470,7 @@ private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChange schemaChangeEvent); } else { LOG.info( - "Skip apply schema change event {} to external system.", - schemaChangeEvent); + "Skip apply schema change event {} to external system.", schemaChangeEvent); } schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); return true;