From 9a9b90e46915e68bb26977e19848b7593f8a1e7c Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 28 Feb 2025 14:49:01 +0800 Subject: [PATCH 1/4] [spark] Enable TagCreationMode#batch for spark writer --- .../apache/paimon/tag/TagBatchCreation.java | 118 ++++++++++++++++++ .../sink/BatchWriteGeneratorTagOperator.java | 89 +------------ .../spark/commands/WriteIntoPaimonTable.scala | 14 ++- .../spark/sql/PaimonTagDdlTestBase.scala | 16 +++ 4 files changed, 151 insertions(+), 86 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java new file mode 100644 index 000000000000..26f1d58d0659 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** Tag creation for batch mode. */ +public class TagBatchCreation { + + private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-"; + private final FileStoreTable table; + private final CoreOptions options; + private final TagManager tagManager; + private final SnapshotManager snapshotManager; + private final TagDeletion tagDeletion; + + public TagBatchCreation(FileStoreTable table) { + this.table = table; + this.snapshotManager = table.snapshotManager(); + this.tagManager = table.tagManager(); + this.tagDeletion = table.store().newTagDeletion(); + this.options = table.coreOptions(); + } + + public void createTag() { + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + return; + } + Instant instant = Instant.ofEpochMilli(snapshot.timeMillis()); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + String tagName = + options.tagBatchCustomizedName() != null + ? options.tagBatchCustomizedName() + : BATCH_WRITE_TAG_PREFIX + + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + try { + // If the tag already exists, delete the tag + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); + // Create a new tag + tagManager.createTag( + snapshot, + tagName, + table.coreOptions().tagDefaultTimeRetained(), + table.store().createTagCallbacks(), + false); + // Expire the tag + expireTag(); + } catch (Exception e) { + if (tagManager.tagExists(tagName)) { + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); + } + } + } + + private void expireTag() { + Integer tagNumRetainedMax = options.tagNumRetainedMax(); + if (tagNumRetainedMax != null) { + if (snapshotManager.latestSnapshot() == null) { + return; + } + long tagCount = tagManager.tagCount(); + + while (tagCount > tagNumRetainedMax) { + for (List tagNames : tagManager.tags().values()) { + if (tagCount - tagNames.size() >= tagNumRetainedMax) { + tagManager.deleteAllTagsOfOneSnapshot( + tagNames, tagDeletion, snapshotManager); + tagCount = tagCount - tagNames.size(); + } else { + List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); + for (String toBeDeleted : sortedTagNames) { + tagManager.deleteTag( + toBeDeleted, + tagDeletion, + snapshotManager, + table.store().createTagCallbacks()); + tagCount--; + if (tagCount == tagNumRetainedMax) { + break; + } + } + break; + } + } + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index d1ae093b2fee..9e9c7b1dc397 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -18,11 +18,8 @@ package org.apache.paimon.flink.sink; -import org.apache.paimon.Snapshot; -import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.TagManager; +import org.apache.paimon.tag.TagBatchCreation; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -37,12 +34,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.List; - /** * Commit {@link Committable} for snapshot using the {@link CommitterOperator}. When the task is * completed, the corresponding tag is generated. @@ -58,10 +49,13 @@ public class BatchWriteGeneratorTagOperator protected final FileStoreTable table; + protected final TagBatchCreation tagBatchCreation; + public BatchWriteGeneratorTagOperator( CommitterOperator commitOperator, FileStoreTable table) { this.table = table; this.commitOperator = commitOperator; + this.tagBatchCreation = new TagBatchCreation(table); } @Override @@ -91,79 +85,6 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { commitOperator.notifyCheckpointAborted(checkpointId); } - private void createTag() { - SnapshotManager snapshotManager = table.snapshotManager(); - Snapshot snapshot = snapshotManager.latestSnapshot(); - if (snapshot == null) { - return; - } - TagManager tagManager = table.tagManager(); - TagDeletion tagDeletion = table.store().newTagDeletion(); - Instant instant = Instant.ofEpochMilli(snapshot.timeMillis()); - LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); - String tagName = - table.coreOptions().tagBatchCustomizedName() != null - ? table.coreOptions().tagBatchCustomizedName() - : BATCH_WRITE_TAG_PREFIX - + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); - try { - // If the tag already exists, delete the tag - tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); - // Create a new tag - tagManager.createTag( - snapshot, - tagName, - table.coreOptions().tagDefaultTimeRetained(), - table.store().createTagCallbacks(), - false); - // Expire the tag - expireTag(); - } catch (Exception e) { - if (tagManager.tagExists(tagName)) { - tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); - } - } - } - - private void expireTag() { - Integer tagNumRetainedMax = table.coreOptions().tagNumRetainedMax(); - if (tagNumRetainedMax != null) { - SnapshotManager snapshotManager = table.snapshotManager(); - if (snapshotManager.latestSnapshot() == null) { - return; - } - TagManager tagManager = table.tagManager(); - TagDeletion tagDeletion = table.store().newTagDeletion(); - long tagCount = tagManager.tagCount(); - - while (tagCount > tagNumRetainedMax) { - for (List tagNames : tagManager.tags().values()) { - if (tagCount - tagNames.size() >= tagNumRetainedMax) { - tagManager.deleteAllTagsOfOneSnapshot( - tagNames, tagDeletion, snapshotManager); - tagCount = tagCount - tagNames.size(); - } else { - List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); - for (String toBeDeleted : sortedTagNames) { - tagManager.deleteTag( - toBeDeleted, - tagDeletion, - snapshotManager, - table.store().createTagCallbacks()); - tagCount--; - if (tagCount == tagNumRetainedMax) { - break; - } - } - break; - } - } - } - } - } - @Override public void open() throws Exception { commitOperator.open(); @@ -191,7 +112,7 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { @Override public void finish() throws Exception { - createTag(); + tagBatchCreation.createTag(); commitOperator.finish(); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index eae8b4f14658..4f9f5a486704 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -19,13 +19,14 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions -import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE +import org.apache.paimon.CoreOptions.{DYNAMIC_PARTITION_OVERWRITE, TagCreationMode} import org.apache.paimon.options.Options import org.apache.paimon.partition.actions.PartitionMarkDoneAction import org.apache.paimon.spark._ import org.apache.paimon.spark.schema.SparkSystemColumns import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.CommitMessage +import org.apache.paimon.tag.TagBatchCreation import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils} import org.apache.spark.internal.Logging @@ -83,10 +84,19 @@ case class WriteIntoPaimonTable( val commitMessages = writer.write(data) writer.commit(commitMessages) - markDoneIfNeeded(commitMessages) + preFinish(commitMessages) Seq.empty } + private def preFinish(commitMessages: Seq[CommitMessage]): Unit = { + val coreOptions = table.coreOptions(); + if (coreOptions.tagCreationMode() == TagCreationMode.BATCH) { + val tagCreation = new TagBatchCreation(table) + tagCreation.createTag() + } + markDoneIfNeeded(commitMessages) + } + private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = { val coreOptions = table.coreOptions() if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala index 5ad687b4da0f..047a222c4a00 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala @@ -142,4 +142,20 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase { spark.sql("alter table T rename tag `tag-1` to `tag-2`") checkAnswer(spark.sql("show tags T"), Row("tag-2")) } + + test("Tag creation: batch creation mode") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ( + |'file.format' = 'avro', + |'tag.automatic-creation'='batch', + |'tag.batch.customized-name' = 'haha')""".stripMargin) + spark.sql("insert into T values(1, 'a')") + assertResult(1)(loadTable("T").tagManager().tagObjects().size()) + assertResult("haha")(loadTable("T").tagManager().tagObjects().get(0).getRight) + spark.sql("insert into T values(1, 'a')") + // tag overwrite + assertResult(1)(loadTable("T").tagManager().tagObjects().size()) + assertResult("haha")(loadTable("T").tagManager().tagObjects().get(0).getRight) + } } From 0984a2fcc7374a1e2979f18372c65ed479a026f5 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 28 Feb 2025 15:02:45 +0800 Subject: [PATCH 2/4] fix --- .../apache/paimon/spark/commands/WriteIntoPaimonTable.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index 4f9f5a486704..84b44984f843 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -89,8 +89,7 @@ case class WriteIntoPaimonTable( } private def preFinish(commitMessages: Seq[CommitMessage]): Unit = { - val coreOptions = table.coreOptions(); - if (coreOptions.tagCreationMode() == TagCreationMode.BATCH) { + if (table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { val tagCreation = new TagBatchCreation(table) tagCreation.createTag() } From 2e9431c0d01aa626ef0adc36e5192941a981ab0a Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 28 Feb 2025 15:05:33 +0800 Subject: [PATCH 3/4] fix --- .../java/org/apache/paimon/tag/TagBatchCreation.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java index 26f1d58d0659..4b973c20de9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java @@ -25,6 +25,9 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -34,6 +37,8 @@ /** Tag creation for batch mode. */ public class TagBatchCreation { + private static final Logger LOG = LoggerFactory.getLogger(TagBatchCreation.class); + private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-"; private final FileStoreTable table; private final CoreOptions options; @@ -72,14 +77,15 @@ public void createTag() { table.coreOptions().tagDefaultTimeRetained(), table.store().createTagCallbacks(), false); - // Expire the tag - expireTag(); } catch (Exception e) { + LOG.warn("Failed to create tag", e); if (tagManager.tagExists(tagName)) { tagManager.deleteTag( tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } } + // Expire the tag + expireTag(); } private void expireTag() { From 81a05be04c26d0596096967fc8c2694cdbe09197 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 28 Feb 2025 16:20:38 +0800 Subject: [PATCH 4/4] fix --- .../main/java/org/apache/paimon/tag/TagBatchCreation.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java index 4b973c20de9c..8afeaa62166b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java @@ -78,7 +78,11 @@ public void createTag() { table.store().createTagCallbacks(), false); } catch (Exception e) { - LOG.warn("Failed to create tag", e); + LOG.warn( + "Failed to create tag '{}' from '${}', you can create this tag manually.", + tagName, + snapshot.id(), + e); if (tagManager.tagExists(tagName)) { tagManager.deleteTag( tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());