Skip to content

Commit bd74361

Browse files
authored
[flink] Fix StoreMultiCommitter with eager init mode (#5187)
1 parent 732bff8 commit bd74361

File tree

5 files changed

+45
-7
lines changed

5 files changed

+45
-7
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,20 @@ interface Context {
8181
boolean isRestored();
8282

8383
OperatorStateStore stateStore();
84+
85+
int getParallelism();
86+
87+
int getSubtaskIndex();
8488
}
8589

8690
static Context createContext(
8791
String commitUser,
8892
@Nullable OperatorMetricGroup metricGroup,
8993
boolean streamingCheckpointEnabled,
9094
boolean isRestored,
91-
OperatorStateStore stateStore) {
95+
OperatorStateStore stateStore,
96+
int parallelism,
97+
int subtaskIndex) {
9298
return new Committer.Context() {
9399
@Override
94100
public String commitUser() {
@@ -114,6 +120,16 @@ public boolean isRestored() {
114120
public OperatorStateStore stateStore() {
115121
return stateStore;
116122
}
123+
124+
@Override
125+
public int getParallelism() {
126+
return parallelism;
127+
}
128+
129+
@Override
130+
public int getSubtaskIndex() {
131+
return subtaskIndex;
132+
}
117133
};
118134
}
119135
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,19 @@ public void initializeState(StateInitializationContext context) throws Exception
130130
StateUtils.getSingleValueFromState(
131131
context, "commit_user_state", String.class, initialCommitUser);
132132
// parallelism of commit operator is always 1, so commitUser will never be null
133+
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
134+
int index = getRuntimeContext().getIndexOfThisSubtask();
135+
133136
committer =
134137
committerFactory.create(
135138
Committer.createContext(
136139
commitUser,
137140
getMetricGroup(),
138141
streamingCheckpointEnabled,
139142
context.isRestored(),
140-
context.getOperatorStateStore()));
143+
context.getOperatorStateStore(),
144+
parallelism,
145+
index));
141146

142147
committableStateManager.initializeState(context, committer);
143148
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ public void setup(int numChannels) {
3737

3838
@Override
3939
public int channel(MultiTableCommittable multiTableCommittable) {
40-
return Math.floorMod(
41-
Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()),
42-
numChannels);
40+
return computeChannel(
41+
multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), numChannels);
42+
}
43+
44+
public static int computeChannel(String database, String table, int numChannels) {
45+
return Math.floorMod(Objects.hash(database, table), numChannels);
4346
}
4447

4548
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,21 @@ public StoreMultiCommitter(
8585
this.tableCommitters = new HashMap<>();
8686

8787
this.tableFilter = tableFilter;
88+
int parallelism = context.getParallelism();
89+
int index = context.getSubtaskIndex();
8890

8991
if (eagerInit) {
90-
List<Identifier> tableIds = filterTables();
92+
List<Identifier> tableIds =
93+
filterTables().stream()
94+
.filter(
95+
identifier ->
96+
MultiTableCommittableChannelComputer.computeChannel(
97+
identifier.getDatabaseName(),
98+
identifier.getTableName(),
99+
parallelism)
100+
== index)
101+
.collect(Collectors.toList());
102+
91103
tableIds.stream().forEach(this::getStoreCommitter);
92104
}
93105
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,9 @@ public void testCalcDataBytesSend() throws Exception {
597597
OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
598598
StoreCommitter committer =
599599
new StoreCommitter(
600-
table, commit, Committer.createContext("", metricGroup, true, false, null));
600+
table,
601+
commit,
602+
Committer.createContext("", metricGroup, true, false, null, 1, 1));
601603
committer.commit(Collections.singletonList(manifestCommittable));
602604
CommitterMetrics metrics = committer.getCommitterMetrics();
603605
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);

0 commit comments

Comments
 (0)