Skip to content

Commit 5eaf9eb

Browse files
authored
[FLINK-38134][cdc-common] Cannot recognize DropColumnEvent in method SchemaMergingUtils.getSchemaDifference(#4065)
1 parent a26f41d commit 5eaf9eb

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.cdc.common.event.AddColumnEvent;
3030
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
3131
import org.apache.flink.cdc.common.event.CreateTableEvent;
32+
import org.apache.flink.cdc.common.event.DropColumnEvent;
3233
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
3334
import org.apache.flink.cdc.common.event.TableId;
3435
import org.apache.flink.cdc.common.schema.Column;
@@ -212,6 +213,7 @@ public static List<SchemaChangeEvent> getSchemaDifference(
212213
oldTypeMapping.put(columnName, beforeType);
213214
newTypeMapping.put(columnName, afterType);
214215
}
216+
beforeColumns.remove(columnName);
215217
} else {
216218
if (afterWhichColumnPosition == null) {
217219
appendedColumns.add(
@@ -238,6 +240,10 @@ public static List<SchemaChangeEvent> getSchemaDifference(
238240
new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping));
239241
}
240242

243+
if (!beforeColumns.isEmpty()) {
244+
schemaChangeEvents.add(
245+
new DropColumnEvent(tableId, new ArrayList<>(beforeColumns.keySet())));
246+
}
241247
return schemaChangeEvents;
242248
}
243249

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.cdc.common.event.AddColumnEvent;
2828
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
2929
import org.apache.flink.cdc.common.event.CreateTableEvent;
30+
import org.apache.flink.cdc.common.event.DropColumnEvent;
3031
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
3132
import org.apache.flink.cdc.common.event.TableId;
3233
import org.apache.flink.cdc.common.schema.Column;
@@ -417,7 +418,28 @@ TABLE_ID, of("id", BIGINT), of("name", VARCHAR(17), "id", BIGINT)))
417418
TABLE_ID,
418419
Collections.singletonMap("name", STRING),
419420
Collections.singletonMap("name", VARCHAR(17))));
420-
421+
Assertions.assertThat(
422+
getSchemaDifference(
423+
TABLE_ID,
424+
of("id", BIGINT, "name", STRING, "number", BIGINT),
425+
of("id", BIGINT)))
426+
.as("test remove id while add gentle")
427+
.containsExactly(new DropColumnEvent(TABLE_ID, Arrays.asList("number", "name")));
428+
Assertions.assertThat(
429+
getSchemaDifference(
430+
TABLE_ID,
431+
of("id", BIGINT, "name", STRING, "number", BIGINT),
432+
of("id", BIGINT, "name", STRING, "gentle", STRING)))
433+
.as("test remove id while add gentle")
434+
.containsExactly(
435+
new AddColumnEvent(
436+
TABLE_ID,
437+
Collections.singletonList(
438+
new AddColumnEvent.ColumnWithPosition(
439+
Column.physicalColumn("gentle", STRING),
440+
AddColumnEvent.ColumnPosition.AFTER,
441+
"name"))),
442+
new DropColumnEvent(TABLE_ID, Collections.singletonList("number")));
421443
Stream.of(TINYINT, SMALLINT, INT)
422444
.forEach(
423445
type ->

0 commit comments

Comments
 (0)