
[FLINK-37710] Use schemas from the state in SchemaOperator #3999

Draft · wants to merge 1 commit into master

@@ -27,7 +27,7 @@
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
+import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
 import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;

@@ -79,8 +79,10 @@ public EventSourceProvider getEventSourceProvider() {
                 configFactory,
                 deserializer,
                 (sourceReaderMetrics, sourceConfig) ->
-                        new MySqlPipelineRecordEmitter(
-                                deserializer, sourceReaderMetrics, sourceConfig));
+                        new MySqlRecordEmitter<>(
+                                deserializer,
+                                sourceReaderMetrics,
+                                sourceConfig.isIncludeSchemaChanges()));

         return FlinkSourceProvider.of(source);
     }

This file was deleted.
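
Note on the emitter swap above: with the pipeline-specific emitter deleted, the MySQL pipeline source constructs the generic MySqlRecordEmitter and passes sourceConfig.isIncludeSchemaChanges() to decide whether schema change events are forwarded downstream. A minimal sketch of where that flag originates, assuming the usual MySqlSourceConfigFactory builder API (the connection values below are placeholders, not part of this diff):

    // Hypothetical wiring, for illustration only: the flag consumed by
    // MySqlRecordEmitter is set on the source config factory.
    MySqlSourceConfigFactory configFactory =
            new MySqlSourceConfigFactory()
                    .hostname("localhost")        // placeholder
                    .port(3306)                   // placeholder
                    .username("flink")            // placeholder
                    .password("secret")           // placeholder
                    .databaseList("app_db")
                    .tableList("app_db\\..*")
                    .includeSchemaChanges(true);  // read back via isIncludeSchemaChanges()

    MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
    // The emitter built in getEventSourceProvider() now receives this boolean:
    boolean forwardSchemaChanges = sourceConfig.isIncludeSchemaChanges();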

@@ -89,7 +89,8 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
     protected final SchemaChangeBehavior behavior;

     // -------------------------
-    // Dynamically initialized transient fields (after coordinator starts)
+    // Dynamically initialized transient fields (restored from a checkpoint or initialized during
+    // coordinator startup)
     // -------------------------
     protected transient int currentParallelism;
     protected transient Set<Integer> activeSinkWriters;

@@ -112,6 +113,7 @@ protected SchemaRegistry(
         this.routingRules = routingRules;
         this.rpcTimeout = rpcTimeout;
         this.behavior = schemaChangeBehavior;
+        this.schemaManager = new SchemaManager();
     }

     // ---------------

@@ -123,7 +125,6 @@ public void start() throws Exception {
         this.currentParallelism = context.currentParallelism();
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.failedReasons = new ConcurrentHashMap<>();
-        this.schemaManager = new SchemaManager();
         this.router = new TableIdRouter(routingRules);
     }
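
The SchemaRegistry change above is the core of this PR: constructing the SchemaManager in the constructor rather than in start() matters because Flink may restore coordinator state via resetToCheckpoint() before start() is called; re-creating the manager inside start() would discard schemas just restored from the checkpoint. A sketch of the ordering this protects, assuming Flink's OperatorCoordinator lifecycle contract (createRegistry() is a hypothetical factory, not from this diff):

    // Lifecycle sketch; assumptions noted inline.
    SchemaRegistry registry = createRegistry();                // ctor now builds SchemaManager
    registry.resetToCheckpoint(checkpointId, checkpointData);  // restore can run before start()
    registry.start();                                          // no longer overwrites the manager
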
@@ -34,6 +34,10 @@
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.EvolvedSchemaResponse;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.OriginalSchemaResponse;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
 import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;

@@ -185,7 +189,11 @@ protected void restore(byte[] checkpointData) throws Exception {
     @Override
     protected void handleCustomCoordinationRequest(
             CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
-        if (request instanceof SchemaChangeRequest) {
+        if (request instanceof OriginalSchemaRequest) {
+            handleOriginalSchemasRequest((OriginalSchemaRequest) request, responseFuture);
+        } else if (request instanceof EvolvedSchemaRequest) {
+            handleEvolvedSchemasRequest((EvolvedSchemaRequest) request, responseFuture);
+        } else if (request instanceof SchemaChangeRequest) {
             handleSchemaChangeRequest((SchemaChangeRequest) request, responseFuture);
         } else {
             throw new UnsupportedOperationException(

@@ -230,6 +238,20 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) {
         });
     }

+    /** Handle an {@link OriginalSchemaRequest}. */
+    public void handleOriginalSchemasRequest(
+            OriginalSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        Schema schema = schemaManager.getLatestOriginalSchema(request.getTableId()).orElse(null);
+        responseFuture.complete(wrap(OriginalSchemaResponse.success(schema)));
+    }
+
+    /** Handle an {@link EvolvedSchemaRequest}. */
+    public void handleEvolvedSchemasRequest(
+            EvolvedSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        Schema schema = schemaManager.getLatestEvolvedSchema(request.getTableId()).orElse(null);
+        responseFuture.complete(wrap(EvolvedSchemaResponse.success(schema)));
+    }
+
     /**
      * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
      *
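
The two new handlers make the coordinator the authoritative source of table schemas: on request, they read the latest original or evolved schema for a table from the checkpoint-backed SchemaManager and return it, with null signaling an unknown table. A hedged sketch of the requesting side in the SchemaOperator, assuming a sendRequestToCoordinator(...) helper, a TableId-taking request constructor, and a getSchema() accessor on the response (none of which are shown in this diff):

    // Client-side sketch; the request/response accessors are assumptions.
    private Schema fetchLatestEvolvedSchema(TableId tableId) throws Exception {
        CoordinationResponse response =
                sendRequestToCoordinator(new EvolvedSchemaRequest(tableId));
        Schema schema = ((EvolvedSchemaResponse) response).getSchema();
        return schema; // may be null if the coordinator holds no schema for tableId
    }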