Skip to content

[FLINK-38250] The Flink CDC type supports the timeTZ field type. #4107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ public interface RecordData {
*/
DecimalData getDecimal(int pos, int precision, int scale);

/**
* Returns the zone time value at the given position.
*
* <p>The precision is required to determine whether the time value was stored in a compact
* representation (see {@link ZoneTimeData}).
*/
ZoneTimeData getZoneTime(int pos, int precision);

/**
* Returns the timestamp value at the given position.
*
Expand Down Expand Up @@ -201,6 +209,9 @@ static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos
case TIME_WITHOUT_TIME_ZONE:
fieldGetter = record -> record.getInt(fieldPos);
break;
case TIME_WITH_TIME_ZONE:
fieldGetter = record -> record.getZoneTime(fieldPos, getPrecision(fieldType));
break;
case BIGINT:
fieldGetter = record -> record.getLong(fieldPos);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.data;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

/**
* An internal data structure representing data of {@link ZonedTimeType}. It aims to converting
* various Java time representations into the date-time in a particular time zone.
*
* <p>The ISO time format is used by default, it includes the date, time (including fractional
* parts), and offset from UTC, such as '10:15:30+01:00'.
*/
@PublicEvolving
public class ZoneTimeData implements Comparable<ZoneTimeData> {

private static final long MILLIS_TO_NANO = 1_000L;
/**
* The ISO time format includes the date, time (including fractional parts), and offset from
* UTC, such as '10:15:30.030431+01:00'.
*/
public static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_OFFSET_TIME;

private final long nanoOfDay;
// this field holds time zone id
private final String zoneId;

public ZoneTimeData(long nanoOfDay, String zoneId) {
this.nanoOfDay = nanoOfDay;
this.zoneId = zoneId;
}

public static ZoneTimeData fromLocalTime(LocalTime localTime, ZoneId zoneId) {
return new ZoneTimeData(localTime.toNanoOfDay(), zoneId.getId());
}

public static ZoneTimeData fromNanoOfDay(long nanoOfDay, ZoneId zoneId) {
return new ZoneTimeData(nanoOfDay, zoneId.getId());
}

public static ZoneTimeData fromMillsOfDay(long millsOfDay, ZoneId zoneId) {
return new ZoneTimeData(millsOfDay * MILLIS_TO_NANO, zoneId.getId());
}

public static ZoneTimeData fromIsoLocalTimeString(String timeString, ZoneId zoneId) {
return fromLocalTime(LocalTime.parse(timeString), zoneId);
}

public ZonedDateTime toZoneLocalTime() {
return ZonedDateTime.ofInstant(
Instant.from(LocalDateTime.of(LocalDate.now(), LocalTime.ofNanoOfDay(nanoOfDay))),
ZoneId.of(zoneId));
}

public String toString() {
return toZoneLocalTime().format(ISO_FORMATTER);
}

@Override
public final boolean equals(Object o) {
if (!(o instanceof ZoneTimeData)) {
return false;
}

ZoneTimeData timeData = (ZoneTimeData) o;
return nanoOfDay == timeData.nanoOfDay;
}

@Override
public int compareTo(ZoneTimeData other) {
return Long.compare(nanoOfDay, other.nanoOfDay);
}

@Override
public int hashCode() {
return Objects.hash(nanoOfDay, zoneId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZoneTimeData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

import java.nio.ByteOrder;
import java.time.ZoneId;

/**
* An implementation of {@link RecordData} which is backed by {@link MemorySegment} instead of
Expand Down Expand Up @@ -164,6 +166,12 @@ public DecimalData getDecimal(int pos, int precision, int scale) {
segments, offset, offsetAndSize, precision, scale);
}

@Override
public ZoneTimeData getZoneTime(int pos, int precision) {
String[] parts = getString(pos).toString().split(TIMESTAMP_DELIMITER);
return ZoneTimeData.fromNanoOfDay(Long.parseLong(parts[0]), ZoneId.of(parts[2]));
}

@Override
public TimestampData getTimestamp(int pos, int precision) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public R visit(TimeType timeType) {
return defaultMethod(timeType);
}

@Override
public R visit(ZoneTimeType timeType) {
return defaultMethod(timeType);
}

@Override
public R visit(TimestampType timestampType) {
return defaultMethod(timestampType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public enum DataTypeRoot {

TIME_WITHOUT_TIME_ZONE(DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIME),

TIME_WITH_TIME_ZONE(DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIME),

TIMESTAMP_WITHOUT_TIME_ZONE(
DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIMESTAMP),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public interface DataTypeVisitor<R> {

R visit(TimeType timeType);

R visit(ZoneTimeType zoneTimeType);

R visit(TimestampType timestampType);

R visit(ZonedTimestampType zonedTimestampType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,44 @@ public static TimeType TIME(int precision) {
return new TimeType(precision);
}

/**
* Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits
* of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
* inclusive).
*
* <p>An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond
* precision and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}.
*
* <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as
* the semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not
* provided.
*
* @see #TIME()
* @see TimeType
*/
public static ZoneTimeType TIME_TZ() {
return new ZoneTimeType();
}

/**
* Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits
* of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
* inclusive).
*
* <p>An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond
* precision and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}.
*
* <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as
* the semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not
* provided.
*
* @see #TIME()
* @see TimeType
*/
public static ZoneTimeType TIME_TZ(int precision) {
return new ZoneTimeType(precision);
}

/**
* Data type of a timestamp WITHOUT time zone {@code TIMESTAMP} with 6 digits of fractional
* seconds by default.
Expand Down Expand Up @@ -447,6 +485,11 @@ public OptionalInt visit(TimeType timeType) {
return OptionalInt.of(timeType.getPrecision());
}

@Override
public OptionalInt visit(ZoneTimeType zoneTimeType) {
return OptionalInt.of(zoneTimeType.getPrecision());
}

@Override
public OptionalInt visit(TimestampType timestampType) {
return OptionalInt.of(timestampType.getPrecision());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.types;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Data type of a time with a time-zone in the ISO-8601 calendar system.
*
* <p>The zone time type is used to represent a time with a specific time zone, such as
* '10:15:30+01:00'.
*/
@PublicEvolving
public class ZoneTimeType extends DataType {

private static final long serialVersionUID = 1L;

public static final int MIN_PRECISION = TimeType.MIN_PRECISION;

public static final int MAX_PRECISION = TimeType.MAX_PRECISION;

public static final int DEFAULT_PRECISION = TimeType.DEFAULT_PRECISION;

private static final String FORMAT = "TIME(%d) WITH TIME ZONE";

private final int precision;

/** Creates a {@link ZoneTimeType} with default precision. */
public ZoneTimeType(boolean isNullable, int precision) {
super(isNullable, DataTypeRoot.TIME_WITH_TIME_ZONE);
if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
throw new IllegalArgumentException(
String.format(
"Time with time zone precision must be between %d and %d (both inclusive).",
MIN_PRECISION, MAX_PRECISION));
}
this.precision = precision;
}

public ZoneTimeType(int precision) {
this(true, precision);
}

public ZoneTimeType() {
this(DEFAULT_PRECISION);
}

public int getPrecision() {
return precision;
}

@Override
public DataType copy(boolean isNullable) {
return new ZoneTimeType(isNullable, precision);
}

@Override
public String asSerializableString() {
return withNullability(FORMAT, precision);
}

@Override
public String asSummaryString() {
return asSerializableString();
}

@Override
public String toString() {
return "ZONETIME";
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return true; // No additional fields to compare
}

@Override
public List<DataType> getChildren() {
return Collections.emptyList();
}

@Override
public <R> R accept(DataTypeVisitor<R> visitor) {
return visitor.visit(this);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), precision);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZoneTimeData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
Expand Down Expand Up @@ -61,6 +62,8 @@ public static Class<?> toInternalConversionClass(DataType type) {
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return Integer.class;
case TIME_WITH_TIME_ZONE:
return ZoneTimeData.class;
case BIGINT:
return Long.class;
case FLOAT:
Expand Down
Loading