From 49a0a097cffe048635558eed30c204a419e8ea19 Mon Sep 17 00:00:00 2001 From: ConradJam Date: Thu, 7 Aug 2025 20:38:52 +0800 Subject: [PATCH] MySqlDatabaseSchema include gh-ost ddl in HistorizedRelationalDatabaseSchema --- .../debezium/relational/ddl/DdlChanges.java | 339 ++++++++++ .../connector/mysql/MySqlDatabaseSchema.java | 579 ++++++++++++++++++ .../mysql/debezium/DebeziumUtils.java | 7 +- .../task/context/StatefulTaskContext.java | 5 +- .../connectors/mysql/schema/MySqlSchema.java | 5 +- 5 files changed, 931 insertions(+), 4 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/ddl/DdlChanges.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/ddl/DdlChanges.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/ddl/DdlChanges.java new file mode 100644 index 00000000000..3b8df041897 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/ddl/DdlChanges.java @@ -0,0 +1,339 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.ddl; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables.TableFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * Copied from Debezium project. A {@link DdlParserListener} that accumulates changes, allowing them + * to be consumed in the same order by database. + * + * @author Randall Hauch + */ +@NotThreadSafe +public class DdlChanges implements DdlParserListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(DdlChanges.class); + private final String terminator; + private final List events = new ArrayList<>(); + private final Set databaseNames = new HashSet<>(); + private static final Pattern GHOST_TABLE_PATTERN = Pattern.compile("^_(.*)_(del|gho|ghc)$"); + + /** Create a new changes object with ';' as the terminator token. */ + public DdlChanges() { + this(null); + } + + /** + * Create a new changes object with the designated terminator token. + * + * @param terminator the token used to terminate each statement; may be null + */ + public DdlChanges(String terminator) { + this.terminator = terminator != null ? terminator : ";"; + } + + /** + * Clear all accumulated changes. + * + * @return this object for method chaining; never null + */ + public DdlChanges reset() { + events.clear(); + databaseNames.clear(); + return this; + } + + @Override + public void handle(Event event) { + events.add(event); + databaseNames.add(getDatabase(event)); + } + + /** + * Consume the events in the same order they were {@link + * #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by + * database name. Multiple sequential statements that were applied to the same database are + * grouped together. 
+ * + * @param consumer the consumer + */ + public void groupStatementStringsByDatabase(DatabaseStatementStringConsumer consumer) { + groupEventsByDatabase( + (DatabaseEventConsumer) + (dbName, eventList) -> { + final StringBuilder statements = new StringBuilder(); + final Set tables = new HashSet<>(); + eventList.forEach( + event -> { + statements.append(event.statement()); + statements.append(terminator); + addTable(tables, event); + }); + consumer.consume(dbName, tables, statements.toString()); + }); + } + + private void addTable(final Set tables, Event event) { + if (event instanceof TableEvent) { + tables.add(((TableEvent) event).tableId()); + } + } + + /** + * Consume the events in the same order they were {@link + * #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by + * database name. Multiple sequential statements that were applied to the same database are + * grouped together. + * + * @param consumer the consumer + */ + public void groupStatementsByDatabase(DatabaseStatementConsumer consumer) { + groupEventsByDatabase( + (DatabaseEventConsumer) + (dbName, eventList) -> { + List statements = new ArrayList<>(); + final Set tables = new HashSet<>(); + eventList.forEach( + event -> { + statements.add(event.statement()); + addTable(tables, event); + }); + consumer.consume(dbName, tables, statements); + }); + } + + /** + * Consume the events in the same order they were {@link + * #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by + * database name. Multiple sequential statements that were applied to the same database are + * grouped together. + * + * @param consumer the consumer + */ + public void groupEventsByDatabase(DatabaseEventConsumer consumer) { + if (isEmpty()) { + return; + } + if (databaseNames.size() <= 1) { + consumer.consume(databaseNames.iterator().next(), events); + return; + } + List dbEvents = new ArrayList<>(); + String currentDatabase = null; + for (Event event : events) { + String dbName = getDatabase(event); + if (currentDatabase == null || dbName.equals(currentDatabase)) { + currentDatabase = dbName; + // Accumulate the statement ... + dbEvents.add(event); + } else { + // Submit the statements ... + consumer.consume(currentDatabase, dbEvents); + } + } + } + + /** + * Consume the events in the same order they were {@link + * #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded}, but grouped by + * database name. Multiple sequential statements that were applied to the same database are + * grouped together. + * + * @param consumer the consumer + */ + public void getEventsByDatabase(DatabaseEventConsumer consumer) { + if (isEmpty()) { + return; + } + if (databaseNames.size() <= 1) { + consumer.consume(databaseNames.iterator().next(), events); + return; + } + List dbEvents = new ArrayList<>(); + String currentDatabase = null; + for (Event event : events) { + String dbName = getDatabase(event); + if (currentDatabase == null || dbName.equals(currentDatabase)) { + currentDatabase = dbName; + // Accumulate the statement ... + dbEvents.add(event); + } else { + // Submit the statements ... + consumer.consume(currentDatabase, dbEvents); + dbEvents = new ArrayList<>(); + currentDatabase = dbName; + // Accumulate the statement ... 
+ dbEvents.add(event); + } + } + if (!dbEvents.isEmpty()) { + consumer.consume(currentDatabase, dbEvents); + } + } + + protected String getDatabase(Event event) { + switch (event.type()) { + case CREATE_TABLE: + case ALTER_TABLE: + case DROP_TABLE: + case TRUNCATE_TABLE: + TableEvent tableEvent = (TableEvent) event; + return tableEvent.tableId().catalog(); + case CREATE_INDEX: + case DROP_INDEX: + TableIndexEvent tableIndexEvent = (TableIndexEvent) event; + return tableIndexEvent.tableId().catalog(); + case CREATE_DATABASE: + case ALTER_DATABASE: + case DROP_DATABASE: + case USE_DATABASE: + DatabaseEvent dbEvent = (DatabaseEvent) event; + return dbEvent.databaseName(); + case SET_VARIABLE: + SetVariableEvent varEvent = (SetVariableEvent) event; + return varEvent.databaseName().orElse(""); + } + assert false : "Should never happen"; + return null; + } + + public boolean isEmpty() { + return events.isEmpty(); + } + + public boolean applyToMoreDatabasesThan(String name) { + return databaseNames.contains(name) ? databaseNames.size() > 1 : databaseNames.size() > 0; + } + + @Override + public String toString() { + return events.toString(); + } + + public static interface DatabaseEventConsumer { + void consume(String databaseName, List events); + } + + public static interface DatabaseStatementConsumer { + void consume(String databaseName, Set tableList, List ddlStatements); + } + + public static interface DatabaseStatementStringConsumer { + void consume(String databaseName, Set tableList, String ddlStatements); + } + + /** + * @return true if any event stored is one of + *
        <ul> + *
        <li>database-wide events and affects included/excluded database + *
        <li>table related events and the table is included + *
        <li>events that set a variable and either affects included database or is a system-wide + *         variable + *         </ul>
      + */ + @Deprecated + public boolean anyMatch(Predicate databaseFilter, Predicate tableFilter) { + return events.stream() + .anyMatch( + event -> + (event instanceof DatabaseEvent) + && databaseFilter.test( + ((DatabaseEvent) event).databaseName()) + || (event instanceof TableEvent) + && tableFilter.test(((TableEvent) event).tableId()) + || (event instanceof SetVariableEvent) + && (!((SetVariableEvent) event) + .databaseName() + .isPresent() + || databaseFilter.test( + ((SetVariableEvent) event) + .databaseName() + .get()))); + } + + /** + * @return true if any event stored is one of + *
        <ul> + *
        <li>database-wide events and affects included/excluded database + *
        <li>table related events and the table is included + *
        <li>events that set a variable and either affects included database or is a system-wide + *         variable + *         </ul>
          + */ + // TODO javadoc + public boolean anyMatch(RelationalTableFilters filters) { + Predicate databaseFilter = filters.databaseFilter(); + TableFilter tableFilter = filters.dataCollectionFilter(); + return events.stream() + .anyMatch( + event -> + (event instanceof DatabaseEvent) + && databaseFilter.test( + ((DatabaseEvent) event).databaseName()) + || (event instanceof TableEvent) + && tableFilter.isIncluded( + ((TableEvent) event).tableId()) + || (event instanceof SetVariableEvent) + && (!((SetVariableEvent) event) + .databaseName() + .isPresent() + || databaseFilter.test( + ((SetVariableEvent) event) + .databaseName() + .get()))); + } + + public boolean anyMatch(RelationalTableFilters filters, boolean parserOnlineDDL) { + Predicate databaseFilter = filters.databaseFilter(); + TableFilter tableFilter = filters.dataCollectionFilter(); + return events.stream() + .anyMatch( + event -> { + if (event instanceof DatabaseEvent) { + DatabaseEvent dbEvent = (DatabaseEvent) event; + return databaseFilter.test(dbEvent.databaseName()); + } + if (event instanceof TableEvent) { + TableEvent tableEvent = (TableEvent) event; + TableId tableId = tableEvent.tableId(); + boolean isIncludedByFilter = tableFilter.isIncluded(tableId); + boolean isGhostTable = + parserOnlineDDL + && tableId != null + && GHOST_TABLE_PATTERN + .matcher(tableId.table()) + .matches(); + LOGGER.info( + "isIncludedByFilter:{},isGhostTable:{},result:{}", + isIncludedByFilter, + isGhostTable, + isIncludedByFilter || isGhostTable); + return isIncludedByFilter || isGhostTable; + } + if (event instanceof SetVariableEvent) { + SetVariableEvent varEvent = (SetVariableEvent) event; + return !varEvent.databaseName().isPresent() + || (varEvent.databaseName().isPresent() + && databaseFilter.test( + varEvent.databaseName().get())); + } + return false; + }); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java new file mode 100644 index 00000000000..be58acb1dfa --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java @@ -0,0 +1,579 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.connector.mysql.MySqlSystemVariables.MySqlScope; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.relational.HistorizedRelationalDatabaseSchema; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.SystemVariables; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.TableSchema; +import io.debezium.relational.TableSchemaBuilder; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlChanges; +import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.ddl.DdlParserListener.Event; +import io.debezium.relational.ddl.DdlParserListener.SetVariableEvent; +import io.debezium.relational.ddl.DdlParserListener.TableAlteredEvent; +import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent; +import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent; +import io.debezium.relational.ddl.DdlParserListener.TableEvent; +import io.debezium.relational.ddl.DdlParserListener.TableIndexCreatedEvent; +import io.debezium.relational.ddl.DdlParserListener.TableIndexDroppedEvent; +import io.debezium.relational.ddl.DdlParserListener.TableIndexEvent; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.schema.SchemaChangeEvent; +import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; +import io.debezium.schema.TopicSelector; +import io.debezium.text.MultipleParsingExceptions; +import io.debezium.text.ParsingException; +import io.debezium.util.Collect; +import io.debezium.util.SchemaNameAdjuster; +import io.debezium.util.Strings; +import org.apache.kafka.connect.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +/** + * Copied from Debezium project. Component that records the schema history for databases hosted by a + * MySQL database server. The schema information includes the {@link Tables table definitions} and + * the Kafka Connect {@link #schemaFor(TableId) Schema}s for each table, where the {@link Schema} + * excludes any columns that have been {@link MySqlConnectorConfig#COLUMN_EXCLUDE_LIST specified} in + * the configuration. + * + *
<p> + *
          The history is changed by {@link #applyDdl(SourceInfo, String, String, + * DatabaseStatementStringConsumer) applying DDL statements}, and every change is {@link + * DatabaseHistory persisted} as defined in the supplied {@link MySqlConnectorConfig MySQL connector + * configuration}. This component can be reconstructed (e.g., on connector restart) and the history + * {@link #loadHistory(SourceInfo) loaded} from persisted storage. + * + *
<p> + *
          Note that when {@link #applyDdl(SourceInfo, String, String, DatabaseStatementStringConsumer) + * applying DDL statements}, the caller is able to supply a {@link DatabaseStatementStringConsumer + * consumer function} that will be called with the DDL statements and the database to which they + * apply, grouped by database names. However, these will only be called based when the databases are + * included by the database filters defined in the {@link MySqlConnectorConfig MySQL connector + * configuration}. + * + * @author Randall Hauch + */ +@NotThreadSafe +public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema { + + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDatabaseSchema.class); + + private static final Pattern GHOST_TABLE_PATTERN = Pattern.compile("^_(.*)_(del|gho|ghc)$"); + private final Set ignoredQueryStatements = + Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); + private final DdlParser ddlParser; + private final RelationalTableFilters filters; + private final DdlChanges ddlChanges; + private final Map tableIdsByTableNumber = new ConcurrentHashMap<>(); + private final Map excludeTableIdsByTableNumber = new ConcurrentHashMap<>(); + private final boolean parseOnLineSchemaChanges; + private boolean storageInitialiationExecuted = false; + private final MySqlConnectorConfig connectorConfig; + + /** + * Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector + * configuration}. The DDL statements passed to the schema are parsed and a logical model of the + * database schema is created. + */ + public MySqlDatabaseSchema( + MySqlConnectorConfig connectorConfig, + MySqlValueConverters valueConverter, + TopicSelector topicSelector, + SchemaNameAdjuster schemaNameAdjuster, + boolean tableIdCaseInsensitive, + boolean parseOnLineSchemaChanges) { + super( + connectorConfig, + topicSelector, + connectorConfig.getTableFilters().dataCollectionFilter(), + connectorConfig.getColumnFilter(), + new TableSchemaBuilder( + valueConverter, + new MySqlDefaultValueConverter(valueConverter), + schemaNameAdjuster, + connectorConfig.customConverterRegistry(), + connectorConfig.getSourceInfoStructMaker().schema(), + connectorConfig.getSanitizeFieldNames(), + false), + tableIdCaseInsensitive, + connectorConfig.getKeyMapper()); + + this.ddlParser = + new MySqlAntlrDdlParser( + true, + false, + connectorConfig.isSchemaCommentsHistoryEnabled(), + valueConverter, + getTableFilter()); + this.ddlChanges = this.ddlParser.getDdlChanges(); + this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; + this.connectorConfig = connectorConfig; + filters = connectorConfig.getTableFilters(); + } + + /** + * Get all table names for all databases that are monitored whose events are captured by + * Debezium + * + * @return the array with the table names + */ + public String[] capturedTablesAsStringArray() { + final Collection tables = tableIds(); + String[] ret = new String[tables.size()]; + int i = 0; + for (TableId table : tables) { + ret[i++] = table.toString(); + } + return ret; + } + + /** + * Set the system variables on the DDL parser. + * + * @param variables the system variables; may not be null but may be empty + */ + public void setSystemVariables(Map variables) { + variables.forEach( + (varName, value) -> { + ddlParser.systemVariables().setVariable(MySqlScope.SESSION, varName, value); + }); + } + + /** Table is match that gh-ost table,such as _xxxx_gho,_xxxx_ghc,_xxxx_del. 
*/ + private boolean isGhostTable(TableId tableId) { + if (tableId == null || tableId.table() == null) { + return false; + } + return GHOST_TABLE_PATTERN.matcher(tableId.table()).matches(); + } + + /** + * filter tableId,must filter by default.if parseOnLineSchemaChanges is true,will be include + * gh-ost table to schema change history. + */ + private boolean isTableIncluded(String MethodName, TableId tableId) { + boolean isIncludedByOriginalFilter = filters.dataCollectionFilter().isIncluded(tableId); + LOGGER.info( + "MethodName: {},Including isIncludedByOriginalFilter table that would otherwise be filtered: {}," + + "isIncludedByOriginalFilter:{}", + MethodName, + tableId, + isIncludedByOriginalFilter); + if (parseOnLineSchemaChanges) { + if (!isIncludedByOriginalFilter && isGhostTable(tableId)) { + LOGGER.info( + "MethodName: {},Including gh-ost table that would otherwise be filtered: {}", + MethodName, + tableId); + return true; + } + } + return isIncludedByOriginalFilter; + } + + /** + * Get the system variables as known by the DDL parser. + * + * @return the system variables; never null + */ + public SystemVariables systemVariables() { + return ddlParser.systemVariables(); + } + + protected void appendDropTableStatement(StringBuilder sb, TableId tableId) { + sb.append("DROP TABLE ") + .append(tableId) + .append(" IF EXISTS;") + .append(System.lineSeparator()); + } + + protected void appendCreateTableStatement(StringBuilder sb, Table table) { + sb.append("CREATE TABLE ").append(table.id()).append(';').append(System.lineSeparator()); + } + + /** Discard any currently-cached schemas and rebuild them using the filters. */ + protected void refreshSchemas() { + clearSchemas(); + // Create TableSchema instances for any existing table ... + this.tableIds() + .forEach( + id -> { + if (isTableIncluded("refreshSchemas", id)) { + Table table = this.tableFor(id); + buildAndRegisterSchema(table); + } + }); + } + + public boolean isGlobalSetVariableStatement(String ddl, String databaseName) { + return (databaseName == null || databaseName.isEmpty()) + && ddl != null + && ddl.toUpperCase().startsWith("SET "); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChange) { + switch (schemaChange.getType()) { + case CREATE: + case ALTER: + schemaChange.getTableChanges().forEach(x -> buildAndRegisterSchema(x.getTable())); + break; + case DROP: + schemaChange.getTableChanges().forEach(x -> removeSchema(x.getId())); + break; + default: + } + + // Record the DDL statement so that we can later recover them if needed. We do this _after_ + // writing the + // schema change records so that failure recovery (which is based on of the history) won't + // lose + // schema change records. 
+ // We are storing either + // - all DDLs if configured + // - or global SET variables + // - or DDLs for monitored objects + if (!databaseHistory.storeOnlyCapturedTables() + || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase()) + || schemaChange.getTables().stream() + .map(Table::id) + .anyMatch(x -> isTableIncluded("applySchemaChange", x))) { + boolean applySchemaChange = + schemaChange.getTables().stream() + .map(Table::id) + .anyMatch(x -> isTableIncluded("applySchemaChange", x)); + LOGGER.info( + "Recorded DDL statements for database '{}': {},{},{},{}", + schemaChange.getDatabase(), + schemaChange.getDdl(), + applySchemaChange, + !databaseHistory.storeOnlyCapturedTables(), + isGlobalSetVariableStatement( + schemaChange.getDdl(), schemaChange.getDatabase())); + record(schemaChange, schemaChange.getTableChanges()); + } + } + + public List parseSnapshotDdl( + MySqlPartition partition, + String ddlStatements, + String databaseName, + MySqlOffsetContext offset, + Instant sourceTime) { + LOGGER.debug("Processing snapshot DDL '{}' for database '{}'", ddlStatements, databaseName); + return parseDdl(partition, ddlStatements, databaseName, offset, sourceTime, true); + } + + public List parseStreamingDdl( + MySqlPartition partition, + String ddlStatements, + String databaseName, + MySqlOffsetContext offset, + Instant sourceTime) { + LOGGER.info("Processing streaming DDL '{}' for database '{}'", ddlStatements, databaseName); + return parseDdl(partition, ddlStatements, databaseName, offset, sourceTime, false); + } + + private List parseDdl( + MySqlPartition partition, + String ddlStatements, + String databaseName, + MySqlOffsetContext offset, + Instant sourceTime, + boolean snapshot) { + final List schemaChangeEvents = new ArrayList<>(3); + + if (ignoredQueryStatements.contains(ddlStatements)) { + return schemaChangeEvents; + } + + try { + this.ddlChanges.reset(); + this.ddlParser.setCurrentSchema(databaseName); + this.ddlParser.parse(ddlStatements, tables()); + } catch (ParsingException | MultipleParsingExceptions e) { + if (databaseHistory.skipUnparseableDdlStatements()) { + LOGGER.warn("Ignoring unparseable DDL statement '{}': {}", ddlStatements, e); + } else { + throw e; + } + } + // No need to send schema events or store DDL if no table has changed + if (!databaseHistory.storeOnlyCapturedTables() + || isGlobalSetVariableStatement(ddlStatements, databaseName) + || ddlChanges.anyMatch(filters, parseOnLineSchemaChanges)) { + + // We are supposed to _also_ record the schema changes as SourceRecords, but these need + // to be filtered + // by database. Unfortunately, the databaseName on the event might not be the same + // database as that + // being modified by the DDL statements (since the DDL statements can have + // fully-qualified names). + // Therefore, we have to look at each statement to figure out which database it applies + // and then + // record the DDL statements (still in the same order) to those databases. + if (!ddlChanges.isEmpty()) { + // We understood at least some of the DDL statements and can figure out to which + // database they apply. + // They also apply to more databases than 'databaseName', so we need to apply the + // DDL statements in + // the same order they were read for each _affected_ database, grouped together if + // multiple apply + // to the same _affected_ database... + ddlChanges.getEventsByDatabase( + (String dbName, List events) -> { + final String sanitizedDbName = (dbName == null) ? 
"" : dbName; + if (acceptableDatabase(dbName)) { + final Set tableIds = new HashSet<>(); + events.forEach( + event -> { + final TableId tableId = getTableId(event); + if (tableId != null) { + tableIds.add(tableId); + } + }); + events.forEach( + event -> { + final TableId tableId = getTableId(event); + offset.tableEvent(dbName, tableIds, sourceTime); + // For SET with multiple parameters + if (event instanceof TableCreatedEvent) { + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + sanitizedDbName, + event, + tableId, + SchemaChangeEventType.CREATE, + snapshot); + } else if (event instanceof TableAlteredEvent + || event instanceof TableIndexCreatedEvent + || event instanceof TableIndexDroppedEvent) { + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + sanitizedDbName, + event, + tableId, + SchemaChangeEventType.ALTER, + snapshot); + } else if (event instanceof TableDroppedEvent) { + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + sanitizedDbName, + event, + tableId, + SchemaChangeEventType.DROP, + snapshot); + } else if (event instanceof SetVariableEvent) { + // SET statement with multiple variable emits event + // for each variable. We want to emit only + // one change event + final SetVariableEvent varEvent = + (SetVariableEvent) event; + if (varEvent.order() == 0) { + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + sanitizedDbName, + event, + tableId, + SchemaChangeEventType.DATABASE, + snapshot); + } + } else { + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + sanitizedDbName, + event, + tableId, + SchemaChangeEventType.DATABASE, + snapshot); + } + }); + } + }); + } else { + offset.databaseEvent(databaseName, sourceTime); + schemaChangeEvents.add( + SchemaChangeEvent.ofDatabase( + partition, offset, databaseName, ddlStatements, snapshot)); + } + } else { + LOGGER.debug( + "Changes for DDL '{}' were filtered and not recorded in database history", + ddlStatements); + } + return schemaChangeEvents; + } + + private void emitChangeEvent( + MySqlPartition partition, + MySqlOffsetContext offset, + List schemaChangeEvents, + final String sanitizedDbName, + Event event, + TableId tableId, + SchemaChangeEventType type, + boolean snapshot) { + SchemaChangeEvent schemaChangeEvent; + if (type.equals(SchemaChangeEventType.ALTER) + && event instanceof TableAlteredEvent + && ((TableAlteredEvent) event).previousTableId() != null) { + schemaChangeEvent = + SchemaChangeEvent.ofRename( + partition, + offset, + sanitizedDbName, + null, + event.statement(), + tableId != null ? tableFor(tableId) : null, + ((TableAlteredEvent) event).previousTableId()); + } else { + schemaChangeEvent = + SchemaChangeEvent.of( + type, + partition, + offset, + sanitizedDbName, + null, + event.statement(), + tableId != null ? 
tableFor(tableId) : null, + snapshot); + } + schemaChangeEvents.add(schemaChangeEvent); + } + + private boolean acceptableDatabase(final String databaseName) { + return !storeOnlyCapturedTables() + || filters.databaseFilter().test(databaseName) + || databaseName == null + || databaseName.isEmpty(); + } + + private TableId getTableId(Event event) { + if (event instanceof TableEvent) { + return ((TableEvent) event).tableId(); + } else if (event instanceof TableIndexEvent) { + return ((TableIndexEvent) event).tableId(); + } + return null; + } + + @Override + protected DdlParser getDdlParser() { + return ddlParser; + } + + /** Return true if the database history entity exists */ + public boolean historyExists() { + return databaseHistory.exists(); + } + + @Override + public boolean storeOnlyCapturedTables() { + return databaseHistory.storeOnlyCapturedTables(); + } + + /** + * Assign the given table number to the table with the specified {@link TableId table ID}. + * + * @param tableNumber the table number found in binlog events + * @param id the identifier for the corresponding table + * @return {@code true} if the assignment was successful, or {@code false} if the table is + * currently excluded in the connector's configuration + */ + public boolean assignTableNumber(long tableNumber, TableId id) { + final TableSchema tableSchema = schemaFor(id); + if (tableSchema == null && !isTableIncluded("assignTableNumber", id)) { + excludeTableIdsByTableNumber.put(tableNumber, id); + return false; + } + tableIdsByTableNumber.put(tableNumber, id); + return true; + } + + /** + * Return the table id associated with MySQL-specific table number. + * + * @param tableNumber + * @return the table id or null if not known + */ + public TableId getTableId(long tableNumber) { + return tableIdsByTableNumber.get(tableNumber); + } + + /** + * Return the excluded table id associated with MySQL-specific table number. + * + * @param tableNumber + * @return the table id or null if not known + */ + public TableId getExcludeTableId(long tableNumber) { + return excludeTableIdsByTableNumber.get(tableNumber); + } + + /** + * Clear all of the table mappings. This should be done when the logs are rotated, since in that + * a different table numbering scheme will be used by all subsequent TABLE_MAP binlog events. 
+ */ + public void clearTableMappings() { + LOGGER.debug("Clearing table number mappings"); + tableIdsByTableNumber.clear(); + excludeTableIdsByTableNumber.clear(); + } + + @Override + public void initializeStorage() { + super.initializeStorage(); + storageInitialiationExecuted = true; + } + + public boolean isStorageInitializationExecuted() { + return storageInitialiationExecuted; + } + + public boolean skipSchemaChangeEvent(SchemaChangeEvent event) { + if (!Strings.isNullOrEmpty(event.getDatabase()) + && !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) { + LOGGER.debug( + "Skipping schema event as it belongs to a non-captured database: '{}'", event); + return true; + } + + if (event.getTables() != null && !event.getTables().isEmpty()) { + return event.getTables().stream() + .map(Table::id) + .noneMatch(x -> isTableIncluded("skipSchemaChangeEvent", x)); + } + + return false; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 7ca60be5bf2..8f144aba893 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -104,7 +104,9 @@ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */ public static MySqlDatabaseSchema createMySqlDatabaseSchema( - MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) { + MySqlConnectorConfig dbzMySqlConfig, + boolean isTableIdCaseSensitive, + boolean parseOnLineSchemaChanges) { TopicSelector topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig); @@ -113,7 +115,8 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema( valueConverters, topicSelector, schemaNameAdjuster, - isTableIdCaseSensitive); + isTableIdCaseSensitive, + parseOnLineSchemaChanges); } /** Fetch current binlog offsets in MySql Server. 
*/ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 09a5fe8e85c..25fd516ab13 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -121,7 +121,10 @@ public void configure(MySqlSplit mySqlSplit) { Optional.ofNullable(databaseSchema).ifPresent(MySqlDatabaseSchema::close); this.databaseSchema = - DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive); + DebeziumUtils.createMySqlDatabaseSchema( + connectorConfig, + tableIdCaseInsensitive, + sourceConfig.isParseOnLineSchemaChanges()); this.mySqlPartition = new MySqlPartition(connectorConfig.getLogicalName()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java index 5cc06625a08..28dcb003e21 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlSchema.java @@ -51,7 +51,10 @@ public class MySqlSchema implements AutoCloseable { public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) { this.connectorConfig = sourceConfig.getMySqlConnectorConfig(); this.databaseSchema = - DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, isTableIdCaseSensitive); + DebeziumUtils.createMySqlDatabaseSchema( + connectorConfig, + isTableIdCaseSensitive, + sourceConfig.isParseOnLineSchemaChanges()); this.schemasByTableId = new HashMap<>(); }
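Note for reviewers: the behavioral core of this patch is the ghost-table escape hatch shared by DdlChanges.anyMatch(RelationalTableFilters, boolean) and MySqlDatabaseSchema#isTableIncluded. Below is a minimal, self-contained sketch of that decision, assuming a hypothetical name-based Predicate<String> filter for illustration (the real code filters Tables.TableFilter over TableId). The regex is the GHOST_TABLE_PATTERN used verbatim in the patch; gh-ost's _<table>_gho (ghost), _<table>_ghc (changelog) and _<table>_del (dropped original) naming is the convention it targets.

import java.util.function.Predicate;
import java.util.regex.Pattern;

class GhostTableInclusionSketch {
    // Same regex as GHOST_TABLE_PATTERN in DdlChanges and MySqlDatabaseSchema:
    // matches gh-ost shadow tables _<name>_gho, _<name>_ghc and _<name>_del.
    static final Pattern GHOST_TABLE_PATTERN = Pattern.compile("^_(.*)_(del|gho|ghc)$");

    // Mirrors isTableIncluded: the configured filter wins, and ghost tables are
    // additionally admitted only while parseOnLineSchemaChanges is enabled.
    static boolean isIncluded(
            Predicate<String> tableFilter, boolean parseOnLineSchemaChanges, String tableName) {
        boolean includedByFilter = tableFilter.test(tableName);
        boolean isGhostTable =
                parseOnLineSchemaChanges
                        && GHOST_TABLE_PATTERN.matcher(tableName).matches();
        return includedByFilter || isGhostTable;
    }

    public static void main(String[] args) {
        // Hypothetical filter that captures only the `orders` table.
        Predicate<String> filter = "orders"::equals;
        for (String name :
                new String[] {"orders", "_orders_gho", "_orders_ghc", "_orders_del", "_orders_bak"}) {
            System.out.printf("%-12s included=%b%n", name, isIncluded(filter, true, name));
        }
        // orders and the three gh-ost shadow tables print true; _orders_bak prints false.
    }
}

Run standalone, this reproduces the isIncludedByFilter || isGhostTable outcome that anyMatch logs, so DDL applied to gh-ost shadow tables reaches the schema history even though the table filter alone would drop it.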