From f024bd2ac50a75f05de76840f0460670bd9d6457 Mon Sep 17 00:00:00 2001
From: Sergei Morozov
Date: Mon, 21 Apr 2025 18:54:32 -0700
Subject: [PATCH] [FLINK-37710] Use schemas from the state in SchemaOperator

---
 .../mysql/source/MySqlDataSource.java         |   8 +-
 .../reader/MySqlPipelineRecordEmitter.java    | 265 ------------------
 .../schema/common/SchemaRegistry.java         |   5 +-
 .../schema/regular/SchemaCoordinator.java     |  24 +-
 .../schema/regular/SchemaOperator.java        |  68 +++++
 .../regular/event/EvolvedSchemaRequest.java   |  66 +++++
 .../regular/event/EvolvedSchemaResponse.java  |  69 +++++
 .../regular/event/OriginalSchemaRequest.java  |  66 +++++
 .../regular/event/OriginalSchemaResponse.java |  69 +++++
 9 files changed, 369 insertions(+), 271 deletions(-)
 delete mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaRequest.java
 create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaResponse.java
 create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaRequest.java
 create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaResponse.java

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
index 81912b4a847..64651ea8135 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
@@ -27,7 +27,7 @@
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
+import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
 import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 
@@ -79,8 +79,10 @@ public EventSourceProvider getEventSourceProvider() {
                         configFactory,
                         deserializer,
                         (sourceReaderMetrics, sourceConfig) ->
-                                new MySqlPipelineRecordEmitter(
-                                        deserializer, sourceReaderMetrics, sourceConfig));
+                                new MySqlRecordEmitter<>(
+                                        deserializer,
+                                        sourceReaderMetrics,
+                                        sourceConfig.isIncludeSchemaChanges()));
 
         return FlinkSourceProvider.of(source);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
deleted file mode 100644
index de058df1444..00000000000
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.mysql.source.reader;
-
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
-import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.connectors.mysql.schema.MySqlFieldDefinition;
-import org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
-import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
-import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
-import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
-import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
-
-import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
-import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.Column;
-import io.debezium.relational.RelationalDatabaseConnectorConfig;
-import io.debezium.relational.Table;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-import io.debezium.text.ParsingException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isLowWatermarkEvent;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
-
-/** The {@link RecordEmitter} implementation for pipeline mysql connector. */
-public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MySqlPipelineRecordEmitter.class);
-
-    private final MySqlSourceConfig sourceConfig;
-    private MySqlAntlrDdlParser mySqlAntlrDdlParser;
-
-    // Used when startup mode is initial
-    private Set<TableId> alreadySendCreateTableTables;
-
-    private Map<TableId, CreateTableEvent> createTableEventCache;
-
-    public MySqlPipelineRecordEmitter(
-            DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
-            MySqlSourceReaderMetrics sourceReaderMetrics,
-            MySqlSourceConfig sourceConfig) {
-        super(
-                debeziumDeserializationSchema,
-                sourceReaderMetrics,
-                sourceConfig.isIncludeSchemaChanges());
-        this.sourceConfig = sourceConfig;
-        this.alreadySendCreateTableTables = new HashSet<>();
-        this.createTableEventCache = generateCreateTableEvent(sourceConfig);
-    }
-
-    @Override
-    protected void processElement(
-            SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
-            throws Exception {
-        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
-            // In Snapshot phase of INITIAL startup mode, we lazily send CreateTableEvent to
-            // downstream to avoid checkpoint timeout.
-            TableId tableId = splitState.asSnapshotSplitState().toMySqlSplit().getTableId();
-            if (!alreadySendCreateTableTables.contains(tableId)) {
-                try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
-                    sendCreateTableEvent(jdbc, tableId, output);
-                    alreadySendCreateTableTables.add(tableId);
-                }
-            }
-        } else {
-            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
-                TableId tableId = getTableId(element);
-                if (!alreadySendCreateTableTables.contains(tableId)) {
-                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
-                    // New created table in binlog reading phase.
-                    if (createTableEvent != null) {
-                        output.collect(createTableEvent);
-                    }
-                    alreadySendCreateTableTables.add(tableId);
-                }
-            }
-        }
-        super.processElement(element, output, splitState);
-    }
-
-    private void sendCreateTableEvent(
-            JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
-        Schema schema = getSchema(jdbc, tableId);
-        output.collect(
-                new CreateTableEvent(
-                        org.apache.flink.cdc.common.event.TableId.tableId(
-                                tableId.catalog(), tableId.table()),
-                        schema));
-    }
-
-    private Schema getSchema(JdbcConnection jdbc, TableId tableId) {
-        String ddlStatement = showCreateTable(jdbc, tableId);
-        try {
-            return parseDDL(ddlStatement, tableId);
-        } catch (ParsingException pe) {
-            LOG.warn(
-                    "Failed to parse DDL: \n{}\nWill try parsing by describing table.",
-                    ddlStatement,
-                    pe);
-        }
-        ddlStatement = describeTable(jdbc, tableId);
-        return parseDDL(ddlStatement, tableId);
-    }
-
-    private String showCreateTable(JdbcConnection jdbc, TableId tableId) {
-        final String showCreateTableQuery =
-                String.format("SHOW CREATE TABLE `%s`.`%s`", tableId.catalog(), tableId.table());
-        try {
-            return jdbc.queryAndMap(
-                    showCreateTableQuery,
-                    rs -> {
-                        String ddlStatement = null;
-                        while (rs.next()) {
-                            ddlStatement = rs.getString(2);
-                        }
-                        return ddlStatement;
-                    });
-        } catch (SQLException e) {
-            throw new RuntimeException(
-                    String.format("Failed to show create table for %s", tableId), e);
-        }
-    }
-
-    private String describeTable(JdbcConnection jdbc, TableId tableId) {
-        List<MySqlFieldDefinition> fieldMetas = new ArrayList<>();
-        List<String> primaryKeys = new ArrayList<>();
-        try {
-            return jdbc.queryAndMap(
-                    String.format("DESC `%s`.`%s`", tableId.catalog(), tableId.table()),
-                    rs -> {
-                        while (rs.next()) {
-                            MySqlFieldDefinition meta = new MySqlFieldDefinition();
-                            meta.setColumnName(rs.getString("Field"));
-                            meta.setColumnType(rs.getString("Type"));
-                            meta.setNullable(
-                                    StringUtils.equalsIgnoreCase(rs.getString("Null"), "YES"));
-                            meta.setKey("PRI".equalsIgnoreCase(rs.getString("Key")));
-                            meta.setUnique("UNI".equalsIgnoreCase(rs.getString("Key")));
-                            meta.setDefaultValue(rs.getString("Default"));
-                            meta.setExtra(rs.getString("Extra"));
-                            if (meta.isKey()) {
-                                primaryKeys.add(meta.getColumnName());
-                            }
-                            fieldMetas.add(meta);
-                        }
-                        return new MySqlTableDefinition(tableId, fieldMetas, primaryKeys).toDdl();
-                    });
-        } catch (SQLException e) {
-            throw new RuntimeException(String.format("Failed to describe table %s", tableId), e);
-        }
-    }
-
-    private Schema parseDDL(String ddlStatement, TableId tableId) {
-        Table table = parseDdl(ddlStatement, tableId);
-
-        List<Column> columns = table.columns();
-        Schema.Builder tableBuilder = Schema.newBuilder();
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-
-            String colName = column.name();
-            DataType dataType =
-                    MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
-            if (!column.isOptional()) {
-                dataType = dataType.notNull();
-            }
-            tableBuilder.physicalColumn(
-                    colName,
-                    dataType,
-                    column.comment(),
-                    column.defaultValueExpression().orElse(null));
-        }
-        tableBuilder.comment(table.comment());
-
-        List<String> primaryKey = table.primaryKeyColumnNames();
-        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
-            tableBuilder.primaryKey(primaryKey);
-        }
-        return tableBuilder.build();
-    }
-
-    private synchronized Table parseDdl(String ddlStatement, TableId tableId) {
-        MySqlAntlrDdlParser mySqlAntlrDdlParser = getParser();
-        mySqlAntlrDdlParser.setCurrentDatabase(tableId.catalog());
-        Tables tables = new Tables();
-        mySqlAntlrDdlParser.parse(ddlStatement, tables);
-        return tables.forTable(tableId);
-    }
-
-    private synchronized MySqlAntlrDdlParser getParser() {
-        if (mySqlAntlrDdlParser == null) {
-            boolean includeComments =
-                    sourceConfig
-                            .getDbzConfiguration()
-                            .getBoolean(
-                                    RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
-                                            .name(),
-                                    false);
-            mySqlAntlrDdlParser =
-                    new MySqlAntlrDdlParser(
-                            true, false, includeComments, null, Tables.TableFilter.includeAll());
-        }
-        return mySqlAntlrDdlParser;
-    }
-
-    private Map<TableId, CreateTableEvent> generateCreateTableEvent(
-            MySqlSourceConfig sourceConfig) {
-        try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
-            Map<TableId, CreateTableEvent> createTableEventCache = new HashMap<>();
-            List<TableId> capturedTableIds =
-                    listTables(
-                            jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
-            for (TableId tableId : capturedTableIds) {
-                Schema schema = getSchema(jdbc, tableId);
-                createTableEventCache.put(
-                        tableId,
-                        new CreateTableEvent(
-                                org.apache.flink.cdc.common.event.TableId.tableId(
-                                        tableId.catalog(), tableId.table()),
-                                schema));
-            }
-            return createTableEventCache;
-        } catch (SQLException e) {
-            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
-        }
-    }
-}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 1e681a79fb4..912c3dd9ae5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -89,7 +89,8 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
     protected final SchemaChangeBehavior behavior;
 
     // -------------------------
-    // Dynamically initialized transient fields (after coordinator starts)
+    // Dynamically initialized transient fields (restored from a checkpoint or initialized during
+    // coordinator startup)
     // -------------------------
     protected transient int currentParallelism;
     protected transient Set<Integer> activeSinkWriters;
@@ -112,6 +113,7 @@ protected SchemaRegistry(
         this.routingRules = routingRules;
         this.rpcTimeout = rpcTimeout;
         this.behavior = schemaChangeBehavior;
+        this.schemaManager = new SchemaManager();
     }
 
     // ---------------
@@ -123,7 +125,6 @@ public void start() throws Exception {
         this.currentParallelism = context.currentParallelism();
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.failedReasons = new ConcurrentHashMap<>();
-        this.schemaManager = new SchemaManager();
         this.router = new TableIdRouter(routingRules);
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 92ae16b8cdd..39bcb5ad6b9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -34,6 +34,10 @@
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaResponse;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaResponse;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
@@ -185,7 +189,11 @@ protected void restore(byte[] checkpointData) throws Exception {
     @Override
     protected void handleCustomCoordinationRequest(
             CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
-        if (request instanceof SchemaChangeRequest) {
+        if (request instanceof OriginalSchemaRequest) {
+            handleOriginalSchemasRequest((OriginalSchemaRequest) request, responseFuture);
+        } else if (request instanceof EvolvedSchemaRequest) {
+            handleEvolvedSchemasRequest((EvolvedSchemaRequest) request, responseFuture);
+        } else if (request instanceof SchemaChangeRequest) {
             handleSchemaChangeRequest((SchemaChangeRequest) request, responseFuture);
         } else {
             throw new UnsupportedOperationException(
@@ -230,6 +238,20 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) {
                 });
     }
 
+    /** Handle an {@link OriginalSchemaRequest}. */
+    public void handleOriginalSchemasRequest(
+            OriginalSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        Schema schema = schemaManager.getLatestOriginalSchema(request.getTableId()).orElse(null);
+        responseFuture.complete(wrap(OriginalSchemaResponse.success(schema)));
+    }
+
+    /** Handle an {@link EvolvedSchemaRequest}. */
+    public void handleEvolvedSchemasRequest(
+            EvolvedSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        Schema schema = schemaManager.getLatestEvolvedSchema(request.getTableId()).orElse(null);
+        responseFuture.complete(wrap(EvolvedSchemaResponse.success(schema)));
+    }
+
     /**
      * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
      *
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 28195f2386d..11004e9e866 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
@@ -32,6 +33,10 @@
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
 import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
 import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaResponse;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaResponse;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -52,8 +57,10 @@
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -81,6 +88,8 @@ public class SchemaOperator extends AbstractStreamOperator
     private transient int subTaskId;
     private transient TaskOperatorEventGateway toCoordinator;
     private transient SchemaOperatorMetrics schemaOperatorMetrics;
+    private transient volatile Set<TableId> initializedOriginalSchemas;
+    private transient volatile Set<TableId> initializedEvolvedSchemas;
     private transient volatile Map<TableId, Schema> originalSchemaMap;
     private transient volatile Map<TableId, Schema> evolvedSchemaMap;
     private transient TableIdRouter router;
@@ -132,6 +141,8 @@ public void open() throws Exception {
                 new SchemaOperatorMetrics(
                         getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
         this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        this.initializedOriginalSchemas = new HashSet<>();
+        this.initializedEvolvedSchemas = new HashSet<>();
         this.originalSchemaMap = new HashMap<>();
         this.evolvedSchemaMap = new HashMap<>();
         this.router = new TableIdRouter(routingRules);
@@ -144,6 +155,10 @@
     @Override
     public void processElement(StreamRecord<Event> streamRecord) throws Exception {
         Event event = streamRecord.getValue();
+        if (event instanceof ChangeEvent) {
+            ensureSchemasAreInitialized((ChangeEvent) event);
+        }
+
         if (event instanceof SchemaChangeEvent) {
             handleSchemaChangeEvent((SchemaChangeEvent) event);
         } else if (event instanceof DataChangeEvent) {
@@ -153,6 +168,51 @@
         }
     }
 
+    private void ensureSchemasAreInitialized(ChangeEvent event) {
+        TableId originalTableId = event.tableId();
+        if (!initializedOriginalSchemas.contains(originalTableId)) {
+            initializeOriginalSchema(originalTableId);
+            initializedOriginalSchemas.add(originalTableId);
+        }
+
+        for (TableId evolvedTableId : router.route(originalTableId)) {
+            if (!initializedEvolvedSchemas.contains(evolvedTableId)) {
+                initializeEvolvedSchema(evolvedTableId);
+                initializedEvolvedSchemas.add(evolvedTableId);
+            }
+        }
+    }
+
+    private void initializeOriginalSchema(TableId tableId) {
+        LOG.info("{}> Requesting original schema for table {}", subTaskId, tableId);
+
+        OriginalSchemaResponse response = requestOriginalSchema(tableId);
+        Schema schema = response.getSchema();
+
+        LOG.info(
+                "{}> Successfully received original schema for table {}: {}",
+                subTaskId,
+                tableId,
+                schema);
+
+        originalSchemaMap.put(tableId, schema);
+    }
+
+    private void initializeEvolvedSchema(TableId tableId) {
+        LOG.info("{}> Requesting evolved schema for table {}", subTaskId, tableId);
+
+        EvolvedSchemaResponse response = requestEvolvedSchema(tableId);
+        Schema schema = response.getSchema();
+
+        LOG.info(
+                "{}> Successfully received evolved schema for table {}: {}",
+                subTaskId,
+                tableId,
+                schema);
+
+        evolvedSchemaMap.put(tableId, schema);
+    }
+
     private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exception {
         // First, update original schema map unconditionally and it will never fail
         TableId tableId = originalEvent.tableId();
@@ -221,6 +281,14 @@ private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
         }
     }
 
+    private OriginalSchemaResponse requestOriginalSchema(TableId tableId) {
+        return sendRequestToCoordinator(new OriginalSchemaRequest(tableId));
+    }
+
+    private EvolvedSchemaResponse requestEvolvedSchema(TableId tableId) {
+        return sendRequestToCoordinator(new EvolvedSchemaRequest(tableId));
+    }
+
     private SchemaChangeResponse requestSchemaChange(
             TableId tableId, SchemaChangeEvent schemaChangeEvent) {
         return sendRequestToCoordinator(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaRequest.java
new file mode 100644
index 00000000000..7c0e3f9697f
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+import java.util.Objects;
+
+/**
+ * The request from {@link SchemaOperator} to {@link SchemaCoordinator} to get the evolved schema of
+ * a destination table.
+ */
+public class EvolvedSchemaRequest implements CoordinationRequest {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableId tableId;
+
+    public EvolvedSchemaRequest(TableId tableId) {
+        this.tableId = tableId;
+    }
+
+    public TableId getTableId() {
+        return tableId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof EvolvedSchemaRequest)) {
+            return false;
+        }
+        EvolvedSchemaRequest that = (EvolvedSchemaRequest) o;
+        return Objects.equals(tableId, that.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+        return tableId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "EvolvedSchemaRequest{tableId=" + tableId + '}';
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaResponse.java
new file mode 100644
index 00000000000..16335f1e09b
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/EvolvedSchemaResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
+
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import java.util.Objects;
+
+/**
+ * The response for {@link EvolvedSchemaRequest} from {@link SchemaCoordinator} to {@link
+ * SchemaOperator}.
+ */
+public class EvolvedSchemaResponse implements CoordinationResponse {
+    private static final long serialVersionUID = 1L;
+
+    private final Schema schema;
+
+    public static EvolvedSchemaResponse success(Schema schema) {
+        return new EvolvedSchemaResponse(schema);
+    }
+
+    private EvolvedSchemaResponse(Schema schema) {
+        this.schema = schema;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof EvolvedSchemaResponse)) {
+            return false;
+        }
+        EvolvedSchemaResponse response = (EvolvedSchemaResponse) o;
+        return Objects.equals(schema, response.schema);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schema);
+    }
+
+    @Override
+    public String toString() {
+        return "EvolvedSchemaResponse{schema=" + schema + '}';
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaRequest.java
new file mode 100644
index 00000000000..8d143144d52
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+import java.util.Objects;
+
+/**
+ * The request from {@link SchemaOperator} to {@link SchemaCoordinator} to get the original schema
+ * of a source table.
+ */
+public class OriginalSchemaRequest implements CoordinationRequest {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableId tableId;
+
+    public OriginalSchemaRequest(TableId tableId) {
+        this.tableId = tableId;
+    }
+
+    public TableId getTableId() {
+        return tableId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OriginalSchemaRequest)) {
+            return false;
+        }
+        OriginalSchemaRequest that = (OriginalSchemaRequest) o;
+        return Objects.equals(tableId, that.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+        return tableId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "OriginalSchemaRequest{tableId=" + tableId + '}';
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaResponse.java
new file mode 100644
index 00000000000..e55bc00bb02
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/OriginalSchemaResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
+
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import java.util.Objects;
+
+/**
+ * The response for {@link OriginalSchemaRequest} from {@link SchemaCoordinator} to {@link
+ * SchemaOperator}.
+ */
+public class OriginalSchemaResponse implements CoordinationResponse {
+    private static final long serialVersionUID = 1L;
+
+    private final Schema schema;
+
+    public static OriginalSchemaResponse success(Schema schema) {
+        return new OriginalSchemaResponse(schema);
+    }
+
+    private OriginalSchemaResponse(Schema schema) {
+        this.schema = schema;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OriginalSchemaResponse)) {
+            return false;
+        }
+        OriginalSchemaResponse response = (OriginalSchemaResponse) o;
+        return Objects.equals(schema, response.schema);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schema);
+    }
+
+    @Override
+    public String toString() {
+        return "OriginalSchemaResponse{schema=" + schema + '}';
+    }
+}
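
Context for the connector-side change: the deleted MySqlPipelineRecordEmitter eagerly opened a JDBC connection at construction time (generateCreateTableEvent) to pre-compute a CreateTableEvent for every captured table, and re-read schemas over JDBC while emitting snapshot splits. After this patch the source uses the stock MySqlRecordEmitter, and per-table schemas are instead served out of the coordinator's checkpointed state through the new request/response pairs above.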
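
Why SchemaRegistry now creates SchemaManager in the constructor rather than in start(): on recovery, Flink hands a coordinator its checkpointed state via resetToCheckpoint() before start() runs, so a schemaManager created only in start() would not yet exist when restore(checkpointData) repopulates it. This is what the updated comment ("restored from a checkpoint or initialized during coordinator startup") refers to. A simplified sketch of the restore-time calling sequence (illustrative, not literal Flink scheduler code):

    // Illustrative only: the order of lifecycle calls on recovery.
    OperatorCoordinator coordinator = provider.create(context);
    coordinator.resetToCheckpoint(checkpointId, checkpointData); // -> SchemaRegistry#restore(...)
    coordinator.start(); // previously the only place schemaManager was created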
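
The two new coordinator handlers are symmetric and read-only against SchemaManager, so the request/response contract is easy to exercise in isolation. A hypothetical usage sketch; `coordinator`, `tableId`, and `unwrap` are placeholders for this illustration (unwrap stands for the inverse of the wrap(...) call the coordinator makes), and exception handling is omitted:

    // Hypothetical: exercising the new handler directly, e.g. from a test.
    CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
    coordinator.handleOriginalSchemasRequest(new OriginalSchemaRequest(tableId), responseFuture);

    // The coordinator completes the future with a wrapped response.
    OriginalSchemaResponse response = unwrap(responseFuture.get());

    // Note the contract: for a table unknown to the coordinator, the handler still
    // succeeds and getSchema() returns null, which the operator then caches.
    Schema schema = response.getSchema();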
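
On the operator side, requestOriginalSchema and requestEvolvedSchema delegate to SchemaOperator's pre-existing sendRequestToCoordinator helper, whose body is outside this diff. As a rough sketch of how such a helper is conventionally built on Flink's TaskOperatorEventGateway (an assumption for illustration, not the actual SchemaOperator code; in particular the rpcTimeout field and the error handling are guesses):

    // Illustrative sketch only; the real helper in SchemaOperator may differ.
    // Assumed context: `toCoordinator` is the TaskOperatorEventGateway field shown in
    // the diff above, `rpcTimeout` is a java.time.Duration (the Duration import in the
    // diff suggests one exists), and getOperatorID() comes from AbstractStreamOperator.
    @SuppressWarnings("unchecked")
    private <RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(
            CoordinationRequest request) {
        try {
            CompletableFuture<CoordinationResponse> responseFuture =
                    toCoordinator.sendRequestToCoordinator(
                            getOperatorID(), new SerializedValue<>(request));
            // SchemaCoordinator completes the future with wrap(...); the matching
            // unwrap step is elided here for brevity.
            return (RESPONSE) responseFuture.get(rpcTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Failed to send request to coordinator: " + request, e);
        }
    }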