Skip to content

Commit 821e327

Browse files
committed
[FLINK-38218] Rely on stable order of assigned splits
1 parent b72a9be commit 821e327

File tree

3 files changed

+19
-48
lines changed

3 files changed

+19
-48
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@
3333

3434
import java.util.ArrayList;
3535
import java.util.Collection;
36-
import java.util.Comparator;
3736
import java.util.HashMap;
3837
import java.util.List;
3938
import java.util.Map;
4039
import java.util.Optional;
41-
import java.util.stream.Collectors;
4240

4341
/**
4442
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
@@ -208,9 +206,7 @@ public void close() {
208206

209207
private MySqlBinlogSplit createBinlogSplit() {
210208
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
211-
snapshotSplitAssigner.getAssignedSplits().values().stream()
212-
.sorted(Comparator.comparing(MySqlSplit::splitId))
213-
.collect(Collectors.toList());
209+
new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
214210

215211
Map<String, BinlogOffset> splitFinishedOffsets =
216212
snapshotSplitAssigner.getSplitFinishedOffsets();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -169,17 +169,7 @@ private MySqlSnapshotSplitAssigner(
169169
this.currentParallelism = currentParallelism;
170170
this.alreadyProcessedTables = alreadyProcessedTables;
171171
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
172-
// When job restore from savepoint, sort the existing tables and newly added tables
173-
// to let enumerator only send newly added tables' BinlogSplitMetaEvent
174-
this.assignedSplits =
175-
assignedSplits.entrySet().stream()
176-
.sorted(Entry.comparingByKey())
177-
.collect(
178-
Collectors.toMap(
179-
Entry::getKey,
180-
Entry::getValue,
181-
(o, o2) -> o,
182-
LinkedHashMap::new));
172+
this.assignedSplits = assignedSplits;
183173
this.tableSchemas = tableSchemas;
184174
this.splitFinishedOffsets = splitFinishedOffsets;
185175
this.assignerStatus = assignerStatus;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,8 @@
6262
import java.util.ArrayList;
6363
import java.util.Collections;
6464
import java.util.HashMap;
65-
import java.util.HashSet;
6665
import java.util.List;
6766
import java.util.Map;
68-
import java.util.Set;
6967
import java.util.function.Supplier;
7068
import java.util.stream.Collectors;
7169

@@ -421,15 +419,24 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
421419
binlogSplit, receivedTotalFinishedSplitSize);
422420
uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
423421
} else if (receivedMetaGroupId == expectedMetaGroupId) {
424-
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
425-
Set<String> existedSplitsOfLastGroup =
426-
getExistedSplitsOfLastGroup(
427-
binlogSplit.getFinishedSnapshotSplitInfos(),
428-
sourceConfig.getSplitMetaGroupSize());
429-
newAddedMetadataGroup =
430-
metadataEvent.getMetaGroup().stream()
422+
int expectedNumberOfAlreadyRetrievedElements =
423+
binlogSplit.getFinishedSnapshotSplitInfos().size()
424+
% sourceConfig.getSplitMetaGroupSize();
425+
List<byte[]> metaGroup = metadataEvent.getMetaGroup();
426+
if (expectedNumberOfAlreadyRetrievedElements > 0) {
427+
LOG.info(
428+
"Source reader {} is discarding the first {} out of {} elements of meta group {}.",
429+
subtaskId,
430+
expectedNumberOfAlreadyRetrievedElements,
431+
metaGroup.size(),
432+
receivedMetaGroupId);
433+
metaGroup =
434+
metaGroup.subList(
435+
expectedNumberOfAlreadyRetrievedElements, metaGroup.size());
436+
}
437+
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
438+
metaGroup.stream()
431439
.map(FinishedSnapshotSplitInfo::deserialize)
432-
.filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId()))
433440
.collect(Collectors.toList());
434441

435442
uncompletedBinlogSplits.put(
@@ -499,28 +506,6 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
499506
}
500507
}
501508

502-
private Set<String> getExistedSplitsOfLastGroup(
503-
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
504-
int splitsNumOfLastGroup =
505-
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
506-
if (splitsNumOfLastGroup != 0) {
507-
int lastGroupStart =
508-
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
509-
* metaGroupSize;
510-
// Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid
511-
// 'invalid request meta group id' error
512-
List<String> sortedFinishedSnapshotSplits =
513-
finishedSnapshotSplits.stream()
514-
.map(FinishedSnapshotSplitInfo::getSplitId)
515-
.sorted()
516-
.collect(Collectors.toList());
517-
return new HashSet<>(
518-
sortedFinishedSnapshotSplits.subList(
519-
lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
520-
}
521-
return new HashSet<>();
522-
}
523-
524509
private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
525510
if (!LOG.isInfoEnabled()) {
526511
return;

0 commit comments

Comments (0)