-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-37739][cli] fix flink-conf is not obtained in local mode #4006
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.flink.cdc.composer.flink; | ||
|
||
import org.apache.flink.cdc.common.annotation.Internal; | ||
import org.apache.flink.cdc.common.annotation.VisibleForTesting; | ||
import org.apache.flink.cdc.common.configuration.Configuration; | ||
import org.apache.flink.cdc.common.event.Event; | ||
import org.apache.flink.cdc.common.pipeline.PipelineOptions; | ||
|
@@ -59,32 +60,28 @@ public class FlinkPipelineComposer implements PipelineComposer { | |
public static FlinkPipelineComposer ofRemoteCluster( | ||
org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) { | ||
StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig); | ||
additionalJars.forEach( | ||
jarPath -> { | ||
try { | ||
FlinkEnvironmentUtils.addJar( | ||
env, | ||
jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL()); | ||
} catch (Exception e) { | ||
throw new RuntimeException( | ||
String.format( | ||
"Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", | ||
jarPath), | ||
e); | ||
} | ||
}); | ||
addAdditionalJars(env, additionalJars); | ||
return new FlinkPipelineComposer(env, false); | ||
} | ||
|
||
public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) { | ||
return new FlinkPipelineComposer(env, false); | ||
} | ||
|
||
@VisibleForTesting | ||
public static FlinkPipelineComposer ofMiniCluster() { | ||
return new FlinkPipelineComposer( | ||
StreamExecutionEnvironment.getExecutionEnvironment(), true); | ||
} | ||
|
||
public static FlinkPipelineComposer ofMiniCluster( | ||
org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) { | ||
StreamExecutionEnvironment localEnvironment = | ||
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); | ||
addAdditionalJars(localEnvironment, additionalJars); | ||
return new FlinkPipelineComposer(localEnvironment, true); | ||
} | ||
|
||
private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) { | ||
this.env = env; | ||
this.isBlocking = isBlocking; | ||
|
@@ -254,4 +251,27 @@ private Optional<URL> getContainingJar(Class<?> clazz) throws Exception { | |
} | ||
return Optional.of(container); | ||
} | ||
|
||
private static void addAdditionalJars( | ||
StreamExecutionEnvironment env, List<Path> additionalJars) { | ||
additionalJars.forEach( | ||
jarPath -> { | ||
try { | ||
FlinkEnvironmentUtils.addJar( | ||
env, | ||
jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL()); | ||
} catch (Exception e) { | ||
throw new RuntimeException( | ||
String.format( | ||
"Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", | ||
jarPath), | ||
e); | ||
} | ||
}); | ||
} | ||
|
||
@VisibleForTesting | ||
public StreamExecutionEnvironment getEnv() { | ||
return env; | ||
} | ||
Comment on lines
+273
to
+276
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to use Java reflection in the test to obtain this object, instead of creating a separate method for it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reflection makes code difficult to maintain |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still necessary to keep this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are still many test cases that use this method. Since they do not use flinkConf and additionalJars, I chose to keep this method.