From 049c1fa4715619ae3fe8460b9b8b9490961eaae6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 28 Feb 2025 18:20:19 +0800 Subject: [PATCH 1/3] [flink] Fix StoreMultiCommitter with eager init mode --- .../apache/paimon/flink/sink/Committer.java | 18 +++++++++++++++++- .../paimon/flink/sink/CommitterOperator.java | 7 ++++++- .../MultiTableCommittableChannelComputer.java | 9 ++++++--- .../paimon/flink/sink/StoreMultiCommitter.java | 14 +++++++++++++- .../flink/sink/CommitterOperatorTest.java | 4 +++- 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java index 81c2f6b0077f..23c6c7faeb71 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java @@ -81,6 +81,10 @@ interface Context { boolean isRestored(); OperatorStateStore stateStore(); + + int getParallelism(); + + int getSubtaskIndex(); } static Context createContext( @@ -88,7 +92,9 @@ static Context createContext( @Nullable OperatorMetricGroup metricGroup, boolean streamingCheckpointEnabled, boolean isRestored, - OperatorStateStore stateStore) { + OperatorStateStore stateStore, + int parallelism, + int subtaskIndex) { return new Committer.Context() { @Override public String commitUser() { @@ -114,6 +120,16 @@ public boolean isRestored() { public OperatorStateStore stateStore() { return stateStore; } + + @Override + public int getParallelism() { + return parallelism; + } + + @Override + public int getSubtaskIndex() { + return subtaskIndex; + } }; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 383cbcd6ebf7..0709290fd5bb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -130,6 +130,9 @@ public void initializeState(StateInitializationContext context) throws Exception StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); // parallelism of commit operator is always 1, so commitUser will never be null + int index = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + committer = committerFactory.create( Committer.createContext( @@ -137,7 +140,9 @@ public void initializeState(StateInitializationContext context) throws Exception getMetricGroup(), streamingCheckpointEnabled, context.isRestored(), - context.getOperatorStateStore())); + context.getOperatorStateStore(), + parallelism, + index)); committableStateManager.initializeState(context, committer); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java index 405c6af271c4..dccc3b84d79a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java @@ -37,9 +37,12 @@ public void setup(int numChannels) { @Override public int channel(MultiTableCommittable multiTableCommittable) { - return Math.floorMod( - Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()), - numChannels); + return computeChannel( + multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), numChannels); + } + + public static int computeChannel(String database, String table, int numChannels) { + return Math.floorMod(Objects.hash(database, table), numChannels); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 67b2b6bd4627..8ad3e4fb08a3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -85,9 +85,21 @@ public StoreMultiCommitter( this.tableCommitters = new HashMap<>(); this.tableFilter = tableFilter; + int parallelism = context.getParallelism(); + int index = context.getSubtaskIndex(); if (eagerInit) { - List tableIds = filterTables(); + List tableIds = + filterTables().stream() + .filter( + identifier -> + MultiTableCommittableChannelComputer.computeChannel( + identifier.getDatabaseName(), + identifier.getTableName(), + parallelism) + == index) + .collect(Collectors.toList()); + tableIds.stream().forEach(this::getStoreCommitter); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 1981abd373d7..9d3bc135e2a3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -597,7 +597,9 @@ public void testCalcDataBytesSend() throws Exception { OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); StoreCommitter committer = new StoreCommitter( - table, commit, Committer.createContext("", metricGroup, true, false, null)); + table, + commit, + Committer.createContext("", metricGroup, true, false, null, 1, 1)); committer.commit(Collections.singletonList(manifestCommittable)); CommitterMetrics metrics = committer.getCommitterMetrics(); assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533); From c77746d0b0a603611f6751efd9d0f6dc535eed09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 28 Feb 2025 18:23:19 +0800 Subject: [PATCH 2/3] Fix minus --- .../java/org/apache/paimon/flink/sink/CommitterOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 0709290fd5bb..37fd593afe67 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -130,8 +130,8 @@ public void initializeState(StateInitializationContext context) throws Exception StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); // parallelism of commit operator is always 1, so commitUser will never be null - int index = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + int index = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); committer = committerFactory.create( From b25e8ba1362b62c836427c75a86fd67224822e9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 28 Feb 2025 20:57:16 +0800 Subject: [PATCH 3/3] Fix comment --- .../java/org/apache/paimon/flink/sink/CommitterOperator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 37fd593afe67..4db63b441134 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -130,8 +130,8 @@ public void initializeState(StateInitializationContext context) throws Exception StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); // parallelism of commit operator is always 1, so commitUser will never be null - int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); - int index = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + int index = getRuntimeContext().getIndexOfThisSubtask(); committer = committerFactory.create(