Skip to content

Commit 998e023

Browse files
authored
[FLINK-38185][pipeline-connector][iceberg] Correctly handle the type conversion of TIMESTAMP_TITH_TIME_ZONE. (#4074)
1 parent 7d93fc1 commit 998e023

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,9 @@ public static RecordData.FieldGetter createFieldGetter(
176176
case TIMESTAMP_WITH_TIME_ZONE:
177177
fieldGetter =
178178
(row) ->
179-
TimestampData.fromTimestamp(
179+
TimestampData.fromInstant(
180180
row.getZonedTimestamp(fieldPos, getPrecision(fieldType))
181-
.toTimestamp());
181+
.toInstant());
182182
break;
183183
case ROW:
184184
final int rowFieldCount = getFieldCount(fieldType);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ public void testWriteWithAllSupportedTypes() throws Exception {
273273
Catalog catalog =
274274
CatalogUtil.buildIcebergCatalog(
275275
"cdc-iceberg-catalog", catalogOptions, new Configuration());
276-
IcebergWriter icebergWriter =
277-
new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
276+
ZoneId pipelineZoneId = ZoneId.systemDefault();
277+
IcebergWriter icebergWriter = new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId);
278278
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
279279
TableId tableId = TableId.parse("test.iceberg_table");
280280

@@ -330,7 +330,7 @@ public void testWriteWithAllSupportedTypes() throws Exception {
330330
LocalZonedTimestampData.fromInstant(Instant.ofEpochSecond(0)),
331331
ZonedTimestampData.fromZonedDateTime(
332332
ZonedDateTime.ofInstant(
333-
Instant.ofEpochSecond(0), ZoneId.of("Asia/Shanghai")))
333+
Instant.ofEpochSecond(0), pipelineZoneId))
334334
});
335335
DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1);
336336
icebergWriter.write(dataChangeEvent, null);
@@ -342,7 +342,7 @@ public void testWriteWithAllSupportedTypes() throws Exception {
342342
List<String> result = fetchTableContent(catalog, tableId);
343343
Assertions.assertThat(result)
344344
.containsExactlyInAnyOrder(
345-
"char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, 1, 2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T08:00Z");
345+
"char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, 1, 2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T00:00Z");
346346
}
347347

348348
/** Mock CommitRequestImpl. */

0 commit comments

Comments
 (0)