[FLINK-36546] Handle batch sources in DataSinkTranslator #3646


Open · wants to merge 1 commit into base: master

Conversation

@morozov (Contributor) commented Oct 16, 2024

There's no public API in Flink to detect the boundedness of a stream, so this patch duplicates the code from StreamGraphGenerator that Flink itself uses to instantiate CommitterOperatorFactory.
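
For illustration, the check that StreamGraphGenerator performs looks roughly like the sketch below; the helper class and method names are made up here and this is not the exact code of the patch. A stream is treated as bounded only if neither its own transformation nor any of its transitive predecessors is an unbounded source.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;

// Hypothetical helper mirroring the boundedness check duplicated from StreamGraphGenerator.
final class BoundednessSketch {

    // Bounded only if neither the transformation itself nor any transitive
    // predecessor is an unbounded source.
    static boolean isBounded(Transformation<?> transformation) {
        if (isUnboundedSource(transformation)) {
            return false;
        }
        return transformation.getTransitivePredecessors().stream()
                .noneMatch(BoundednessSketch::isUnboundedSource);
    }

    private static boolean isUnboundedSource(Transformation<?> transformation) {
        return transformation instanceof WithBoundedness
                && ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
    }
}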

@lvyanquan (Contributor)

I think a test for this is still necessary.
We have two bounded sources (values, and MySQL with scan.startup.mode set to snapshot). I don't understand the point where you said "the sink never receives the end-of-input signal"; can you provide a more detailed description? We can add some logging in the flush method when endOfInput is true to verify this, as sketched below.
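
As a rough sketch of that suggestion (the writer class and log message below are made up, not code from this repository), the logging could live in a SinkWriter's flush method:

import java.io.IOException;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical writer that logs when the end-of-input flush arrives,
// so a bounded run can be verified from the logs.
class EndOfInputLoggingWriter<T> implements SinkWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(EndOfInputLoggingWriter.class);

    @Override
    public void write(T element, Context context) throws IOException, InterruptedException {
        // write the element to the downstream system here
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        if (endOfInput) {
            LOG.info("flush() received endOfInput=true; the bounded input has finished");
        }
    }

    @Override
    public void close() throws Exception {
        // release resources here
    }
}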

@morozov (Contributor, Author) commented Oct 16, 2024

@lvyanquan please see the temporary reproducer in the second commit.

I don't understand the point where you said "the sink never receives the end-of-input signal"; can you provide a more detailed description?

I got confused. The problem is not that the sink never receives the end-of-input signal but that the two-phase committing sink doesn't commit the last checkpoint.

In order to reproduce the issue I had to make the following temporary changes:

  1. Make ValuesSink a TwoPhaseCommittingSink (see the sketch after this list).
  2. Bypass the usage of reflection in DataSinkTranslator. Otherwise, the test would fail with an IllegalAccessException, which I didn't know how to address.
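
As a rough illustration of item 1 (not the actual reproducer code; all names below are made up), a minimal TwoPhaseCommittingSink pairs a pre-committing writer with a committer, and logging in the committer makes a missing final commit visible:

import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: the writer hands one dummy committable to the committer
// per checkpoint/end-of-input, and the committer logs how many it receives.
class LoggingCommittingSink<T> implements TwoPhaseCommittingSink<T, String> {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingCommittingSink.class);

    @Override
    public PrecommittingSinkWriter<T, String> createWriter(InitContext context) {
        return new PrecommittingSinkWriter<T, String>() {
            @Override
            public void write(T element, Context writeContext) {
                // buffer or stage the element here
            }

            @Override
            public Collection<String> prepareCommit() {
                // committables returned here are handed to the committer
                return Collections.singletonList("committable");
            }

            @Override
            public void flush(boolean endOfInput) {}

            @Override
            public void close() {}
        };
    }

    @Override
    public Committer<String> createCommitter() {
        return new Committer<String>() {
            @Override
            public void commit(Collection<CommitRequest<String>> requests) {
                LOG.info("Committing {} committables.", requests.size());
            }

            @Override
            public void close() {}
        };
    }
}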

The point is that with the fix from the first commit, when DataSinkTranslatorBatchModeIT runs, it produces the following log message:

4890 [PostPartition -> Sink Writer: Value Sink -> Sink Committer: Value Sink (1/1)#0] INFO org.apache.flink.cdc.connectors.values.sink.ValuesDataSink$ValuesSink - Find me in the logs. Committing 1 committables.

If the first commit is reverted, this message won't be produced because the sink won't commit.

@morozov (Contributor, Author) commented Nov 15, 2024

@lvyanquan how do we proceed with this change? I propose that you verify the correctness of the fix by reverting it and running the updated test. Then I drop the commit that modifies the test. Otherwise, can you recommend an approach that would work for you?

@lvyanquan (Contributor) commented Dec 9, 2024

Hi @morozov, sorry for my late reply. I've verified your fix and it looks good to me, so you can revert the last commit if you want.

By the way, is the bounded source the MySQL CDC source running in snapshot-only mode?

@morozov force-pushed the FLINK-36546-handle-batch-sources branch from 2274b7e to a0a1c3e on December 9, 2024 17:36
@morozov (Contributor, Author) commented Dec 9, 2024

Hi @morozov, sorry for my late reply. I've verified your fix and it looks good to me, so you can revert the last commit if you want.

Thank you! Done.

By the way, is the bounded source the MySQL CDC source running in snapshot-only mode?

No. I'm working on a proprietary DataSink implementation and I use ValuesDataSource for integration testing – that's the bounded source.

BTW, is master the right target branch for this? Please let me know if you want it to be retargeted against release-3.2.

@lvyanquan (Contributor) left a comment

LGTM.

I guess the master branch is enough, as we plan to release 3.3.0 in the coming weeks.

@lvyanquan (Contributor)

Hi @leonardBang, PTAL.
The CI failure is unrelated to this PR, as the failure happened in the SQL Server connector.

@lvyanquan (Contributor)

Hi @morozov.
Could you add a test that uses Doris or Paimon as the sink, since they are TwoPhaseCommittingSinks?

@morozov (Contributor, Author) commented Apr 2, 2025

@lvyanquan I made a couple of attempts at writing a test and realized that there are no Doris or Paimon pipeline integration tests which I could reuse (or, more specifically, which would fail without this change if they existed).

In fact, there's only a single connector test (MySqlParallelizedPipelineITCase) that tests the connector as part of the pipeline. I don't think I can write such a test myself in a reasonable time. The test that I did write earlier (see 2274b7e) clearly demonstrated the issue.

Given all that, could we proceed without a test for now? I will be happy to assist if someone else is willing to write such a test.

@lvyanquan (Contributor)

Hi, @morozov.

We can build on the extensive test cases in #3812 to test it.
After #3812 is merged, we can rebase this and run CI to verify it. If CI passes, this PR should no longer have a blocker.

@lvyanquan (Contributor)

Hi @morozov. You can rebase onto master and test it with the existing cases in batch mode, referring to #3812.

@morozov (Contributor, Author) commented Apr 24, 2025

@lvyanquan on the one hand, it looks like #3812 itself could be used as a solution to my problem. In my PR, I'm trying to detect whether the pipeline needs to run in batch mode based on the source, while the PR you mentioned adds an explicit configuration option to enable batch mode. I could use the explicit configuration and withdraw my PR.

What I don't understand is how schema change events are handled in batch mode. I see that there is a new BatchSchemaOperator with the following code:

public void processElement(StreamRecord<Event> streamRecord) throws Exception {
    Event event = streamRecord.getValue();
    // Only catch create table event and data change event in batch mode
    if (event instanceof CreateTableEvent) {
        handleCreateTableEvent((CreateTableEvent) event);
    } else if (event instanceof DataChangeEvent) {
        if (!alreadyMergedCreateTableTables) {
            handleFirstDataChangeEvent();
            alreadyMergedCreateTableTables = true;
        }
        handleDataChangeEvent((DataChangeEvent) event);
    } else {
        throw new RuntimeException("Unknown event type in Batch record: " + event);
    }
}

It looks like it will fail to process any schema change event other than CreateTableEvent. Won't it?

Internally, I'm using the code changes from my other PR (#3999), and I'm trying to understand how this logic should apply to the batch schema operator.

Could you clarify how the batch mode is meant to handle schema changes?

@lvyanquan self-assigned this on Apr 29, 2025
@morozov (Contributor, Author) commented Jun 10, 2025

@lvyanquan could you take a look at my previous comment?
