Skip to content

Commit b89d84f

Browse files
authored
[FLINK-37566][source-connector/mysql] Avoid processing binlog event of excluded tables
This closes #3976.
1 parent e05f797 commit b89d84f

File tree

4 files changed

+310
-7
lines changed

4 files changed

+310
-7
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,149 @@ void testInitialStartupMode() throws Exception {
255255
.isEqualTo(expectedBinlog);
256256
}
257257

258+
@Test
259+
void testExcludeTables() throws Exception {
260+
inventoryDatabase.createAndInitialize();
261+
String databaseName = inventoryDatabase.getDatabaseName();
262+
MySqlSourceConfigFactory configFactory =
263+
new MySqlSourceConfigFactory()
264+
.hostname(MYSQL8_CONTAINER.getHost())
265+
.port(MYSQL8_CONTAINER.getDatabasePort())
266+
.username(TEST_USER)
267+
.password(TEST_PASSWORD)
268+
.databaseList(databaseName)
269+
.tableList(databaseName + ".*")
270+
.excludeTableList(
271+
String.format(
272+
"%s.customers, %s.orders, %s.multi_max_table",
273+
databaseName, databaseName, databaseName))
274+
.startupOptions(StartupOptions.initial())
275+
.serverId(getServerId(env.getParallelism()))
276+
.serverTimeZone("UTC")
277+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
278+
279+
FlinkSourceProvider sourceProvider =
280+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
281+
CloseableIterator<Event> events =
282+
env.fromSource(
283+
sourceProvider.getSource(),
284+
WatermarkStrategy.noWatermarks(),
285+
MySqlDataSourceFactory.IDENTIFIER,
286+
new EventTypeInfo())
287+
.executeAndCollect();
288+
Thread.sleep(10_000);
289+
290+
TableId tableId = TableId.tableId(databaseName, "products");
291+
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
292+
293+
// generate snapshot data
294+
List<Event> expectedSnapshot = getSnapshotExpected(tableId);
295+
296+
List<Event> expectedBinlog = new ArrayList<>();
297+
try (Connection connection = inventoryDatabase.getJdbcConnection();
298+
Statement statement = connection.createStatement()) {
299+
expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement));
300+
301+
RowType rowType =
302+
RowType.of(
303+
new DataType[] {
304+
DataTypes.INT().notNull(),
305+
DataTypes.VARCHAR(255).notNull(),
306+
DataTypes.FLOAT(),
307+
DataTypes.VARCHAR(45),
308+
DataTypes.VARCHAR(55)
309+
},
310+
new String[] {"id", "name", "weight", "col1", "col2"});
311+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
312+
// insert more data
313+
statement.execute(
314+
String.format(
315+
"INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');",
316+
databaseName)); // 110
317+
expectedBinlog.add(
318+
DataChangeEvent.insertEvent(
319+
tableId,
320+
generator.generate(
321+
new Object[] {
322+
110,
323+
BinaryStringData.fromString("scooter"),
324+
5.5f,
325+
BinaryStringData.fromString("c-10"),
326+
BinaryStringData.fromString("c-20")
327+
})));
328+
statement.execute(
329+
String.format(
330+
"INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');",
331+
databaseName)); // 111
332+
expectedBinlog.add(
333+
DataChangeEvent.insertEvent(
334+
tableId,
335+
generator.generate(
336+
new Object[] {
337+
111,
338+
BinaryStringData.fromString("football"),
339+
6.6f,
340+
BinaryStringData.fromString("c-11"),
341+
BinaryStringData.fromString("c-21")
342+
})));
343+
statement.execute(
344+
String.format(
345+
"UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;",
346+
databaseName));
347+
expectedBinlog.add(
348+
DataChangeEvent.updateEvent(
349+
tableId,
350+
generator.generate(
351+
new Object[] {
352+
110,
353+
BinaryStringData.fromString("scooter"),
354+
5.5f,
355+
BinaryStringData.fromString("c-10"),
356+
BinaryStringData.fromString("c-20")
357+
}),
358+
generator.generate(
359+
new Object[] {
360+
110,
361+
BinaryStringData.fromString("scooter"),
362+
5.5f,
363+
BinaryStringData.fromString("c-12"),
364+
BinaryStringData.fromString("c-22")
365+
})));
366+
statement.execute(
367+
String.format("DELETE FROM `%s`.`products` WHERE `id` = 111;", databaseName));
368+
expectedBinlog.add(
369+
DataChangeEvent.deleteEvent(
370+
tableId,
371+
generator.generate(
372+
new Object[] {
373+
111,
374+
BinaryStringData.fromString("football"),
375+
6.6f,
376+
BinaryStringData.fromString("c-11"),
377+
BinaryStringData.fromString("c-21")
378+
})));
379+
// Make some changes to the excluded tables.
380+
statement.execute(
381+
String.format(
382+
"INSERT INTO `%s`.`customers` VALUES(1,\"Anne\",\"Kretchmar\",\"mark@noanswer.org\");",
383+
databaseName));
384+
statement.execute(
385+
String.format(
386+
"INSERT INTO `%s`.`customers` VALUES(2,\"Anne\",\"Kretchmar\",\"mark2@noanswer.org\");",
387+
databaseName));
388+
}
389+
// In this configuration, several subtasks might emit their corresponding CreateTableEvent
390+
// to downstream. Since it is not possible to predict how many CreateTableEvents we should
391+
// expect, we simply filter them out from expected sets, and assert there's at least one.
392+
List<Event> actual =
393+
fetchResultsExcept(
394+
events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent);
395+
assertThat(actual.subList(0, expectedSnapshot.size()))
396+
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
397+
assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
398+
.isEqualTo(expectedBinlog);
399+
}
400+
258401
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
259402
List<T> result = new ArrayList<>(size);
260403
List<T> sideResults = new ArrayList<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.relational;
7+
8+
import io.debezium.config.Configuration;
9+
import io.debezium.relational.Selectors.TableIdToStringMapper;
10+
import io.debezium.relational.Selectors.TableSelectionPredicateBuilder;
11+
import io.debezium.relational.Tables.TableFilter;
12+
import io.debezium.relational.history.DatabaseHistory;
13+
import io.debezium.schema.DataCollectionFilters;
14+
15+
import java.util.function.Predicate;
16+
17+
import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST;
18+
import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST;
19+
20+
/**
21+
* Copied from Debezium 1.9.8.Final.
22+
*
23+
* <p>Line 146: add a method to update the tableFilter variable.
24+
*/
25+
public class RelationalTableFilters implements DataCollectionFilters {
26+
27+
// Filter that filters tables based only on database/schema/system table filters but not table
28+
// filters
29+
// Represents the list of tables whose schema needs to be captured
30+
private final TableFilter eligibleTableFilter;
31+
// Filter that filters tables based on table filters
32+
private TableFilter tableFilter;
33+
private final Predicate<String> databaseFilter;
34+
private final String excludeColumns;
35+
36+
/**
37+
* Evaluate whether the table is eligible for schema snapshotting or not. This closely relates
38+
* to whether only the captured tables' schema should be stored in database history or all
39+
* tables schema.
40+
*/
41+
private final TableFilter schemaSnapshotFilter;
42+
43+
public RelationalTableFilters(
44+
Configuration config,
45+
TableFilter systemTablesFilter,
46+
TableIdToStringMapper tableIdMapper) {
47+
// Define the filter that provides the list of tables that could be captured if configured
48+
final TableSelectionPredicateBuilder eligibleTables =
49+
Selectors.tableSelector()
50+
.includeDatabases(
51+
config.getFallbackStringProperty(
52+
RelationalDatabaseConnectorConfig.DATABASE_INCLUDE_LIST,
53+
RelationalDatabaseConnectorConfig.DATABASE_WHITELIST))
54+
.excludeDatabases(
55+
config.getFallbackStringProperty(
56+
RelationalDatabaseConnectorConfig.DATABASE_EXCLUDE_LIST,
57+
RelationalDatabaseConnectorConfig.DATABASE_BLACKLIST))
58+
.includeSchemas(
59+
config.getFallbackStringProperty(
60+
RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST,
61+
RelationalDatabaseConnectorConfig.SCHEMA_WHITELIST))
62+
.excludeSchemas(
63+
config.getFallbackStringProperty(
64+
RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST,
65+
RelationalDatabaseConnectorConfig.SCHEMA_BLACKLIST));
66+
final Predicate<TableId> eligibleTablePredicate = eligibleTables.build();
67+
68+
Predicate<TableId> finalEligibleTablePredicate =
69+
config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
70+
? eligibleTablePredicate.and(systemTablesFilter::isIncluded)
71+
: eligibleTablePredicate;
72+
73+
this.eligibleTableFilter = finalEligibleTablePredicate::test;
74+
75+
// Define the filter using the include and exclude lists for tables and database names ...
76+
Predicate<TableId> tablePredicate =
77+
eligibleTables
78+
.includeTables(
79+
config.getFallbackStringProperty(
80+
RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST,
81+
RelationalDatabaseConnectorConfig.TABLE_WHITELIST),
82+
tableIdMapper)
83+
.excludeTables(
84+
config.getFallbackStringProperty(
85+
RelationalDatabaseConnectorConfig.TABLE_EXCLUDE_LIST,
86+
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST),
87+
tableIdMapper)
88+
.build();
89+
90+
Predicate<TableId> finalTablePredicate =
91+
config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
92+
? tablePredicate.and(systemTablesFilter::isIncluded)
93+
: tablePredicate;
94+
95+
this.tableFilter = finalTablePredicate::test;
96+
97+
// Define the database filter using the include and exclude lists for database names ...
98+
this.databaseFilter =
99+
Selectors.databaseSelector()
100+
.includeDatabases(
101+
config.getFallbackStringProperty(
102+
RelationalDatabaseConnectorConfig.DATABASE_INCLUDE_LIST,
103+
RelationalDatabaseConnectorConfig.DATABASE_WHITELIST))
104+
.excludeDatabases(
105+
config.getFallbackStringProperty(
106+
RelationalDatabaseConnectorConfig.DATABASE_EXCLUDE_LIST,
107+
RelationalDatabaseConnectorConfig.DATABASE_BLACKLIST))
108+
.build();
109+
110+
Predicate<TableId> eligibleSchemaPredicate =
111+
config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
112+
? systemTablesFilter::isIncluded
113+
: x -> true;
114+
115+
this.schemaSnapshotFilter =
116+
config.getBoolean(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL)
117+
? eligibleSchemaPredicate.and(tableFilter::isIncluded)::test
118+
: eligibleSchemaPredicate::test;
119+
120+
this.excludeColumns =
121+
config.getFallbackStringProperty(COLUMN_EXCLUDE_LIST, COLUMN_BLACKLIST);
122+
}
123+
124+
@Override
125+
public TableFilter dataCollectionFilter() {
126+
return tableFilter;
127+
}
128+
129+
public TableFilter eligibleDataCollectionFilter() {
130+
return eligibleTableFilter;
131+
}
132+
133+
public TableFilter eligibleForSchemaDataCollectionFilter() {
134+
return schemaSnapshotFilter;
135+
}
136+
137+
public Predicate<String> databaseFilter() {
138+
return databaseFilter;
139+
}
140+
141+
public String getExcludeColumns() {
142+
return excludeColumns;
143+
}
144+
145+
public void setDataCollectionFilters(TableFilter tableFilter) {
146+
this.tableFilter = tableFilter;
147+
}
148+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/DebeziumOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public static Properties getDebeziumProperties(Map<String, String> properties) {
3838
debeziumProperties.put(subKey, value);
3939
});
4040
}
41+
if (properties.containsKey("table.include.list")
42+
|| properties.containsKey("table.exclude.list")) {
43+
throw new IllegalArgumentException(
44+
"table.include.list and table.exclude.list are not supported to set manually, please remove these options.");
45+
}
4146
return debeziumProperties;
4247
}
4348

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.debezium.connector.mysql.MySqlConnectorConfig;
2727
import io.debezium.relational.RelationalTableFilters;
2828
import io.debezium.relational.TableId;
29+
import io.debezium.relational.Tables;
2930

3031
import javax.annotation.Nullable;
3132

@@ -132,6 +133,18 @@ public class MySqlSourceConfig implements Serializable {
132133
this.dbzProperties = checkNotNull(dbzProperties);
133134
this.dbzConfiguration = Configuration.from(dbzProperties);
134135
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
136+
Selectors excludeTableFilter =
137+
(excludeTableList == null
138+
? null
139+
: new Selectors.SelectorsBuilder().includeTables(excludeTableList).build());
140+
Tables.TableFilter tableFilter = dbzMySqlConfig.getTableFilters().dataCollectionFilter();
141+
dbzMySqlConfig
142+
.getTableFilters()
143+
.setDataCollectionFilters(
144+
(TableId tableId) ->
145+
tableFilter.isIncluded(tableId)
146+
&& (excludeTableFilter == null
147+
|| !excludeTableFilter.isMatch(tableId)));
135148
this.jdbcProperties = jdbcProperties;
136149
this.chunkKeyColumns = chunkKeyColumns;
137150
this.skipSnapshotBackfill = skipSnapshotBackfill;
@@ -254,13 +267,7 @@ public Predicate<String> getDatabaseFilter() {
254267

255268
public Predicate<TableId> getTableFilter() {
256269
RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters();
257-
Selectors excludeTableFilter =
258-
(excludeTableList == null
259-
? null
260-
: new Selectors.SelectorsBuilder().includeTables(excludeTableList).build());
261-
return (TableId tableId) ->
262-
tableFilters.dataCollectionFilter().isIncluded(tableId)
263-
&& (excludeTableFilter == null || !excludeTableFilter.isMatch(tableId));
270+
return tableId -> tableFilters.dataCollectionFilter().isIncluded(tableId);
264271
}
265272

266273
public Properties getJdbcProperties() {

0 commit comments

Comments
 (0)