Skip to content

Commit f8873a5

Browse files
committed
[FLINK-38218] Enforce no duplicate split infos in MySqlBinlogSplit
1 parent 6206647 commit f8873a5

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import javax.annotation.Nullable;
2929

3030
import java.util.ArrayList;
31+
import java.util.HashSet;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Objects;
@@ -42,7 +43,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
4243

4344
private final BinlogOffset startingOffset;
4445
private final BinlogOffset endingOffset;
46+
47+
/** Split IDs of all elements must be unique. */
4548
private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
49+
4650
private final Map<TableId, TableChange> tableSchemas;
4751
private final int totalFinishedSplitSize;
4852
private final boolean isSuspended;
@@ -58,6 +62,19 @@ public MySqlBinlogSplit(
5862
int totalFinishedSplitSize,
5963
boolean isSuspended) {
6064
super(splitId);
65+
66+
Set<String> seenSplitIds = new HashSet<>();
67+
for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) {
68+
if (seenSplitIds.contains(splitInfo.getSplitId())) {
69+
throw new IllegalArgumentException(
70+
String.format(
71+
"Found duplicate split ID %s in finished snapshot split infos",
72+
splitInfo.getSplitId()));
73+
}
74+
75+
seenSplitIds.add(splitInfo.getSplitId());
76+
}
77+
6178
this.startingOffset = startingOffset;
6279
this.endingOffset = endingOffset;
6380
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,32 @@ void testTruncatedTablesForLog() {
148148
Assertions.assertThat(binlogSplit.getTables()).isEqualTo(expectedTables);
149149
}
150150

151+
@Test
152+
public void duplicateSplitInfo() {
153+
FinishedSnapshotSplitInfo info =
154+
new FinishedSnapshotSplitInfo(
155+
new TableId("catalog", "schema", "table"),
156+
"split",
157+
null,
158+
null,
159+
BinlogOffset.ofLatest());
160+
List<FinishedSnapshotSplitInfo> infos = new ArrayList<>();
161+
infos.add(info);
162+
infos.add(info);
163+
164+
Assertions.assertThatThrownBy(
165+
() ->
166+
new MySqlBinlogSplit(
167+
"binlog-split",
168+
BinlogOffset.ofLatest(),
169+
null,
170+
infos,
171+
Collections.emptyMap(),
172+
0,
173+
false))
174+
.isExactlyInstanceOf(IllegalArgumentException.class);
175+
}
176+
151177
/** A mock implementation for {@link Table} which is used for unit tests. */
152178
private static class MockTable implements Table {
153179
private final TableId tableId;

0 commit comments

Comments
 (0)