Skip to content

[FLINK-38079] Add Pipeline support for DateType and TimeType #4060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

proletarians
Copy link
Contributor

Pipeline currently treats DateType and TimeType as plain INT, so date/time columns lose precision (TIME only to milliseconds, DATE range limited) and cannot be used with built-in temporal functions.

@yuxiqian yuxiqian self-requested a review July 15, 2025 02:06
Copy link
Member

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @proletarians' nice work, just left some trivial comments.

@@ -85,6 +93,37 @@
*/
@PublicEvolving
public class SchemaMergingUtils {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may handle these changes in another PR and focus on supporting DATE and TIME formats here

void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception {
@ParameterizedTest(name = "API version: {0}, initializeMode: {1}")
@MethodSource(value = "parameterProvider")
void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi, boolean initializeMode)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is initializeMode? Seems it doesn't affect test cases at all.

@@ -141,6 +143,7 @@ public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {

@Test
void testMysql57TimeDataTypes() throws Throwable {
UniqueDatabase usedDd = fullTypesMySql57Database;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please double check if these changes are necessary, and revert these to keep this PR minimum.

@@ -48,6 +48,7 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,

private static final long serialVersionUID = 1L;

public static final String DEBEZIUM_DATE_SCHEMA_NAME = "io.debezium.time.Date";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems redundant as it's an alias of io.debezium.time.Date.SCHEMA_NAME?

Comment on lines 111 to 118
if (o instanceof DateData) {
writer.writeDate(pos, (DateData) o);
} else {
writer.writeInt(pos, (int) o);
}
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may add some comments here, clarifying that the writeInt paths are kept for compatibility.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants