Skip to content

Commit c56aa50

Browse files
suhwan-cheonMrart
authored andcommitted
[FLINK-38248][pipeline-connector][paimon][starrocks][doris] The default value of '0000-00-00 00:00:00' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4096)
1 parent 579cf4d commit c56aa50

File tree

8 files changed

+334
-6
lines changed

8 files changed

+334
-6
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,8 @@ private Map<String, FieldSchema> buildFields(Schema schema) {
200200
new FieldSchema(
201201
column.getName(),
202202
typeString,
203-
column.getDefaultValueExpression(),
203+
convertInvalidTimestampDefaultValue(
204+
column.getDefaultValueExpression(), column.getType()),
204205
column.getComment()));
205206
}
206207
return fieldSchemaMap;
@@ -237,7 +238,8 @@ private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveExcept
237238
new FieldSchema(
238239
column.getName(),
239240
buildTypeString(column.getType()),
240-
column.getDefaultValueExpression(),
241+
convertInvalidTimestampDefaultValue(
242+
column.getDefaultValueExpression(), column.getType()),
241243
column.getComment());
242244
schemaChangeManager.addColumn(
243245
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
@@ -316,4 +318,21 @@ private void applyDropTableEvent(DropTableEvent dropTableEvent) throws SchemaEvo
316318
throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e);
317319
}
318320
}
321+
322+
private String convertInvalidTimestampDefaultValue(String defaultValue, DataType dataType) {
323+
if (defaultValue == null) {
324+
return null;
325+
}
326+
327+
if (dataType instanceof LocalZonedTimestampType
328+
|| dataType instanceof TimestampType
329+
|| dataType instanceof ZonedTimestampType) {
330+
331+
if (DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
332+
return DorisSchemaUtils.DEFAULT_DATETIME;
333+
}
334+
}
335+
336+
return defaultValue;
337+
}
319338
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class DorisSchemaUtils {
4646
public static final String DEFAULT_DATE = "1970-01-01";
4747
public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
4848

49+
public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
50+
4951
/**
5052
* Get partition info by config. Currently only supports DATE/TIMESTAMP AUTO RANGE PARTITION and
5153
* doris version should greater than 2.1.6

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
4747
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
4848
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
49+
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
4950
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
5051
import org.apache.flink.runtime.client.JobExecutionException;
5152
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -527,6 +528,108 @@ void testDorisDropTable(boolean batchMode) throws Exception {
527528
tableId.getTableName()));
528529
}
529530

531+
@ParameterizedTest(name = "batchMode: {0}")
532+
@ValueSource(booleans = {true, false})
533+
void testMysqlDefaultTimestampValueConversionInCreateTable(boolean batchMode) throws Exception {
534+
TableId tableId =
535+
TableId.tableId(
536+
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
537+
538+
Schema schema =
539+
Schema.newBuilder()
540+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
541+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
542+
.column(
543+
new PhysicalColumn(
544+
"created_time",
545+
DataTypes.TIMESTAMP(),
546+
null,
547+
DorisSchemaUtils.INVALID_OR_MISSING_DATATIME))
548+
.column(
549+
new PhysicalColumn(
550+
"updated_time",
551+
DataTypes.TIMESTAMP_LTZ(),
552+
null,
553+
DorisSchemaUtils.INVALID_OR_MISSING_DATATIME))
554+
.primaryKey("id")
555+
.build();
556+
557+
runJobWithEvents(
558+
Collections.singletonList(new CreateTableEvent(tableId, schema)), batchMode);
559+
560+
List<String> actual = inspectTableSchema(tableId);
561+
562+
List<String> expected =
563+
Arrays.asList(
564+
"id | INT | Yes | true | null",
565+
"name | VARCHAR(150) | Yes | false | null",
566+
"created_time | DATETIME(6) | Yes | false | "
567+
+ DorisSchemaUtils.DEFAULT_DATETIME,
568+
"updated_time | DATETIME(6) | Yes | false | "
569+
+ DorisSchemaUtils.DEFAULT_DATETIME);
570+
571+
assertEqualsInOrder(expected, actual);
572+
}
573+
574+
@ParameterizedTest(name = "batchMode: {0}")
575+
@ValueSource(booleans = {true, false})
576+
void testMysqlDefaultTimestampValueConversionInAddColumn(boolean batchMode) throws Exception {
577+
TableId tableId =
578+
TableId.tableId(
579+
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
580+
581+
Schema initialSchema =
582+
Schema.newBuilder()
583+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
584+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
585+
.primaryKey("id")
586+
.build();
587+
588+
List<Event> events = new ArrayList<>();
589+
events.add(new CreateTableEvent(tableId, initialSchema));
590+
591+
PhysicalColumn createdTimeCol =
592+
new PhysicalColumn(
593+
"created_time",
594+
DataTypes.TIMESTAMP(),
595+
null,
596+
DorisSchemaUtils.INVALID_OR_MISSING_DATATIME);
597+
598+
PhysicalColumn updatedTimeCol =
599+
new PhysicalColumn(
600+
"updated_time",
601+
DataTypes.TIMESTAMP_LTZ(),
602+
null,
603+
DorisSchemaUtils.INVALID_OR_MISSING_DATATIME);
604+
605+
events.add(
606+
new AddColumnEvent(
607+
tableId,
608+
Collections.singletonList(
609+
new AddColumnEvent.ColumnWithPosition(createdTimeCol))));
610+
611+
events.add(
612+
new AddColumnEvent(
613+
tableId,
614+
Collections.singletonList(
615+
new AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
616+
617+
runJobWithEvents(events, batchMode);
618+
619+
List<String> actual = inspectTableSchema(tableId);
620+
621+
List<String> expected =
622+
Arrays.asList(
623+
"id | INT | Yes | true | null",
624+
"name | VARCHAR(150) | Yes | false | null",
625+
"created_time | DATETIME(6) | Yes | false | "
626+
+ DorisSchemaUtils.DEFAULT_DATETIME,
627+
"updated_time | DATETIME(6) | Yes | false | "
628+
+ DorisSchemaUtils.DEFAULT_DATETIME);
629+
630+
assertEqualsInOrder(expected, actual);
631+
}
632+
530633
private void runJobWithEvents(List<Event> events, boolean batchMode) throws Exception {
531634
DataStream<Event> stream =
532635
env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.flink.cdc.common.event.AddColumnEvent;
2121
import org.apache.flink.cdc.common.schema.Column;
2222
import org.apache.flink.cdc.common.types.DataType;
23+
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
24+
import org.apache.flink.cdc.common.types.TimestampType;
25+
import org.apache.flink.cdc.common.types.ZonedTimestampType;
2326
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
2427

2528
import org.apache.paimon.flink.LogicalTypeConversion;
@@ -35,6 +38,9 @@
3538
* represent different types of schema modifications.
3639
*/
3740
public class SchemaChangeProvider {
41+
42+
public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
43+
public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
3844
/**
3945
* Creates a SchemaChange object for adding a column without specifying its position.
4046
*
@@ -55,7 +61,9 @@ public static List<SchemaChange> add(AddColumnEvent.ColumnWithPosition columnWit
5561
// if default value express exists, we need to set the default value to the table
5662
// option
5763
Column column = columnWithPosition.getAddColumn();
58-
Optional.ofNullable(column.getDefaultValueExpression())
64+
Optional.ofNullable(
65+
convertInvalidTimestampDefaultValue(
66+
column.getDefaultValueExpression(), column.getType()))
5967
.ifPresent(
6068
value -> {
6169
result.add(
@@ -89,7 +97,9 @@ public static List<SchemaChange> add(
8997
// if default value express exists, we need to set the default value to the table
9098
// option
9199
Column column = columnWithPosition.getAddColumn();
92-
Optional.ofNullable(column.getDefaultValueExpression())
100+
Optional.ofNullable(
101+
convertInvalidTimestampDefaultValue(
102+
column.getDefaultValueExpression(), column.getType()))
93103
.ifPresent(
94104
value -> {
95105
result.add(
@@ -149,4 +159,22 @@ public static List<SchemaChange> drop(String columnName) {
149159
public static SchemaChange setOption(String key, String value) {
150160
return SchemaChange.setOption(key, value);
151161
}
162+
163+
private static String convertInvalidTimestampDefaultValue(
164+
String defaultValue, DataType dataType) {
165+
if (defaultValue == null) {
166+
return null;
167+
}
168+
169+
if (dataType instanceof LocalZonedTimestampType
170+
|| dataType instanceof TimestampType
171+
|| dataType instanceof ZonedTimestampType) {
172+
173+
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
174+
return DEFAULT_DATETIME;
175+
}
176+
}
177+
178+
return defaultValue;
179+
}
152180
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.paimon.types.DataTypes;
3838
import org.apache.paimon.types.RowType;
3939
import org.assertj.core.api.Assertions;
40+
import org.junit.jupiter.api.Test;
4041
import org.junit.jupiter.api.io.TempDir;
4142
import org.junit.jupiter.params.ParameterizedTest;
4243
import org.junit.jupiter.params.provider.ValueSource;
@@ -581,4 +582,53 @@ public void testCreateTableWithComment(String metastore)
581582
Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
582583
Assertions.assertThat(table.comment()).contains("comment of table_with_comment");
583584
}
585+
586+
@Test
587+
public void testMysqlDefaultTimestampValueConversionInAddColumn()
588+
throws SchemaEvolveException, Catalog.TableNotExistException,
589+
Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException {
590+
initialize("filesystem");
591+
Map<String, String> tableOptions = new HashMap<>();
592+
tableOptions.put("bucket", "-1");
593+
MetadataApplier metadataApplier =
594+
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
595+
596+
CreateTableEvent createTableEvent =
597+
new CreateTableEvent(
598+
TableId.parse("test.timestamp_test"),
599+
org.apache.flink.cdc.common.schema.Schema.newBuilder()
600+
.physicalColumn(
601+
"id",
602+
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
603+
.physicalColumn(
604+
"name",
605+
org.apache.flink.cdc.common.types.DataTypes.STRING())
606+
.primaryKey("id")
607+
.build());
608+
metadataApplier.applySchemaChange(createTableEvent);
609+
610+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
611+
addedColumns.add(
612+
AddColumnEvent.last(
613+
Column.physicalColumn(
614+
"created_time",
615+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(),
616+
null,
617+
SchemaChangeProvider.INVALID_OR_MISSING_DATATIME)));
618+
addedColumns.add(
619+
AddColumnEvent.last(
620+
Column.physicalColumn(
621+
"updated_time",
622+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(),
623+
null,
624+
SchemaChangeProvider.INVALID_OR_MISSING_DATATIME)));
625+
626+
AddColumnEvent addColumnEvent =
627+
new AddColumnEvent(TableId.parse("test.timestamp_test"), addedColumns);
628+
metadataApplier.applySchemaChange(addColumnEvent);
629+
630+
Table table = catalog.getTable(Identifier.fromString("test.timestamp_test"));
631+
632+
Assertions.assertThat(table).isNotNull();
633+
}
584634
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveEx
169169
.setColumnName(column.getName())
170170
.setOrdinalPosition(-1)
171171
.setColumnComment(column.getComment())
172-
.setDefaultValue(column.getDefaultValueExpression());
172+
.setDefaultValue(
173+
StarRocksUtils.convertInvalidTimestampDefaultValue(
174+
column.getDefaultValueExpression(), column.getType()));
173175
toStarRocksDataType(column, false, builder);
174176
addColumns.add(builder.build());
175177
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
/** Utilities for conversion from source table to StarRocks table. */
5454
public class StarRocksUtils {
5555

56+
public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
57+
public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
58+
5659
/** Convert a source table to {@link StarRocksTable}. */
5760
public static StarRocksTable toStarRocksTable(
5861
TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
@@ -85,7 +88,9 @@ public static StarRocksTable toStarRocksTable(
8588
.setColumnName(column.getName())
8689
.setOrdinalPosition(i)
8790
.setColumnComment(column.getComment())
88-
.setDefaultValue(column.getDefaultValueExpression());
91+
.setDefaultValue(
92+
convertInvalidTimestampDefaultValue(
93+
column.getDefaultValueExpression(), column.getType()));
8994
toStarRocksDataType(column, i < primaryKeyCount, builder);
9095
starRocksColumns.add(builder.build());
9196
}
@@ -386,4 +391,22 @@ protected StarRocksColumn.Builder defaultMethod(DataType dataType) {
386391
throw new UnsupportedOperationException("Unsupported CDC data type " + dataType);
387392
}
388393
}
394+
395+
public static String convertInvalidTimestampDefaultValue(
396+
String defaultValue, org.apache.flink.cdc.common.types.DataType dataType) {
397+
if (defaultValue == null) {
398+
return null;
399+
}
400+
401+
if (dataType instanceof org.apache.flink.cdc.common.types.LocalZonedTimestampType
402+
|| dataType instanceof org.apache.flink.cdc.common.types.TimestampType
403+
|| dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {
404+
405+
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
406+
return DEFAULT_DATETIME;
407+
}
408+
}
409+
410+
return defaultValue;
411+
}
389412
}

0 commit comments

Comments
 (0)