Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,20 @@ interface Context {
boolean isRestored();

OperatorStateStore stateStore();

int getParallelism();

int getSubtaskIndex();
}

static Context createContext(
String commitUser,
@Nullable OperatorMetricGroup metricGroup,
boolean streamingCheckpointEnabled,
boolean isRestored,
OperatorStateStore stateStore) {
OperatorStateStore stateStore,
int parallelism,
int subtaskIndex) {
return new Committer.Context() {
@Override
public String commitUser() {
Expand All @@ -114,6 +120,16 @@ public boolean isRestored() {
public OperatorStateStore stateStore() {
return stateStore;
}

@Override
public int getParallelism() {
return parallelism;
}

@Override
public int getSubtaskIndex() {
return subtaskIndex;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,19 @@ public void initializeState(StateInitializationContext context) throws Exception
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will never be null
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
int index = getRuntimeContext().getIndexOfThisSubtask();

committer =
committerFactory.create(
Committer.createContext(
commitUser,
getMetricGroup(),
streamingCheckpointEnabled,
context.isRestored(),
context.getOperatorStateStore()));
context.getOperatorStateStore(),
parallelism,
index));

committableStateManager.initializeState(context, committer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ public void setup(int numChannels) {

@Override
public int channel(MultiTableCommittable multiTableCommittable) {
return Math.floorMod(
Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()),
numChannels);
return computeChannel(
multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), numChannels);
}

public static int computeChannel(String database, String table, int numChannels) {
return Math.floorMod(Objects.hash(database, table), numChannels);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,21 @@ public StoreMultiCommitter(
this.tableCommitters = new HashMap<>();

this.tableFilter = tableFilter;
int parallelism = context.getParallelism();
int index = context.getSubtaskIndex();

if (eagerInit) {
List<Identifier> tableIds = filterTables();
List<Identifier> tableIds =
filterTables().stream()
.filter(
identifier ->
MultiTableCommittableChannelComputer.computeChannel(
identifier.getDatabaseName(),
identifier.getTableName(),
parallelism)
== index)
.collect(Collectors.toList());

tableIds.stream().forEach(this::getStoreCommitter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ public void testCalcDataBytesSend() throws Exception {
OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
StoreCommitter committer =
new StoreCommitter(
table, commit, Committer.createContext("", metricGroup, true, false, null));
table,
commit,
Committer.createContext("", metricGroup, true, false, null, 1, 1));
committer.commit(Collections.singletonList(manifestCommittable));
CommitterMetrics metrics = committer.getCommitterMetrics();
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);
Expand Down
Loading