
Commit b72a9be

[FLINK-38218] Require assigned snapshot splits to be ordered
1 parent: 224a075

File tree

6 files changed: +36, -15 lines

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 17 additions & 2 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
 import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
@@ -73,7 +74,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
     private final List<TableId> alreadyProcessedTables;
     private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
-    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+
+    /**
+     * The splits that have been assigned to a reader. Once a split is finished, it remains in this
+     * map. An entry added to {@link #splitFinishedOffsets} indicates that the split has been
+     * finished. If reading the split fails, it is removed from this map.
+     *
+     * <p>{@link MySqlSourceReader} relies on the order of elements within the map:
+     *
+     * <ol>
+     *   <li>It must correspond to the order of assignment of the splits to readers.
+     *   <li>The order must be retained across job restarts.
+     * </ol>
+     */
+    private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+
     private final Map<TableId, TableChanges.TableChange> tableSchemas;
     private final Map<String, BinlogOffset> splitFinishedOffsets;
     private final MySqlSourceConfig sourceConfig;
@@ -141,7 +156,7 @@ private MySqlSnapshotSplitAssigner(
             int currentParallelism,
             List<TableId> alreadyProcessedTables,
             List<MySqlSchemalessSnapshotSplit> remainingSplits,
-            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+            LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<TableId, TableChanges.TableChange> tableSchemas,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
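
The Javadoc above is the heart of the change: MySqlSourceReader relies on the iteration order of this map matching the order in which splits were assigned, and on that order surviving restarts, so the assigner must keep the entries insertion-ordered. A minimal, standalone sketch of the difference between the two map types (the split ids and the OrderDemo class are made up for illustration; this is not code from the patch):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class OrderDemo {
        public static void main(String[] args) {
            Map<String, String> hashed = new HashMap<>();
            Map<String, String> ordered = new LinkedHashMap<>();
            // Simulate assigning three splits in a fixed order.
            for (String splitId : new String[] {"customers:2", "customers:0", "customers:1"}) {
                hashed.put(splitId, "assigned");
                ordered.put(splitId, "assigned");
            }
            // LinkedHashMap iterates in insertion order: [customers:2, customers:0, customers:1]
            System.out.println(ordered.keySet());
            // HashMap iterates in an order determined by hashing, which need not match
            // the assignment order and can change as the key set changes.
            System.out.println(hashed.keySet());
        }
    }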

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java

Lines changed: 8 additions & 7 deletions
@@ -35,6 +35,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -196,12 +197,12 @@ private SnapshotPendingSplitsState deserializeLegacySnapshotPendingSplitsState(
             int splitVersion, DataInputDeserializer in) throws IOException {
         List<TableId> alreadyProcessedTables = readTableIds(in);
         List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
-        Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
+        LinkedHashMap<String, MySqlSnapshotSplit> assignedSnapshotSplits =
                 readAssignedSnapshotSplits(splitVersion, in);
 
         final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = new ArrayList<>();
-        final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
-                new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
+                new LinkedHashMap<>();
         final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
         remainingSplits.forEach(
                 split -> {
@@ -267,8 +268,8 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
         List<TableId> remainingTableIds = readTableIds(in);
         boolean isTableIdCaseSensitive = in.readBoolean();
         final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = new ArrayList<>();
-        final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
-                new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
+                new LinkedHashMap<>();
         final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
         remainingSplits.forEach(
                 split -> {
@@ -368,9 +369,9 @@ private void writeAssignedSnapshotSplits(
         }
     }
 
-    private Map<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
+    private LinkedHashMap<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
             int splitVersion, DataInputDeserializer in) throws IOException {
-        Map<String, MySqlSnapshotSplit> assignedSplits = new HashMap<>();
+        LinkedHashMap<String, MySqlSnapshotSplit> assignedSplits = new LinkedHashMap<>();
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
             String splitId = in.readUTF();
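
For the order to be "retained across job restarts", the serializer has to cooperate: entries are written in the map's iteration order, and reading them back into a LinkedHashMap re-inserts them in that same order, whereas collecting them into a HashMap would discard it. A simplified round-trip sketch, using plain java.io DataOutput/DataInput instead of Flink's DataOutputSerializer/DataInputDeserializer and a Long placeholder for the split payload (not the connector's actual serializer code):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public final class AssignedSplitsRoundTrip {
        static void write(DataOutput out, LinkedHashMap<String, Long> assigned) throws IOException {
            out.writeInt(assigned.size());
            for (Map.Entry<String, Long> entry : assigned.entrySet()) { // iteration order == insertion order
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        }

        static LinkedHashMap<String, Long> read(DataInput in) throws IOException {
            LinkedHashMap<String, Long> assigned = new LinkedHashMap<>();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                // Re-inserting in serialized order reproduces the original assignment order.
                assigned.put(in.readUTF(), in.readLong());
            }
            return assigned;
        }
    }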

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java

Lines changed: 4 additions & 3 deletions
@@ -27,6 +27,7 @@
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -50,7 +51,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
      * The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link
      * MySqlSplitReader}s.
      */
-    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+    private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits;
 
     /**
      * The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received
@@ -75,7 +76,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
     public SnapshotPendingSplitsState(
             List<TableId> alreadyProcessedTables,
             List<MySqlSchemalessSnapshotSplit> remainingSplits,
-            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+            LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<TableId, TableChange> tableSchemas,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
@@ -103,7 +104,7 @@ public List<MySqlSchemalessSnapshotSplit> getRemainingSplits() {
         return remainingSplits;
     }
 
-    public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
+    public LinkedHashMap<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
        return assignedSplits;
     }
 
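
Narrowing the field, constructor parameter, and getter from Map to LinkedHashMap makes the ordering requirement visible in the checkpoint-state API itself: callers can no longer pass in, or hand out, an unordered map by accident. A tiny sketch of that effect (hypothetical class and field names, for illustration only):

    import java.util.LinkedHashMap;

    final class OrderedStateSketch {
        // Declaring the concrete type documents and enforces insertion ordering.
        private final LinkedHashMap<String, String> assignedSplits;

        OrderedStateSketch(LinkedHashMap<String, String> assignedSplits) {
            this.assignedSplits = assignedSplits;
        }

        static void usage() {
            new OrderedStateSketch(new LinkedHashMap<>()); // compiles
            // new OrderedStateSketch(new java.util.HashMap<>()); // rejected by the compiler
        }
    }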

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java

Lines changed: 2 additions & 1 deletion
@@ -47,6 +47,7 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,7 +82,7 @@ void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
         List<TableId> alreadyProcessedTables = Lists.newArrayList(tableId);
         List<MySqlSchemalessSnapshotSplit> remainingSplits = new ArrayList<>();
 
-        Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
+        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = new LinkedHashMap<>();
         Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
 
         for (int i = 0; i < 5; i++) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java

Lines changed: 2 additions & 1 deletion
@@ -42,6 +42,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -647,7 +648,7 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
                         null,
                         null));
 
-        Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
+        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = new LinkedHashMap<>();
         assignedSplits.put(
                 processedTable + ":0",
                 new MySqlSchemalessSnapshotSplit(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java

Lines changed: 3 additions & 1 deletion
@@ -39,6 +39,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -147,7 +148,8 @@ private static SnapshotPendingSplitsState getTestSnapshotPendingSplitsState(
         remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 2));
         remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 3));
 
-        final Map<String, MySqlSchemalessSnapshotSplit> assignedSnapshotSplits = new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSnapshotSplits =
+                new LinkedHashMap<>();
         Arrays.asList(
                 getTestSchemalessSnapshotSplit(tableId0, 0),
                 getTestSchemalessSnapshotSplit(tableId0, 1),
