From b72a9be431d3a80719a92b754dc1a175a9a42466 Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Thu, 7 Aug 2025 15:07:00 -0700 Subject: [PATCH 1/5] [FLINK-38218] Require assigned snapshot splits to be ordered --- .../assigners/MySqlSnapshotSplitAssigner.java | 19 +++++++++++++++++-- .../state/PendingSplitsStateSerializer.java | 15 ++++++++------- .../state/SnapshotPendingSplitsState.java | 7 ++++--- .../MySqlHybridSplitAssignerTest.java | 3 ++- .../MySqlSnapshotSplitAssignerTest.java | 3 ++- .../PendingSplitsStateSerializerTest.java | 4 +++- 6 files changed, 36 insertions(+), 15 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e295fb51fdb..5f9d0b42886 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; +import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; @@ -73,7 +74,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private final List alreadyProcessedTables; private final List remainingSplits; - private final Map assignedSplits; + + /** + * The splits that have been assigned to a reader. Once a split is finished, it remains in this + * map. An entry added to {@link #splitFinishedOffsets} indicates that the split has been + * finished. If reading the split fails, it is removed from this map. + * + *

{@link MySqlSourceReader} relies on the order of elements within the map: + * + *

    + *
  1. It must correspond to the order of assignment of the splits to readers. + *
  2. The order must be retained across job restarts. + *
+ */ + private final LinkedHashMap assignedSplits; + private final Map tableSchemas; private final Map splitFinishedOffsets; private final MySqlSourceConfig sourceConfig; @@ -141,7 +156,7 @@ private MySqlSnapshotSplitAssigner( int currentParallelism, List alreadyProcessedTables, List remainingSplits, - Map assignedSplits, + LinkedHashMap assignedSplits, Map tableSchemas, Map splitFinishedOffsets, AssignerStatus assignerStatus, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java index bb3e2d2eda4..0df1ee97992 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -196,12 +197,12 @@ private SnapshotPendingSplitsState deserializeLegacySnapshotPendingSplitsState( int splitVersion, DataInputDeserializer in) throws IOException { List alreadyProcessedTables = readTableIds(in); List remainingSplits = readMySqlSnapshotSplits(splitVersion, in); - Map assignedSnapshotSplits = + LinkedHashMap assignedSnapshotSplits = readAssignedSnapshotSplits(splitVersion, in); final List remainingSchemalessSplits = new ArrayList<>(); - final Map assignedSchemalessSnapshotSplits = - new HashMap<>(); + final LinkedHashMap assignedSchemalessSnapshotSplits = + new LinkedHashMap<>(); final Map tableSchemas = new HashMap<>(); remainingSplits.forEach( split -> { @@ -267,8 +268,8 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( List remainingTableIds = readTableIds(in); boolean isTableIdCaseSensitive = in.readBoolean(); final List remainingSchemalessSplits = new ArrayList<>(); - final Map assignedSchemalessSnapshotSplits = - new HashMap<>(); + final LinkedHashMap assignedSchemalessSnapshotSplits = + new LinkedHashMap<>(); final Map tableSchemas = new HashMap<>(); remainingSplits.forEach( split -> { @@ -368,9 +369,9 @@ private void writeAssignedSnapshotSplits( } } - private Map readAssignedSnapshotSplits( + private LinkedHashMap readAssignedSnapshotSplits( int splitVersion, DataInputDeserializer in) throws IOException { - Map assignedSplits = new HashMap<>(); + LinkedHashMap assignedSplits = new LinkedHashMap<>(); final int size = in.readInt(); for (int i = 0; i < size; i++) { String splitId = in.readUTF(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java index 39aa71ad079..c49069a60b9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java @@ -27,6 +27,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,7 +51,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { * The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link * MySqlSplitReader}s. */ - private final Map assignedSplits; + private final LinkedHashMap assignedSplits; /** * The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received @@ -75,7 +76,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { public SnapshotPendingSplitsState( List alreadyProcessedTables, List remainingSplits, - Map assignedSplits, + LinkedHashMap assignedSplits, Map tableSchemas, Map splitFinishedOffsets, AssignerStatus assignerStatus, @@ -103,7 +104,7 @@ public List getRemainingSplits() { return remainingSplits; } - public Map getAssignedSplits() { + public LinkedHashMap getAssignedSplits() { return assignedSplits; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index 0b02fd5906d..605b848c3d1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -47,6 +47,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -81,7 +82,7 @@ void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() { List alreadyProcessedTables = Lists.newArrayList(tableId); List remainingSplits = new ArrayList<>(); - Map assignedSplits = new HashMap<>(); + LinkedHashMap assignedSplits = new LinkedHashMap<>(); Map splitFinishedOffsets = new HashMap<>(); for (int i = 0; i < 5; i++) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index d9755559357..6347b2d078e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -647,7 +648,7 @@ private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as null, null)); - Map assignedSplits = new HashMap<>(); + LinkedHashMap assignedSplits = new LinkedHashMap<>(); assignedSplits.put( processedTable + ":0", new MySqlSchemalessSnapshotSplit( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java index 3d813b8a81d..60abb56ff16 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -147,7 +148,8 @@ private static SnapshotPendingSplitsState getTestSnapshotPendingSplitsState( remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 2)); remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 3)); - final Map assignedSnapshotSplits = new HashMap<>(); + final LinkedHashMap assignedSnapshotSplits = + new LinkedHashMap<>(); Arrays.asList( getTestSchemalessSnapshotSplit(tableId0, 0), getTestSchemalessSnapshotSplit(tableId0, 1), From 821e327a91c178316bb623560f9c89793308b813 Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Thu, 7 Aug 2025 15:07:24 -0700 Subject: [PATCH 2/5] [FLINK-38218] Rely on stable order of assigned splits --- .../assigners/MySqlHybridSplitAssigner.java | 6 +-- .../assigners/MySqlSnapshotSplitAssigner.java | 12 +---- .../source/reader/MySqlSourceReader.java | 49 +++++++------------ 3 files changed, 19 insertions(+), 48 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index d66e6f3da1f..4a25491b23f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -33,12 +33,10 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; /** * A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key @@ -208,9 +206,7 @@ public void close() { private MySqlBinlogSplit createBinlogSplit() { final List assignedSnapshotSplit = - snapshotSplitAssigner.getAssignedSplits().values().stream() - .sorted(Comparator.comparing(MySqlSplit::splitId)) - .collect(Collectors.toList()); + new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values()); Map splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 5f9d0b42886..51a2897ee98 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -169,17 +169,7 @@ private MySqlSnapshotSplitAssigner( this.currentParallelism = currentParallelism; this.alreadyProcessedTables = alreadyProcessedTables; this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits); - // When job restore from savepoint, sort the existing tables and newly added tables - // to let enumerator only send newly added tables' BinlogSplitMetaEvent - this.assignedSplits = - assignedSplits.entrySet().stream() - .sorted(Entry.comparingByKey()) - .collect( - Collectors.toMap( - Entry::getKey, - Entry::getValue, - (o, o2) -> o, - LinkedHashMap::new)); + this.assignedSplits = assignedSplits; this.tableSchemas = tableSchemas; this.splitFinishedOffsets = splitFinishedOffsets; this.assignerStatus = assignerStatus; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index d9bfa18e7cf..050ee14577e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -62,10 +62,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -421,15 +419,24 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) { binlogSplit, receivedTotalFinishedSplitSize); uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit); } else if (receivedMetaGroupId == expectedMetaGroupId) { - List newAddedMetadataGroup; - Set existedSplitsOfLastGroup = - getExistedSplitsOfLastGroup( - binlogSplit.getFinishedSnapshotSplitInfos(), - sourceConfig.getSplitMetaGroupSize()); - newAddedMetadataGroup = - metadataEvent.getMetaGroup().stream() + int expectedNumberOfAlreadyRetrievedElements = + binlogSplit.getFinishedSnapshotSplitInfos().size() + % sourceConfig.getSplitMetaGroupSize(); + List metaGroup = metadataEvent.getMetaGroup(); + if (expectedNumberOfAlreadyRetrievedElements > 0) { + LOG.info( + "Source reader {} is discarding the first {} out of {} elements of meta group {}.", + subtaskId, + expectedNumberOfAlreadyRetrievedElements, + metaGroup.size(), + receivedMetaGroupId); + metaGroup = + metaGroup.subList( + expectedNumberOfAlreadyRetrievedElements, metaGroup.size()); + } + List newAddedMetadataGroup = + metaGroup.stream() .map(FinishedSnapshotSplitInfo::deserialize) - .filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId())) .collect(Collectors.toList()); uncompletedBinlogSplits.put( @@ -499,28 +506,6 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit( } } - private Set getExistedSplitsOfLastGroup( - List finishedSnapshotSplits, int metaGroupSize) { - int splitsNumOfLastGroup = - finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize(); - if (splitsNumOfLastGroup != 0) { - int lastGroupStart = - ((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize())) - * metaGroupSize; - // Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid - // 'invalid request meta group id' error - List sortedFinishedSnapshotSplits = - finishedSnapshotSplits.stream() - .map(FinishedSnapshotSplitInfo::getSplitId) - .sorted() - .collect(Collectors.toList()); - return new HashSet<>( - sortedFinishedSnapshotSplits.subList( - lastGroupStart, lastGroupStart + splitsNumOfLastGroup)); - } - return new HashSet<>(); - } - private void logCurrentBinlogOffsets(List splits, long checkpointId) { if (!LOG.isInfoEnabled()) { return; From 620664760f95ced52c0bbf28bbd51ba613fb1d7c Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Fri, 8 Aug 2025 14:29:34 -0700 Subject: [PATCH 3/5] [FLINK-38218] Eliminate code duplication in MySqlBinlogSplit constructors --- .../mysql/source/split/MySqlBinlogSplit.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java index fba3abccead..aba94c128b4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java @@ -74,14 +74,14 @@ public MySqlBinlogSplit( List finishedSnapshotSplitInfos, Map tableSchemas, int totalFinishedSplitSize) { - super(splitId); - this.startingOffset = startingOffset; - this.endingOffset = endingOffset; - this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos; - this.tableSchemas = tableSchemas; - this.totalFinishedSplitSize = totalFinishedSplitSize; - this.isSuspended = false; - this.tablesForLog = getTablesForLog(); + this( + splitId, + startingOffset, + endingOffset, + finishedSnapshotSplitInfos, + tableSchemas, + totalFinishedSplitSize, + false); } public BinlogOffset getStartingOffset() { From f8873a524be5c1d6db41ff80d48a88e1dad97b5e Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Thu, 7 Aug 2025 15:52:21 -0700 Subject: [PATCH 4/5] [FLINK-38218] Enforce no duplicate split infos in MySqlBinlogSplit --- .../mysql/source/split/MySqlBinlogSplit.java | 17 ++++++++++++ .../source/split/MySqlBinlogSplitTest.java | 26 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java index aba94c128b4..081c2a7443a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,7 +43,10 @@ public class MySqlBinlogSplit extends MySqlSplit { private final BinlogOffset startingOffset; private final BinlogOffset endingOffset; + + /** Split IDs of all elements must be unique. */ private final List finishedSnapshotSplitInfos; + private final Map tableSchemas; private final int totalFinishedSplitSize; private final boolean isSuspended; @@ -58,6 +62,19 @@ public MySqlBinlogSplit( int totalFinishedSplitSize, boolean isSuspended) { super(splitId); + + Set seenSplitIds = new HashSet<>(); + for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) { + if (seenSplitIds.contains(splitInfo.getSplitId())) { + throw new IllegalArgumentException( + String.format( + "Found duplicate split ID %s in finished snapshot split infos", + splitInfo.getSplitId())); + } + + seenSplitIds.add(splitInfo.getSplitId()); + } + this.startingOffset = startingOffset; this.endingOffset = endingOffset; this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java index f26c3a0b858..ff310d367e8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java @@ -148,6 +148,32 @@ void testTruncatedTablesForLog() { Assertions.assertThat(binlogSplit.getTables()).isEqualTo(expectedTables); } + @Test + public void duplicateSplitInfo() { + FinishedSnapshotSplitInfo info = + new FinishedSnapshotSplitInfo( + new TableId("catalog", "schema", "table"), + "split", + null, + null, + BinlogOffset.ofLatest()); + List infos = new ArrayList<>(); + infos.add(info); + infos.add(info); + + Assertions.assertThatThrownBy( + () -> + new MySqlBinlogSplit( + "binlog-split", + BinlogOffset.ofLatest(), + null, + infos, + Collections.emptyMap(), + 0, + false)) + .isExactlyInstanceOf(IllegalArgumentException.class); + } + /** A mock implementation for {@link Table} which is used for unit tests. */ private static class MockTable implements Table { private final TableId tableId; From 9f1635624943350237d13ad89bcd1652dafa43de Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Mon, 11 Aug 2025 12:33:24 -0700 Subject: [PATCH 5/5] [FLINK-38218]: Add a MySqlSnapshotSplitAssigner test --- .../MySqlSnapshotSplitAssignerTest.java | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 6347b2d078e..ca5dff5bdff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -36,6 +36,11 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.annotation.Nullable; import java.time.ZoneId; import java.util.ArrayList; @@ -47,6 +52,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; @@ -528,6 +534,92 @@ void testSplitEvenlySizedChunksEndingFirst() { assertThat(splits).isEqualTo(expected); } + @ParameterizedTest + @MethodSource + void testFinishedSnapshotSplitInfosAreInOrderOfAssignment( + String table1Name, String table2Name) { + List tableNames = new ArrayList<>(); + tableNames.add(table1Name); + + SnapshotPendingSplitsState state; + + try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, null)) { + state = processAllSplitsAndSnapshotState(assigner, 1L); + } + + tableNames.add(table2Name); + + try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, state)) { + state = processAllSplitsAndSnapshotState(assigner, 2L); + } + + try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, state)) { + List finishedSnapshotSplitTableNames = + assigner.getFinishedSplitInfos().stream() + .map(i -> i.getTableId().table()) + .collect(Collectors.toList()); + + assertThat(finishedSnapshotSplitTableNames).isEqualTo(tableNames); + } + } + + /** + * Use various combinations of table names to ensure that the finished snapshot split infos are + * in the order of assignment, not the order of table names. + */ + public static Stream testFinishedSnapshotSplitInfosAreInOrderOfAssignment() { + String table1Name = "customers"; + String table2Name = "customers_1"; + + return Stream.of( + Arguments.of(table1Name, table2Name), Arguments.of(table2Name, table1Name)); + } + + private MySqlSnapshotSplitAssigner createAssigner( + List tableNames, @Nullable SnapshotPendingSplitsState state) { + int currentParallelism = 1; + + if (state == null) { + return new MySqlSnapshotSplitAssigner( + createConfiguration(tableNames), + currentParallelism, + new ArrayList<>(), + true, + getMySqlSplitEnumeratorContext()); + } + + return new MySqlSnapshotSplitAssigner( + createConfiguration(tableNames), + currentParallelism, + state, + getMySqlSplitEnumeratorContext()); + } + + private MySqlSourceConfig createConfiguration(List tableNames) { + return getConfig( + customerDatabase, + Integer.MAX_VALUE, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + tableNames.toArray(new String[0]), + "id", + true, + false); + } + + private SnapshotPendingSplitsState processAllSplitsAndSnapshotState( + MySqlSnapshotSplitAssigner assigner, long checkpointId) { + assigner.open(); + + Optional optional; + while ((optional = assigner.getNext()).isPresent()) { + assigner.onFinishedSplits( + Collections.singletonMap(optional.get().splitId(), BinlogOffset.ofLatest())); + } + + return assigner.snapshotState(checkpointId); + } + private List getTestAssignSnapshotSplits( int splitSize, double distributionFactorUpper,