Skip to content

Commit ce6960b

Browse files
committed
fixed checkstyle
1 parent 5299758 commit ce6960b

File tree

4 files changed

+42
-17
lines changed

4 files changed

+42
-17
lines changed

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,6 +874,30 @@ There are two places that need to be taken care of.
874874
* If no update operation is performed on the specified column, the exactly-once semantics is ensured.
875875
* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness.
876876

877+
#### Warning
878+
879+
Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a MySQL table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems.
880+
881+
#### Problem Scenario
882+
883+
- **Table Structure:**
884+
- **Primary Key:** `id`
885+
- **Chunk Key Column:** `pid` (Not a primary key)
886+
887+
- **Snapshot Splits:**
888+
- **Split 0:** `1 < pid <= 3`
889+
- **Split 1:** `3 < pid <= 5`
890+
891+
- **Operation:**
892+
- Two different subtasks are reading Split 0 and Split 1 concurrently.
893+
- An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits.
894+
895+
- **Result:**
896+
- **Split 0:** Contains the record `[id=0, pid=2]`
897+
- **Split 1:** Contains the record `[id=0, pid=4]`
898+
899+
Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies.
900+
877901
### About converting binary type data to base64 encoded data
878902

879903
```sql

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
import static org.apache.flink.api.common.JobStatus.RUNNING;
107107
import static org.apache.flink.util.Preconditions.checkState;
108108
import static org.assertj.core.api.Assertions.assertThat;
109-
import static org.junit.jupiter.api.Assertions.assertTrue;
110109

111110
/** IT tests for {@link MySqlSource}. */
112111
@Timeout(value = 300, unit = TimeUnit.SECONDS)
@@ -777,7 +776,8 @@ public void testSnapshotFilters(String tableName, String chunkColumnName) throws
777776
assertEqualsInAnyOrder(
778777
Arrays.asList(expectedSnapshotData),
779778
fetchRowData(iterator, expectedSnapshotData.length));
780-
assertTrue(!hasNextData(iterator));
779+
assertThat(hasNextData(iterator)).isFalse();
780+
781781
jobClient.cancel().get();
782782
}
783783

@@ -855,7 +855,7 @@ public void testSnapshotFiltersEscape(String tableName, String chunkColumnName)
855855
assertEqualsInAnyOrder(
856856
Arrays.asList(expectedSnapshotData),
857857
fetchRowData(iterator, expectedSnapshotData.length));
858-
assertTrue(!hasNextData(iterator));
858+
assertThat(hasNextData(iterator)).isFalse();
859859
jobClient.cancel().get();
860860
}
861861

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
package org.apache.flink.cdc.connectors.mysql.source.utils;
1919

2020
import io.debezium.relational.TableId;
21-
import org.junit.Test;
21+
import org.junit.jupiter.api.Test;
2222

2323
import java.util.HashMap;
2424
import java.util.Map;
2525

26-
import static org.junit.Assert.assertEquals;
27-
import static org.junit.Assert.assertNull;
26+
import static org.assertj.core.api.Assertions.assertThat;
2827

2928
/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */
3029
public class SnapshotFilterUtilsTest {
@@ -34,14 +33,16 @@ public void test() {
3433
Map<String, String> map = new HashMap<>();
3534
map.put("db.user", "id > 100");
3635
map.put("db.order_[0-9]+", "id > 200");
37-
assertEquals(
38-
"id > 100", SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user")));
39-
assertEquals(
40-
"id > 200",
41-
SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1")));
42-
assertEquals(
43-
"id > 200",
44-
SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2")));
45-
assertNull(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop")));
36+
37+
assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user")))
38+
.isEqualTo("id > 100");
39+
40+
assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1")))
41+
.isEqualTo("id > 200");
42+
43+
assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2")))
44+
.isEqualTo("id > 200");
45+
46+
assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop"))).isNull();
4647
}
4748
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST;
5959
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
6060
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT;
61-
import static org.junit.jupiter.api.Assertions.assertEquals;
61+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
6262

6363
/** Test for {@link MySqlTableSource} created by {@link MySqlTableSourceFactory}. */
6464
class MySqlTableSourceFactoryTest {
@@ -485,7 +485,7 @@ void testStartupFromEarliestOffset() {
485485
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
486486
false,
487487
null);
488-
assertEquals(expectedSource, actualSource);
488+
assertThat(actualSource).isEqualTo(expectedSource);
489489
}
490490

491491
@Test

0 commit comments

Comments
 (0)