Skip to content

[FLINK-37739][cli] fix flink-conf is not obtained in local mode #4006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public PipelineExecution.ExecutionInfo run() throws Exception {
case YARN_APPLICATION:
return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor());
case LOCAL:
return deployWithComposer(FlinkPipelineComposer.ofMiniCluster());
return deployWithComposer(
FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars));
case REMOTE:
case YARN_SESSION:
return deployWithComposer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
Expand Down Expand Up @@ -59,32 +60,28 @@ public class FlinkPipelineComposer implements PipelineComposer {
/**
 * Creates a composer that submits the pipeline to a remote Flink cluster.
 *
 * @param flinkConfig Flink configuration used to build the execution environment
 * @param additionalJars extra JAR paths registered with the Flink environment
 * @return a non-blocking composer targeting the remote cluster
 */
public static FlinkPipelineComposer ofRemoteCluster(
        org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
    StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig);
    // Register the JARs exactly once through the shared helper; the previous
    // inline forEach duplicated this logic and would have added each JAR twice.
    addAdditionalJars(env, additionalJars);
    return new FlinkPipelineComposer(env, false);
}

/**
 * Creates a non-blocking composer that reuses the execution environment
 * already provided by an application-cluster deployment.
 */
public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) {
    FlinkPipelineComposer composer = new FlinkPipelineComposer(env, false);
    return composer;
}

/**
 * Creates a composer backed by a local MiniCluster with a default (empty)
 * configuration and no additional JARs. Kept because many existing tests use
 * it and do not need custom settings.
 */
@VisibleForTesting
public static FlinkPipelineComposer ofMiniCluster() {
    // Delegate to the configurable overload so both factories share one code
    // path; an empty Configuration and empty JAR list preserve the old behavior.
    return ofMiniCluster(
            new org.apache.flink.configuration.Configuration(),
            java.util.Collections.emptyList());
}
Comment on lines 72 to 75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still necessary to keep this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still many test cases that use this method. Since those tests do not need flinkConfig or additionalJars, I chose to keep it.


/**
 * Creates a composer backed by a local MiniCluster, applying the given Flink
 * configuration and registering the given additional JARs with the environment.
 *
 * @param flinkConfig Flink configuration passed to the local execution environment
 * @param additionalJars extra JAR paths registered with the environment
 */
public static FlinkPipelineComposer ofMiniCluster(
        org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
    addAdditionalJars(env, additionalJars);
    // Local MiniCluster execution runs in blocking mode.
    return new FlinkPipelineComposer(env, true);
}

private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
this.env = env;
this.isBlocking = isBlocking;
Expand Down Expand Up @@ -254,4 +251,27 @@ private Optional<URL> getContainingJar(Class<?> clazz) throws Exception {
}
return Optional.of(container);
}

/**
 * Registers each of the given JAR paths with the Flink environment.
 *
 * @param env environment to register the JARs with
 * @param additionalJars JAR paths to qualify and add
 * @throws RuntimeException if a path cannot be converted to a URL
 */
private static void addAdditionalJars(
        StreamExecutionEnvironment env, List<Path> additionalJars) {
    for (Path jarPath : additionalJars) {
        try {
            // Qualify against the path's file system before converting to a URL.
            FlinkEnvironmentUtils.addJar(
                    env, jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL());
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment",
                            jarPath),
                    e);
        }
    }
}

/** Exposes the underlying execution environment so tests can inspect its configuration. */
@VisibleForTesting
public StreamExecutionEnvironment getEnv() {
    return this.env;
}
Comment on lines +273 to +276
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to use Java reflection in the test to obtain this object, instead of creating a separate method for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reflection would make the code more difficult to maintain.

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL;

/** A test for the {@link FlinkPipelineComposer}. */
class FlinkPipelineComposerTest {

Expand Down Expand Up @@ -59,4 +68,21 @@ void testCreateDataSinkFromSinkDef() {
Assertions.assertThat(((DataSinkFactory1.TestDataSink) dataSink).getHost())
.isEqualTo("0.0.0.0");
}

/**
 * Verifies that {@code ofMiniCluster(flinkConfig, additionalJars)} carries the
 * supplied Flink configuration into the environment and registers the JARs.
 */
@Test
void testOfMiniCluster() {
    org.apache.flink.configuration.Configuration flinkConfig =
            new org.apache.flink.configuration.Configuration();
    flinkConfig.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));

    List<Path> additionalJars = new ArrayList<>();
    additionalJars.add(new Path("/path/to/additionalJars.jar"));

    FlinkPipelineComposer composer =
            FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars);
    ReadableConfig configuration = composer.getEnv().getConfiguration();

    // The custom checkpoint interval must survive into the environment config.
    Assertions.assertThat(configuration.get(CHECKPOINTING_INTERVAL))
            .isEqualTo(Duration.ofSeconds(30));
    // The JAR must be registered as a qualified file URI.
    Assertions.assertThat(configuration.get(PipelineOptions.JARS))
            .contains("file:/path/to/additionalJars.jar");
}
}