Skip to content

[FLINK-38218] Fix MySQL CDC binlog split metadata split transmission #4087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
Expand Down Expand Up @@ -208,9 +206,7 @@ public void close() {

private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());

Map<String, BinlogOffset> splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
Expand Down Expand Up @@ -73,7 +74,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {

private final List<TableId> alreadyProcessedTables;
private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;

/**
* The splits that have been assigned to a reader. Once a split is finished, it remains in this
* map. An entry added to {@link #splitFinishedOffsets} indicates that the split has been
* finished. If reading the split fails, it is removed from this map.
*
* <p>{@link MySqlSourceReader} relies on the order of elements within the map:
*
* <ol>
* <li>It must correspond to the order of assignment of the splits to readers.
* <li>The order must be retained across job restarts.
* </ol>
*/
private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits;

private final Map<TableId, TableChanges.TableChange> tableSchemas;
private final Map<String, BinlogOffset> splitFinishedOffsets;
private final MySqlSourceConfig sourceConfig;
Expand Down Expand Up @@ -141,7 +156,7 @@ private MySqlSnapshotSplitAssigner(
int currentParallelism,
List<TableId> alreadyProcessedTables,
List<MySqlSchemalessSnapshotSplit> remainingSplits,
Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<TableId, TableChanges.TableChange> tableSchemas,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
Expand All @@ -154,17 +169,7 @@ private MySqlSnapshotSplitAssigner(
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
// When job restore from savepoint, sort the existing tables and newly added tables
// to let enumerator only send newly added tables' BinlogSplitMetaEvent
this.assignedSplits =
assignedSplits.entrySet().stream()
.sorted(Entry.comparingByKey())
.collect(
Collectors.toMap(
Entry::getKey,
Entry::getValue,
(o, o2) -> o,
LinkedHashMap::new));
this.assignedSplits = assignedSplits;
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -196,12 +197,12 @@ private SnapshotPendingSplitsState deserializeLegacySnapshotPendingSplitsState(
int splitVersion, DataInputDeserializer in) throws IOException {
List<TableId> alreadyProcessedTables = readTableIds(in);
List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
LinkedHashMap<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);

final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = new ArrayList<>();
final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
new HashMap<>();
final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
new LinkedHashMap<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
remainingSplits.forEach(
split -> {
Expand Down Expand Up @@ -267,8 +268,8 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
List<TableId> remainingTableIds = readTableIds(in);
boolean isTableIdCaseSensitive = in.readBoolean();
final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = new ArrayList<>();
final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
new HashMap<>();
final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSchemalessSnapshotSplits =
new LinkedHashMap<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
remainingSplits.forEach(
split -> {
Expand Down Expand Up @@ -368,9 +369,9 @@ private void writeAssignedSnapshotSplits(
}
}

private Map<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
private LinkedHashMap<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
int splitVersion, DataInputDeserializer in) throws IOException {
Map<String, MySqlSnapshotSplit> assignedSplits = new HashMap<>();
LinkedHashMap<String, MySqlSnapshotSplit> assignedSplits = new LinkedHashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
String splitId = in.readUTF();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -50,7 +51,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
* The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link
* MySqlSplitReader}s.
*/
private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits;

/**
* The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received
Expand All @@ -75,7 +76,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
List<MySqlSchemalessSnapshotSplit> remainingSplits,
Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<TableId, TableChange> tableSchemas,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
Expand Down Expand Up @@ -103,7 +104,7 @@ public List<MySqlSchemalessSnapshotSplit> getRemainingSplits() {
return remainingSplits;
}

public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
public LinkedHashMap<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
return assignedSplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -421,15 +419,24 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
binlogSplit, receivedTotalFinishedSplitSize);
uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
} else if (receivedMetaGroupId == expectedMetaGroupId) {
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
binlogSplit.getFinishedSnapshotSplitInfos(),
sourceConfig.getSplitMetaGroupSize());
newAddedMetadataGroup =
metadataEvent.getMetaGroup().stream()
int expectedNumberOfAlreadyRetrievedElements =
binlogSplit.getFinishedSnapshotSplitInfos().size()
% sourceConfig.getSplitMetaGroupSize();
List<byte[]> metaGroup = metadataEvent.getMetaGroup();
if (expectedNumberOfAlreadyRetrievedElements > 0) {
LOG.info(
"Source reader {} is discarding the first {} out of {} elements of meta group {}.",
subtaskId,
expectedNumberOfAlreadyRetrievedElements,
metaGroup.size(),
receivedMetaGroupId);
metaGroup =
metaGroup.subList(
expectedNumberOfAlreadyRetrievedElements, metaGroup.size());
}
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
metaGroup.stream()
.map(FinishedSnapshotSplitInfo::deserialize)
.filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId()))
.collect(Collectors.toList());

uncompletedBinlogSplits.put(
Expand Down Expand Up @@ -499,28 +506,6 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
}
}

private Set<String> getExistedSplitsOfLastGroup(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
int splitsNumOfLastGroup =
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
if (splitsNumOfLastGroup != 0) {
int lastGroupStart =
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
* metaGroupSize;
// Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid
// 'invalid request meta group id' error
List<String> sortedFinishedSnapshotSplits =
finishedSnapshotSplits.stream()
.map(FinishedSnapshotSplitInfo::getSplitId)
.sorted()
.collect(Collectors.toList());
return new HashSet<>(
sortedFinishedSnapshotSplits.subList(
lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
}
return new HashSet<>();
}

private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
if (!LOG.isInfoEnabled()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -42,7 +43,10 @@ public class MySqlBinlogSplit extends MySqlSplit {

private final BinlogOffset startingOffset;
private final BinlogOffset endingOffset;

/** Split IDs of all elements must be unique. */
private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;

private final Map<TableId, TableChange> tableSchemas;
private final int totalFinishedSplitSize;
private final boolean isSuspended;
Expand All @@ -58,6 +62,19 @@ public MySqlBinlogSplit(
int totalFinishedSplitSize,
boolean isSuspended) {
super(splitId);

Set<String> seenSplitIds = new HashSet<>();
for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) {
if (seenSplitIds.contains(splitInfo.getSplitId())) {
throw new IllegalArgumentException(
String.format(
"Found duplicate split ID %s in finished snapshot split infos",
splitInfo.getSplitId()));
}

seenSplitIds.add(splitInfo.getSplitId());
}

this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
Expand All @@ -74,14 +91,14 @@ public MySqlBinlogSplit(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
Map<TableId, TableChange> tableSchemas,
int totalFinishedSplitSize) {
super(splitId);
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = false;
this.tablesForLog = getTablesForLog();
this(
splitId,
startingOffset,
endingOffset,
finishedSnapshotSplitInfos,
tableSchemas,
totalFinishedSplitSize,
false);
}

public BinlogOffset getStartingOffset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -81,7 +82,7 @@ void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
List<TableId> alreadyProcessedTables = Lists.newArrayList(tableId);
List<MySqlSchemalessSnapshotSplit> remainingSplits = new ArrayList<>();

Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = new LinkedHashMap<>();
Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();

for (int i = 0; i < 5; i++) {
Expand Down
Loading
Loading