Skip to content

Commit b67ba29

Browse files
authored
[FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely (#4044)
1 parent 06ab765 commit b67ba29

File tree

6 files changed: +61 / -6 lines changed

6 files changed: +61 / -6 lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,9 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
7777
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
7878
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
7979

80+
private static final StreamExecutionEnvironment env =
81+
StreamExecutionEnvironment.getExecutionEnvironment();
82+
8083
@Test
8184
void testGetMongoDBVersion() {
8285
MongoDBSourceConfig config =
@@ -470,7 +473,6 @@ private List<String> testBackfillWhenWritingEvents(
470473
customerDatabase);
471474
MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
472475

473-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
474476
env.enableCheckpointing(1000);
475477
env.setParallelism(1);
476478

@@ -580,7 +582,6 @@ private void testMongoDBParallelSourceWithMetadataColumns(
580582
customerDatabase);
581583
}
582584

583-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
584585
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
585586

586587
env.setParallelism(parallelism);
@@ -752,7 +753,6 @@ private void testMongoDBParallelSource(
752753
customerDatabase);
753754
}
754755

755-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
756756
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
757757

758758
env.setParallelism(parallelism);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,9 @@ class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
7171
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
7272
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
7373

74+
private static final StreamExecutionEnvironment env =
75+
StreamExecutionEnvironment.getExecutionEnvironment();
76+
7477
@Test
7578
void testReadSingleCollectionWithSingleParallelism() throws Exception {
7679
testMongoDBParallelSource(
@@ -406,7 +409,6 @@ private List<String> testBackfillWhenWritingEvents(
406409
boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions)
407410
throws Exception {
408411
String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer");
409-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
410412
env.enableCheckpointing(1000);
411413
env.setParallelism(1);
412414

@@ -517,7 +519,6 @@ private void testMongoDBParallelSource(
517519

518520
String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer");
519521

520-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
521522
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
522523

523524
env.setParallelism(parallelism);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.mongodb.utils;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
2122
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
2223
import org.apache.flink.runtime.minicluster.MiniCluster;
2324
import org.apache.flink.table.data.RowData;
@@ -126,11 +127,24 @@ public static void triggerFailover(
126127
}
127128
}
128129

130+
public static void ensureJmLeaderServiceExists(
131+
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
132+
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;
133+
134+
// Make sure JM leader service has been created, or an NPE might be thrown when we're
135+
// triggering JM failover later.
136+
control.getJobManagerLeaderElection(jobId).close();
137+
}
138+
129139
public static void triggerJobManagerFailover(
130140
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
131141
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
142+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
132143
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
144+
133145
afterFailAction.run();
146+
147+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
134148
haLeadershipControl.grantJobMasterLeadership(jobId).get();
135149
}
136150

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
2323
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
2424
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
2526
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
2627
import org.apache.flink.runtime.minicluster.MiniCluster;
2728
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -149,11 +150,22 @@ protected static void triggerFailover(
149150
}
150151
}
151152

153+
protected static void ensureJmLeaderServiceExists(
154+
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
155+
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;
156+
157+
// Make sure JM leader service has been created, or an NPE might be thrown when we're
158+
// triggering JM failover later.
159+
control.getJobManagerLeaderElection(jobId).close();
160+
}
161+
152162
protected static void triggerJobManagerFailover(
153163
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
154164
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
165+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
155166
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
156167
afterFailAction.run();
168+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
157169
haLeadershipControl.grantJobMasterLeadership(jobId).get();
158170
}
159171

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.oracle.testutils;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
2122
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
2223
import org.apache.flink.runtime.minicluster.MiniCluster;
2324
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -61,11 +62,24 @@ public static void triggerFailover(
6162
}
6263
}
6364

65+
public static void ensureJmLeaderServiceExists(
66+
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
67+
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;
68+
69+
// Make sure JM leader service has been created, or an NPE might be thrown when we're
70+
// triggering JM failover later.
71+
control.getJobManagerLeaderElection(jobId).close();
72+
}
73+
6474
public static void triggerJobManagerFailover(
6575
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
6676
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
77+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
6778
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
79+
6880
afterFailAction.run();
81+
82+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
6983
haLeadershipControl.grantJobMasterLeadership(jobId).get();
7084
}
7185

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.postgres.testutils;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
2122
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
2223
import org.apache.flink.runtime.minicluster.MiniCluster;
2324
import org.apache.flink.table.api.TableResult;
@@ -71,11 +72,24 @@ public static void triggerFailover(
7172
}
7273
}
7374

74-
protected static void triggerJobManagerFailover(
75+
public static void ensureJmLeaderServiceExists(
76+
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
77+
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;
78+
79+
// Make sure JM leader service has been created, or an NPE might be thrown when we're
80+
// triggering JM failover later.
81+
control.getJobManagerLeaderElection(jobId).close();
82+
}
83+
84+
public static void triggerJobManagerFailover(
7585
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
7686
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
87+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
7788
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
89+
7890
afterFailAction.run();
91+
92+
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
7993
haLeadershipControl.grantJobMasterLeadership(jobId).get();
8094
}
8195

0 commit comments

Comments
 (0)