Skip to content

[FLINK-38249] Add support for skipping specific Debezium operations in MySQL CDC source. #4098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,9 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
public MySqlSource<T> build() {
return new MySqlSource<>(configFactory, checkNotNull(deserializer));
}

public MySqlSourceBuilder<T> setDebeziumSkippedOperations(String skippedOperation) {
this.configFactory.skippedOperations(skippedOperation);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private List<String> tableList;
private String excludeTableList;
private String serverTimeZone = ZoneId.systemDefault().getId();
private String debeziumSkippedOperations;
private StartupOptions startupOptions = StartupOptions.initial();
private int splitSize = MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue();
private int splitMetaGroupSize = MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue();
Expand Down Expand Up @@ -112,6 +113,11 @@ public MySqlSourceConfigFactory excludeTableList(String tableInclusions) {
return this;
}

public MySqlSourceConfigFactory skippedOperations(String debeziumSkippedOperations) {
this.debeziumSkippedOperations = debeziumSkippedOperations;
return this;
}

/** Name of the MySQL database to use when connecting to the MySQL database server. */
public MySqlSourceConfigFactory username(String username) {
this.username = username;
Expand Down Expand Up @@ -382,6 +388,9 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
if (debeziumSkippedOperations != null) {
props.put("skipped.operations", debeziumSkippedOperations);
}

// override the user-defined debezium properties
if (dbzProperties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public class MySqlSourceOptions {
.withDescription(
"The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");

public static final ConfigOption<String> DEBEZIUM_SKIPPED_OPERATIONS =
ConfigOptions.key("debezium.skipped.operations")
.stringType()
.noDefaultValue()
.withDescription(
"The comma-separated list of operations to skip during streaming, "
+ "defined as: 'c' for inserts/create; 'u' for updates; 'd' for deletes, "
+ "'t' for truncates, By default, no operations will be skipped.");

public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
ConfigOptions.key("scan.snapshot.fetch.size")
.intType()
Expand Down