-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-36546] Handle batch sources in DataSinkTranslator #3646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
I think a test for this is still necessary. |
@lvyanquan please see the temporary reproducer in the second commit.
I got confused. The problem is not that the sink never receives the end-of-input signal but that the two-phase committing sink doesn't commit the last checkpoint. In order to reproduce the issue I had to make the following temporary changes:
The point is that with the fix from the first commit, when Line 123 in 2274b7e
If the first commit is reverted, this message won't be produced because the sink won't commit. |
@lvyanquan how do we proceed with this change? I propose that you verify the correctness of the fix by reverting it and running the updated test. Then I drop the commit that modifies the test. Otherwise, can you recommend an approach that would work for you? |
Hi, @morozov, sorry for my late reply and I've verified you fix and it look good to me, so you can revert the last commit if you want. By the way, the bound source is MySQLCDC Source that run in snapshot-only mode? |
2274b7e
to
a0a1c3e
Compare
Thank you! Done.
No. I'm working on a proprietary BTW, is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
I guess master branch is enough as we plan to release 3.3.0 in the following weeks.
Hi, @leonardBang, PTAL. |
Hi @morozov. |
@lvyanquan I made a couple of approaches at writing a test and realized that there are no Doris or Paimon pipeline integration tests which I could reuse (or, more specifically, which would fail without this change if they existed). In fact, there's only a single connector test ( Given all that, could we proceed without a test for now? I will be happy to assist if someone else is willing to write such a test. |
@lvyanquan on the one hand, it looks like #3812 itself could be used as a solution to my problem. In my PR, I'm trying to detect if the pipeline needs to run in the batch mode based on the source, while the PR you mentioned adds an explicit configuration to enable the batch mode. I could use explicit configuration and withdraw my PR. What I don't understand is, how schema change events are handled in the batch mode. I see that there is a new Lines 101 to 115 in c2230d5
It looks like it will fail to process any other schema change event than Internally, I'm using the code changes from my other PR (#3999), and I'm trying to understand how this logic should apply to the batch schema operator. Could you clarify how the batch mode is meant to handle schema changes? |
@lvyanquan could you take a look at my previous comment? |
There's no public API in Flink to detect the boundedness of a stream, so this patch duplicates the code from
StreamGraphGenerator
that Flink itself uses to instantiateCommitterOperatorFactory
.