From 37fcc78fafe475cb0e467c656146b979808b9321 Mon Sep 17 00:00:00 2001 From: liuzeshan Date: Sun, 3 Aug 2025 01:05:55 +0800 Subject: [PATCH 1/2] Fix the problem of data loss in the GTID disordered scenario --- .../debezium/connector/mysql/GtidUtils.java | 75 +++++++++++++++---- .../connector/mysql/GtidUtilsTest.java | 6 +- 2 files changed, 66 insertions(+), 15 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index fe25208f668..3de9b63fff4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -17,13 +17,20 @@ package io.debezium.connector.mysql; -import java.util.ArrayList; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Utils for handling GTIDs. */ public class GtidUtils { + private static final Logger LOG = LoggerFactory.getLogger(GtidUtils.class); /** * This method corrects the GTID set that has been restored from a state or checkpoint using the @@ -39,21 +46,61 @@ public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restored for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); if (serverUuidSet != null) { - long restoredIntervalEnd = getIntervalEnd(uuidSet); - List newIntervals = - new ArrayList<>(); - for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) { - if (serverInterval.getEnd() <= restoredIntervalEnd) { - newIntervals.add( - new com.github.shyiko.mysql.binlog.GtidSet.Interval( - serverInterval.getStart(), serverInterval.getEnd())); - } else if (serverInterval.getStart() <= restoredIntervalEnd - && serverInterval.getEnd() > restoredIntervalEnd) { - newIntervals.add( - new com.github.shyiko.mysql.binlog.GtidSet.Interval( - serverInterval.getStart(), restoredIntervalEnd)); + long serverIntervalEnd = getIntervalEnd(serverUuidSet); + + // The end of the restored interval cannot be greater than the end of the server + // span + RangeSet restoredRangeSet = TreeRangeSet.create(); + for (GtidSet.Interval restoredInterval : uuidSet.getIntervals()) { + if (restoredInterval.getStart() > serverIntervalEnd) { + LOG.warn( + "Discard restored interval {} because it is greater than the end of the server span {}", + restoredInterval, + serverIntervalEnd); + } else if (restoredInterval.getEnd() > serverIntervalEnd) { + Range adjustedInterval = + Range.closed(restoredInterval.getStart(), serverIntervalEnd); + restoredRangeSet.add(adjustedInterval); + LOG.warn( + "Adjust restored interval {} to {} because it is greater than the end of the server span {}", + restoredInterval, + adjustedInterval, + serverIntervalEnd); + } else { + restoredRangeSet.add( + Range.closed( + restoredInterval.getStart(), restoredInterval.getEnd())); } } + + // Add the server intervals that do not overlap with the middle gap of the restored + // intervals + final Range restoredSmallestRange = + restoredRangeSet.asRanges().iterator().next(); + serverUuidSet.getIntervals().stream() + .filter( + interval -> + interval.getStart() + <= restoredSmallestRange.upperEndpoint()) + .forEach( + interval -> + restoredRangeSet.add( + Range.closed( + interval.getStart(), + Math.min( + interval.getEnd(), + restoredSmallestRange + .upperEndpoint())))); + + List newIntervals = + restoredRangeSet.asRanges().stream() + .map( + range -> + new com.github.shyiko.mysql.binlog.GtidSet.Interval( + range.lowerEndpoint(), + range.upperEndpoint())) + .collect(Collectors.toList()); + newSet.put( uuidSet.getUUID(), new GtidSet.UUIDSet( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index d0269df4c25..b6343cf6a41 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -48,7 +48,11 @@ void testFixingRestoredGtidSet() { serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100"); assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-200,B:20-200,C:1-100"); + .hasToString("A:1-100:102-150:152-200,B:20-200,C:1-100"); + + serverGtidSet = new GtidSet("A:1-100"); + restoredGtidSet = new GtidSet("A:100-150:152-200"); + assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-100"); } @Test From 835a2fd8dcd99219755a1f9abce1d9b4121ffcce Mon Sep 17 00:00:00 2001 From: liuzeshan Date: Wed, 6 Aug 2025 15:21:13 +0800 Subject: [PATCH 2/2] use ParameterizedTest --- .../connector/mysql/GtidUtilsTest.java | 70 ++++++++++++------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index b6343cf6a41..06b68810801 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -17,42 +17,60 @@ package io.debezium.connector.mysql; -import org.junit.jupiter.api.Test; - import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; + import static org.assertj.core.api.Assertions.assertThat; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + /** Unit test for {@link GtidUtils}. */ class GtidUtilsTest { - @Test - void testFixingRestoredGtidSet() { - GtidSet serverGtidSet = new GtidSet("A:1-100"); - GtidSet restoredGtidSet = new GtidSet("A:30-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-100"); - serverGtidSet = new GtidSet("A:1-100"); - restoredGtidSet = new GtidSet("A:30-50"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-50"); + @ParameterizedTest(name = "{0}") + @MethodSource("gtidSetsProvider") + void testFixingRestoredGtidSet( + String description, String serverStr, String restoredStr, String expectedStr) { + GtidSet serverGtidSet = new GtidSet(serverStr); + GtidSet restoredGtidSet = new GtidSet(restoredStr); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-150,B:20-200"); + GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-150,B:20-200,C:1-100"); - - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-150:152-200,B:20-200,C:1-100"); + assertThat(result).hasToString(expectedStr); + } - serverGtidSet = new GtidSet("A:1-100"); - restoredGtidSet = new GtidSet("A:100-150:152-200"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-100"); + private static Stream gtidSetsProvider() { + return Stream.of( + Arguments.of( + "Basic example with a straightforward subset", + "A:1-100", + "A:1-50:63-100", + "A:1-50:63-100"), + Arguments.of( + "Restored starts midrange, single gap", "A:1-100", "A:45-80", "A:1-80"), + Arguments.of( + "Multiple intervals with gaps in restored", + "A:1-100,B:1-100", + "A:45-80:83-90:92-98,C:1-20", + "A:1-80:83-90:92-98,B:1-100,C:1-20"), + Arguments.of( + "Server has disjoint intervals, restored partially overlaps", + "A:1-50:60-90:95-200", + "A:45-50:65-70:96-100", + "A:1-50:65-70:96-100"), + Arguments.of( + "Restored completely covers server range", "A:1-100", "A:1-100", "A:1-100"), + Arguments.of( + "Restored partially covers server range", + "A:1-100:102-200", + "A:106-150:152-200", + "A:1-100:102-150:152-200"), + Arguments.of("Restored end exceeds server range", "A:1-100", "A:1-110", "A:1-100")); } @Test