Skip to content

Commit 1646715

Browse files
[FLINK-37484][cdc-connector] unevenly chunk split also enables unbounded-chunk-first.enabled. (#3954)
1 parent 69ec337 commit 1646715

File tree

9 files changed

+132
-98
lines changed

9 files changed

+132
-98
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -366,16 +366,20 @@ private void splitTable(TableId nextTable) {
366366
hasRecordSchema = true;
367367
tableSchemas.putAll(splits.iterator().next().getTableSchemas());
368368
}
369-
final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
370-
splits.stream()
371-
.map(SnapshotSplit::toSchemalessSnapshotSplit)
372-
.collect(Collectors.toList());
369+
List<String> splitIds = new ArrayList<>();
370+
for (SnapshotSplit split : splits) {
371+
SchemalessSnapshotSplit schemalessSnapshotSplit =
372+
split.toSchemalessSnapshotSplit();
373+
splitIds.add(schemalessSnapshotSplit.splitId());
374+
if (sourceConfig.isAssignUnboundedChunkFirst() && split.getSplitEnd() == null) {
375+
// assign unbounded split first
376+
remainingSplits.add(0, schemalessSnapshotSplit);
377+
} else {
378+
remainingSplits.add(schemalessSnapshotSplit);
379+
}
380+
}
381+
373382
chunkNum += splits.size();
374-
remainingSplits.addAll(schemalessSnapshotSplits);
375-
List<String> splitIds =
376-
schemalessSnapshotSplits.stream()
377-
.map(SchemalessSnapshotSplit::splitId)
378-
.collect(Collectors.toList());
379383
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
380384

381385
if (!chunkSplitter.hasNextChunk()) {

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,9 @@ public boolean equals(Object o) {
8383
public int hashCode() {
8484
return Objects.hash(chunkStart, chunkEnd);
8585
}
86+
87+
@Override
88+
public String toString() {
89+
return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" + chunkEnd + '}';
90+
}
8691
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -469,15 +469,8 @@ private List<ChunkRange> splitEvenlySizedChunks(
469469
break;
470470
}
471471
}
472-
// add the unbounded split
473-
// assign unbounded split first, both the largest and smallest unbounded chunks are
474-
// completed
475-
// in the first two splits
476-
if (sourceConfig.isAssignUnboundedChunkFirst()) {
477-
splits.add(0, ChunkRange.of(chunkStart, null));
478-
} else {
479-
splits.add(ChunkRange.of(chunkStart, null));
480-
}
472+
// add the ending split
473+
splits.add(ChunkRange.of(chunkStart, null));
481474
return splits;
482475
}
483476

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,9 @@ public boolean equals(Object o) {
8383
public int hashCode() {
8484
return Objects.hash(chunkStart, chunkEnd);
8585
}
86+
87+
@Override
88+
public String toString() {
89+
return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" + chunkEnd + '}';
90+
}
8691
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,9 @@ public List<ChunkRange> splitEvenlySizedChunks(
316316
break;
317317
}
318318
}
319-
// add the unbounded split
320-
// assign unbounded split first, both the largest and smallest unbounded chunks are
321-
// completed
322-
// in the first two splits
323-
if (sourceConfig.isAssignUnboundedChunkFirst()) {
324-
splits.add(0, ChunkRange.of(chunkStart, null));
325-
} else {
326-
splits.add(ChunkRange.of(chunkStart, null));
327-
}
319+
320+
// add the ending split
321+
splits.add(ChunkRange.of(chunkStart, null));
328322
return splits;
329323
}
330324

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,12 +324,19 @@ private void splitTable(TableId nextTable) {
324324
tableSchema.putAll(splits.iterator().next().getTableSchemas());
325325
tableSchemas.putAll(tableSchema);
326326
}
327-
final List<MySqlSchemalessSnapshotSplit> schemaLessSnapshotSplits =
328-
splits.stream()
329-
.map(MySqlSnapshotSplit::toSchemalessSnapshotSplit)
330-
.collect(Collectors.toList());
327+
328+
for (MySqlSnapshotSplit split : splits) {
329+
MySqlSchemalessSnapshotSplit schemalessSnapshotSplit =
330+
split.toSchemalessSnapshotSplit();
331+
if (sourceConfig.isAssignUnboundedChunkFirst() && split.getSplitEnd() == null) {
332+
// assign unbounded split first
333+
remainingSplits.add(0, schemalessSnapshotSplit);
334+
} else {
335+
remainingSplits.add(schemalessSnapshotSplit);
336+
}
337+
}
338+
331339
chunkNum += splits.size();
332-
remainingSplits.addAll(schemaLessSnapshotSplits);
333340
if (!chunkSplitter.hasNextChunk()) {
334341
remainingTables.remove(nextTable);
335342
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -85,34 +85,4 @@ void testSplitEvenlySizedChunksNormal() {
8585
ChunkRange.of(2147483637, 2147483647),
8686
ChunkRange.of(2147483647, null));
8787
}
88-
89-
@Test
90-
public void testSplitEvenlySizedChunksEndingFirst() {
91-
MySqlSourceConfig sourceConfig =
92-
new MySqlSourceConfigFactory()
93-
.startupOptions(StartupOptions.initial())
94-
.databaseList("")
95-
.tableList("")
96-
.hostname("")
97-
.username("")
98-
.password("")
99-
.serverTimeZone(ZoneId.of("UTC").toString())
100-
.assignUnboundedChunkFirst(true)
101-
.createConfig(0);
102-
MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig);
103-
104-
List<ChunkRange> res =
105-
splitter.splitEvenlySizedChunks(
106-
new TableId("catalog", "db", "tab"),
107-
Integer.MAX_VALUE - 20,
108-
Integer.MAX_VALUE,
109-
20,
110-
10,
111-
10);
112-
Assertions.assertThat(res)
113-
.containsExactly(
114-
ChunkRange.of(2147483647, null),
115-
ChunkRange.of(null, 2147483637),
116-
ChunkRange.of(2147483637, 2147483647));
117-
}
11888
}

0 commit comments

Comments
 (0)