-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-37677][cdc-common][cdc-runtime] Handle Exclusion of create.table
Events in Flink CDC Schema Evolution
#4015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@yuxiqian cc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @beryllw's contribution, change looks reasonable.
One minor concern is if we allow skipping CreateTableEvents, we can't ensure the schemas of external tables are consistent with SchemaRegistry. Jobs may fail or write columns in wrong order.
We may need to handle this later, and let users to ensure schema consistency for now.
RegularEventOperatorTestHarness.withDurationAndExcludeCreateTableBehavior( | ||
schemaOperator, 5, Duration.ofSeconds(3), behavior); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we reuse RegularEventOperatorTestHarness#withDurationAndFineGrainedBehaviorWithError
?
// filter create.table schema change event | ||
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) { | ||
metadataApplier.applySchemaChange(schemaChangeEvent); | ||
} else { | ||
LOG.info("Skip apply schema change {}.", schemaChangeEvent); | ||
} | ||
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); | ||
LOG.info( | ||
"Successfully applied schema change event {} to external system.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this log in the true
branch since no changes will be applied if skipped
Thanks for pointing out this concern, @yuxiqian . Regarding ensuring the schemas of external tables stay consistent with SchemaRegistry, I think we could extend the schema compatibility mechanism (similar to what was done in PR #4081) in future. Perhaps we can hold off on finalizing this PR until the schema compatibility handling is further improved. |
Description
This PR addresses the issue identified in FLINK-37677, where setting
exclude.schema.changes: [create.table]
in the YAML configuration causes Flink CDC to throw anIllegalStateException
.Motivation
Solution
Testing
Added unit tests.