Skip to content

[FLINK-37677][cdc-common][cdc-runtime] Handle Exclusion of create.table Events in Flink CDC Schema Evolution #4015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

beryllw
Copy link
Contributor

@beryllw beryllw commented May 13, 2025

Description

This PR addresses the issue identified in FLINK-37677, where setting exclude.schema.changes: [create.table] in the YAML configuration causes Flink CDC to throw an IllegalStateException.

java.lang.IllegalStateException: Unable to coerce data record from my_company.my_branch.customers (schema: columns={`id` INT,`name` STRING NOT NULL,`age` SMALLINT}, primaryKeys=id, options=()) to my_company.my_branch.customers (schema: null)

	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:215)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:212)
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:150)
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest.processEvent(SchemaEvolveTest.java:2643)
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest.testLenientSchemaEvolvesExcludeCreate(SchemaEvolveTest.java:2600)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at java.util.ArrayList.forEach(ArrayList.java:1259)

Motivation

  1. Platform-managed data lake/Starrocks users often lack permission to create tables directly.(Casual table creation leads to complex follow-up handling. The platform will control table creation permissions but will allow schema changes.)
  2. Users may need to customize table properties.
  3. If a created table doesn't meet requirements, users must delete it and repeatedly restart the task.

Solution

  • Always accept create.table event
  • Skip apply create.table schema change event to external system

Testing

Added unit tests.

@beryllw
Copy link
Contributor Author

beryllw commented May 13, 2025

@yuxiqian cc

@beryllw
Copy link
Contributor Author

beryllw commented May 13, 2025

@lvyanquan lvyanquan self-assigned this May 26, 2025
@lvyanquan lvyanquan added the 3.5 label May 27, 2025
Copy link
Member

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @beryllw's contribution, change looks reasonable.

One minor concern is if we allow skipping CreateTableEvents, we can't ensure the schemas of external tables are consistent with SchemaRegistry. Jobs may fail or write columns in wrong order.

We may need to handle this later, and let users to ensure schema consistency for now.

Comment on lines +2581 to +2582
RegularEventOperatorTestHarness.withDurationAndExcludeCreateTableBehavior(
schemaOperator, 5, Duration.ofSeconds(3), behavior);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we reuse RegularEventOperatorTestHarness#withDurationAndFineGrainedBehaviorWithError?

Comment on lines +437 to 445
// filter create.table schema change event
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) {
metadataApplier.applySchemaChange(schemaChangeEvent);
} else {
LOG.info("Skip apply schema change {}.", schemaChangeEvent);
}
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
LOG.info(
"Successfully applied schema change event {} to external system.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this log in the true branch since no changes will be applied if skipped

@beryllw
Copy link
Contributor Author

beryllw commented Aug 15, 2025

One minor concern is if we allow skipping CreateTableEvents, we can't ensure the schemas of external tables are consistent with SchemaRegistry. Jobs may fail or write columns in wrong order.

Thanks for pointing out this concern, @yuxiqian .

Regarding ensuring the schemas of external tables stay consistent with SchemaRegistry, I think we could extend the schema compatibility mechanism (similar to what was done in PR #4081) in future.

Perhaps we can hold off on finalizing this PR until the schema compatibility handling is further improved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants