Skip to content

Commit a0a1c3e

Browse files
committed
[FLINK-36546] Handle batch sources in DataSinkTranslator
1 parent e825131 commit a0a1c3e

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.flink.api.common.typeinfo.TypeInformation;
2121
import org.apache.flink.api.connector.sink2.Sink;
2222
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
23+
import org.apache.flink.api.connector.source.Boundedness;
24+
import org.apache.flink.api.dag.Transformation;
2325
import org.apache.flink.cdc.common.annotation.Internal;
2426
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2527
import org.apache.flink.cdc.common.configuration.Configuration;
@@ -48,6 +50,8 @@
4850
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
4951
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
5052
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
53+
import org.apache.flink.streaming.api.transformations.WithBoundedness;
54+
import org.apache.flink.util.Preconditions;
5155

5256
import java.lang.reflect.InvocationTargetException;
5357

@@ -158,8 +162,9 @@ private <CommT> void addCommittingTopology(
158162
((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
159163
}
160164

161-
// TODO: Hard coding stream mode and checkpoint
162-
boolean isBatchMode = false;
165+
boolean isBatchMode = !existsUnboundedSource(inputStream.getTransformation());
166+
167+
// TODO: Hard coding checkpoint
163168
boolean isCheckpointingEnabled = true;
164169
DataStream<CommittableMessage<CommT>> committed =
165170
preCommitted.transform(
@@ -209,4 +214,16 @@ private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer
209214
throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
210215
}
211216
}
217+
218+
private boolean existsUnboundedSource(final Transformation<?> transformation) {
219+
return isUnboundedSource(transformation)
220+
|| transformation.getTransitivePredecessors().stream()
221+
.anyMatch(this::isUnboundedSource);
222+
}
223+
224+
private boolean isUnboundedSource(final Transformation<?> transformation) {
225+
Preconditions.checkNotNull(transformation);
226+
return transformation instanceof WithBoundedness
227+
&& ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
228+
}
212229
}

0 commit comments

Comments
 (0)