diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java index efbdb8686ff..72355d5e70c 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java @@ -20,6 +20,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; @@ -48,6 +50,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; +import org.apache.flink.streaming.api.transformations.WithBoundedness; +import org.apache.flink.util.Preconditions; import java.lang.reflect.InvocationTargetException; @@ -158,8 +162,9 @@ private void addCommittingTopology( ((WithPreCommitTopology) sink).addPreCommitTopology(written); } - // TODO: Hard coding stream mode and checkpoint - boolean isBatchMode = false; + boolean isBatchMode = !existsUnboundedSource(inputStream.getTransformation()); + + // TODO: Hard coding checkpoint boolean isCheckpointingEnabled = true; DataStream> committed = preCommitted.transform( @@ -209,4 +214,16 @@ private static SimpleVersionedSerializer getCommittableSerializer throw new RuntimeException("Failed to create CommitterOperatorFactory", e); } } + + private boolean existsUnboundedSource(final Transformation transformation) { + return isUnboundedSource(transformation) + || transformation.getTransitivePredecessors().stream() + .anyMatch(this::isUnboundedSource); + } + + private boolean isUnboundedSource(final Transformation transformation) { + Preconditions.checkNotNull(transformation); + return transformation instanceof WithBoundedness + && ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED; + } }