From 1ee55c37e7abef5965d1ad18ab0c5c771a529c8a Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Mon, 28 Apr 2025 11:03:01 +0800 Subject: [PATCH 1/2] fix flink-conf is not obtained in local mode --- .../org/apache/flink/cdc/cli/CliExecutor.java | 3 +- .../composer/flink/FlinkPipelineComposer.java | 43 +++++++++++++------ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java index ce7d973e255..23e2b0fa383 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java @@ -76,7 +76,8 @@ public PipelineExecution.ExecutionInfo run() throws Exception { case YARN_APPLICATION: return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor()); case LOCAL: - return deployWithComposer(FlinkPipelineComposer.ofMiniCluster()); + return deployWithComposer( + FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars)); case REMOTE: case YARN_SESSION: return deployWithComposer( diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index c0189833cef..6f359b76b1d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.pipeline.PipelineOptions; @@ -59,20 +60,7 @@ public class FlinkPipelineComposer implements PipelineComposer { public static FlinkPipelineComposer ofRemoteCluster( org.apache.flink.configuration.Configuration flinkConfig, List additionalJars) { StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig); - additionalJars.forEach( - jarPath -> { - try { - FlinkEnvironmentUtils.addJar( - env, - jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL()); - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", - jarPath), - e); - } - }); + addAdditionalJars(env, additionalJars); return new FlinkPipelineComposer(env, false); } @@ -80,11 +68,20 @@ public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironm return new FlinkPipelineComposer(env, false); } + @VisibleForTesting public static FlinkPipelineComposer ofMiniCluster() { return new FlinkPipelineComposer( StreamExecutionEnvironment.getExecutionEnvironment(), true); } + public static FlinkPipelineComposer ofMiniCluster( + org.apache.flink.configuration.Configuration flinkConfig, List additionalJars) { + StreamExecutionEnvironment localEnvironment = + StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); + addAdditionalJars(localEnvironment, additionalJars); + return new FlinkPipelineComposer(localEnvironment, true); + } + private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) { this.env = env; this.isBlocking = isBlocking; @@ -254,4 +251,22 @@ private Optional getContainingJar(Class clazz) throws Exception { } return Optional.of(container); } + + private static void addAdditionalJars( + StreamExecutionEnvironment env, List additionalJars) { + additionalJars.forEach( + jarPath -> { + try { + FlinkEnvironmentUtils.addJar( + env, + jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL()); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", + jarPath), + e); + } + }); + } } From 0a71a88d0aac6994b9595fd4cbf164acb17bdaf5 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Mon, 28 Apr 2025 11:42:59 +0800 Subject: [PATCH 2/2] add test --- .../composer/flink/FlinkPipelineComposer.java | 5 ++++ .../flink/FlinkPipelineComposerTest.java | 26 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 6f359b76b1d..99114683cbc 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -269,4 +269,9 @@ private static void addAdditionalJars( } }); } + + @VisibleForTesting + public StreamExecutionEnvironment getEnv() { + return env; + } } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java index 3a1fc553238..0187c2ac69f 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java @@ -24,12 +24,21 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.Path; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL; + /** A test for the {@link FlinkPipelineComposer}. */ class FlinkPipelineComposerTest { @@ -59,4 +68,21 @@ void testCreateDataSinkFromSinkDef() { Assertions.assertThat(((DataSinkFactory1.TestDataSink) dataSink).getHost()) .isEqualTo("0.0.0.0"); } + + @Test + void testOfMiniCluster() { + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(30)); + List additionalJars = new ArrayList<>(); + additionalJars.add(new Path("/path/to/additionalJars.jar")); + FlinkPipelineComposer flinkPipelineComposer = + FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars); + ReadableConfig configuration = flinkPipelineComposer.getEnv().getConfiguration(); + + Assertions.assertThat(configuration.get(CHECKPOINTING_INTERVAL)) + .isEqualTo(Duration.ofSeconds(30)); + Assertions.assertThat(configuration.get(PipelineOptions.JARS)) + .contains("file:/path/to/additionalJars.jar"); + } }