
[FLINK-34864][cdc-connector-mysql] Add the IgnoreNoPrimaryKeyTable parameter to skip tables without primary keys in multi-table synchronization #4027

Open
wants to merge 7 commits into base: master
13 changes: 11 additions & 2 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -447,6 +447,13 @@ Flink SQL> SELECT * FROM orders;
如果设置 'use.legacy.json.format' = 'false', 这条数据会被转换为 {"key1": "value1", "key2": "value2"}, 也就是 key 和 value 前的空格都会被保留。
</td>
</tr>
<tr>
<td>ignore-no-primary-key-table</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否跳过没有主键的表。如果设置为 true,连接器将跳过没有主键的表。</td>
</tr>
</tbody>
</table>
</div>
@@ -644,8 +651,8 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业

在执行增量快照读取时,MySQL CDC source 需要一个用于分片的算法。
MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。默认情况下,MySQL CDC source 会识别表的主键列,并使用主键中的第一列作为分片列。
如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`
否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column` 作为分块键,或者
将 `ignore-no-primary-key-table` 参数设置为 `true` 以跳过没有主键的表;否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。

对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。
@@ -868,6 +875,8 @@ $ ./bin/flink run \
由于**处理顺序**无法保证,最终 `id=0` 的 `pid` 可能为 `2` 或 `4`,从而导致数据不一致。


从 3.5.0 版本开始,MySQL CDC 提供了忽略无主键表的选项。
当 `ignore-no-primary-key-table` 设置为 `true` 时,连接器将跳过没有主键的表。

### 可用的指标

指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
12 changes: 11 additions & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -461,6 +461,13 @@ During a snapshot operation, the connector will query each included table to pro
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>ignore-no-primary-key-table</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to skip tables without primary keys. If set to true, the connector will skip tables that don't have a primary key.</td>
</tr>
</tbody>
</table>
</div>
@@ -666,7 +673,7 @@ Flink performs checkpoints for the source periodically, in case of failover, the

When performing incremental snapshot reading, the MySQL CDC source needs a criterion for splitting the table.
The MySQL CDC source uses a splitting column to divide the table into multiple splits (chunks). By default, it identifies the primary key column of the table and uses the first column of the primary key as the splitting column.
If there is no primary key in the table, user must specify `scan.incremental.snapshot.chunk.key-column`,
If there is no primary key in the table, users must specify `scan.incremental.snapshot.chunk.key-column` as the chunk key, or set `ignore-no-primary-key-table` to `true` to skip tables without primary keys;
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column not in primary key as a chunk key can result in slower table query performance.
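The selection logic described above can be sketched as a small helper. This is an illustrative simplification, not the connector's actual `ChunkUtils` API: a user-configured chunk key column takes precedence, the first primary-key column is the default, and when neither exists the caller decides, based on `ignore-no-primary-key-table`, whether to skip the table or fail.

```java
import java.util.List;

public class ChunkKeyResolver {
    /**
     * Simplified chunk-key selection: prefer the user-configured
     * scan.incremental.snapshot.chunk.key-column, fall back to the first
     * primary-key column, and return null when neither is available so the
     * caller can either skip the table (ignore-no-primary-key-table = true)
     * or fail incremental snapshot reading.
     */
    public static String resolveChunkKey(List<String> primaryKeys, String configuredChunkKey) {
        if (configuredChunkKey != null && !configuredChunkKey.isEmpty()) {
            return configuredChunkKey;
        }
        if (!primaryKeys.isEmpty()) {
            return primaryKeys.get(0);
        }
        return null; // no primary key and no configured chunk key
    }
}
```

A null result corresponds to the case this PR addresses: previously it always caused a failure, while with the new option it can mean "skip this table".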

@@ -892,6 +899,9 @@ Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-c

Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies.

Starting from version 3.5.0, MySQL CDC provides an option to ignore tables without primary keys.
When `ignore-no-primary-key-table` is set to `true`, the connector will skip tables that don't have a primary key.
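Since the same check is applied in the snapshot splitter, the create-table event generation, and the binlog emission path, the option's behavior reduces to a single predicate. A minimal, self-contained sketch (class and method names here are illustrative, not the connector's actual API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class NoPrimaryKeyFilter {
    /** Mirrors the option's check: skip only when it is enabled AND the table has no primary key. */
    public static boolean shouldSkip(List<String> primaryKeys, boolean ignoreNoPrimaryKeyTable) {
        return ignoreNoPrimaryKeyTable && primaryKeys.isEmpty();
    }

    /** Filters a (table name -> primary-key columns) map down to the tables that will be captured. */
    public static List<String> capturedTables(
            Map<String, List<String>> tables, boolean ignoreNoPrimaryKeyTable) {
        List<String> captured = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : tables.entrySet()) {
            if (!shouldSkip(e.getValue(), ignoreNoPrimaryKeyTable)) {
                captured.add(e.getKey());
            }
        }
        return captured;
    }
}
```

Note that with the option disabled (the default), tables without primary keys are still captured, preserving the existing failure behavior when no chunk key column is configured.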

### About converting binary type data to base64 encoded data

```sql
@@ -71,6 +71,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.IGNORE_NO_PRIMARY_KEY_TABLE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
@@ -168,6 +169,7 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean ignoreNoPrimaryKeyTable = config.get(IGNORE_NO_PRIMARY_KEY_TABLE);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -221,6 +223,7 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.ignoreNoPrimaryKeyTable(ignoreNoPrimaryKeyTable)
.skipSnapshotBackfill(skipSnapshotBackfill);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -359,6 +362,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(IGNORE_NO_PRIMARY_KEY_TABLE);
return options;
}

@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

@Experimental
public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
ConfigOptions.key("ignore-no-primary-key-table")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore tables without a primary key in MySQL. When enabled, the connector will skip tables that do not have a primary key.");
}
@@ -132,6 +132,13 @@ protected void processElement(
private void sendCreateTableEvent(
JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
Schema schema = getSchema(jdbc, tableId);
// Skip the table if it has no primary key and ignore-no-primary-key-table is enabled
if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.",
tableId);
return;
}
output.collect(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
@@ -263,6 +270,13 @@ private Map<TableId, CreateTableEvent> generateCreateTableEvent(
jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
// Skip tables without primary keys if ignore-no-primary-key-table is enabled
if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.",
tableId);
continue;
}
createTableEventCache.put(
tableId,
new CreateTableEvent(
@@ -249,6 +249,18 @@ private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceR
private boolean shouldEmit(SourceRecord sourceRecord) {
if (RecordUtils.isDataChangeRecord(sourceRecord)) {
TableId tableId = RecordUtils.getTableId(sourceRecord);
// Skip events for tables without primary keys if ignore-no-primary-key-table is enabled
if (statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable()
&& statefulTaskContext
.getDatabaseSchema()
.tableFor(tableId)
.primaryKeyColumns()
.isEmpty()) {
LOG.warn(
"Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping binlog event.",
tableId);
return false;
}
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
@@ -267,7 +279,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(),
statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable());

Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
@@ -114,6 +114,13 @@ public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId ta
throws Exception {
if (!hasNextChunk()) {
analyzeTable(partition, tableId);
// Skip tables without primary key
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
tableId);
return Collections.emptyList();
}
Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
trySplitAllEvenlySizedChunks(partition, tableId);
if (evenlySplitChunks.isPresent()) {
@@ -133,6 +140,13 @@ public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId ta
"Can not split a new table before the previous table splitting finish.");
if (currentSplittingTable == null) {
analyzeTable(partition, currentSplittingTableId);
// Skip tables without primary key
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
currentSplittingTableId);
return Collections.emptyList();
}
}
synchronized (lock) {
return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
@@ -145,12 +159,22 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
try {
currentSplittingTable =
mySqlSchema.getTableSchema(partition, jdbcConnection, tableId).getTable();
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitColumn = getChunkKeyColumn(currentSplittingTable);
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
tableId);
currentSplittingTableId = null;
nextChunkStart = null;
nextChunkId = null;
return;
}
splitType =
ChunkUtils.getChunkKeyColumnType(
splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
currentSplittingTable,
sourceConfig.getChunkKeyColumns(),
sourceConfig.isTreatTinyInt1AsBoolean(),
sourceConfig.isIgnoreNoPrimaryKeyTable());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
@@ -479,4 +503,9 @@ public void close() throws Exception {
}
mySqlSchema.close();
}

private Column getChunkKeyColumn(Table table) {
return ChunkUtils.getChunkKeyColumn(
table, sourceConfig.getChunkKeyColumns(), sourceConfig.isIgnoreNoPrimaryKeyTable());
}
}
@@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
private final boolean ignoreNoPrimaryKeyTable;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean ignoreNoPrimaryKeyTable) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
}

public String getHostname() {
@@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}

public boolean isIgnoreNoPrimaryKeyTable() {
return ignoreNoPrimaryKeyTable;
}
}
@@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
private boolean ignoreNoPrimaryKeyTable = false;

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -280,6 +281,15 @@ public MySqlSourceConfigFactory skipSnapshotBackfill(boolean skipSnapshotBackfil
return this;
}

/**
* Whether to ignore tables without primary key. When enabled, the connector will skip tables
* that don't have a primary key.
*/
public MySqlSourceConfigFactory ignoreNoPrimaryKeyTable(boolean ignoreNoPrimaryKeyTable) {
this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
return this;
}

/**
* Whether to use legacy json format. The default value is true, which means there is no
* whitespace before value and after comma in json format.
@@ -421,6 +431,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
ignoreNoPrimaryKeyTable);
}
}
@@ -293,4 +293,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");

@Experimental
public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
ConfigOptions.key("ignore-no-primary-key-table")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore tables without a primary key in MySQL. When enabled, the connector will skip tables that do not have a primary key.");
}