Skip to content

Commit 9a8f63f

Browse files
authored
[core] Extract ChangelogManager from SnapshotManager (#5188)
1 parent 75c855e commit 9a8f63f

File tree

62 files changed

+815
-433
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+815
-433
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.paimon.tag.SuccessFileTagCallback;
5656
import org.apache.paimon.tag.TagAutoManager;
5757
import org.apache.paimon.types.RowType;
58+
import org.apache.paimon.utils.ChangelogManager;
5859
import org.apache.paimon.utils.FileStorePathFactory;
5960
import org.apache.paimon.utils.SegmentsCache;
6061
import org.apache.paimon.utils.SnapshotManager;
@@ -176,6 +177,11 @@ public SnapshotManager snapshotManager() {
176177
snapshotCache);
177178
}
178179

180+
@Override
181+
public ChangelogManager changelogManager() {
182+
return new ChangelogManager(fileIO, options.path(), options.branch());
183+
}
184+
179185
@Override
180186
public ManifestFile.Factory manifestFileFactory() {
181187
return manifestFileFactory(false);

paimon-core/src/main/java/org/apache/paimon/FileStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.table.sink.TagCallback;
4040
import org.apache.paimon.tag.TagAutoManager;
4141
import org.apache.paimon.types.RowType;
42+
import org.apache.paimon.utils.ChangelogManager;
4243
import org.apache.paimon.utils.FileStorePathFactory;
4344
import org.apache.paimon.utils.SegmentsCache;
4445
import org.apache.paimon.utils.SnapshotManager;
@@ -61,6 +62,8 @@ public interface FileStore<T> {
6162

6263
SnapshotManager snapshotManager();
6364

65+
ChangelogManager changelogManager();
66+
6467
RowType partitionType();
6568

6669
CoreOptions options();

paimon-core/src/main/java/org/apache/paimon/Snapshot.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import java.io.FileNotFoundException;
3535
import java.io.IOException;
36+
import java.io.Serializable;
3637
import java.util.Map;
3738
import java.util.Objects;
3839

@@ -61,7 +62,9 @@
6162
*/
6263
@Public
6364
@JsonIgnoreProperties(ignoreUnknown = true)
64-
public class Snapshot {
65+
public class Snapshot implements Serializable {
66+
67+
private static final long serialVersionUID = 1L;
6568

6669
public static final long FIRST_SNAPSHOT_ID = 1;
6770

paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite wr
7979
UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer =
8080
AppendDeletionFileMaintainer.forUnawareAppend(
8181
table.store().newIndexFileHandler(),
82-
table.snapshotManager().latestSnapshotId(),
82+
table.snapshotManager().latestSnapshot(),
8383
partition);
8484
compactAfter.addAll(
8585
write.compactRewrite(

paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,7 @@ private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow partition) {
355355
synchronized (this) {
356356
maintainer =
357357
AppendDeletionFileMaintainer.forUnawareAppend(
358-
indexFileHandler,
359-
snapshotManager.latestSnapshotId(),
360-
partition);
358+
indexFileHandler, snapshotManager.latestSnapshot(), partition);
361359
}
362360
cache.put(partition, maintainer);
363361
}

paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.deletionvectors;
2020

21+
import org.apache.paimon.Snapshot;
2122
import org.apache.paimon.annotation.VisibleForTesting;
2223
import org.apache.paimon.data.BinaryRow;
2324
import org.apache.paimon.index.IndexFileHandler;
@@ -152,23 +153,23 @@ public Factory(IndexFileHandler handler) {
152153
}
153154

154155
public DeletionVectorsMaintainer createOrRestore(
155-
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
156+
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
156157
List<IndexFileMeta> indexFiles =
157-
snapshotId == null
158+
snapshot == null
158159
? Collections.emptyList()
159-
: handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket);
160+
: handler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
160161
Map<String, DeletionVector> deletionVectors =
161162
new HashMap<>(handler.readAllDeletionVectors(indexFiles));
162163
return createOrRestore(deletionVectors);
163164
}
164165

165166
@VisibleForTesting
166167
public DeletionVectorsMaintainer createOrRestore(
167-
@Nullable Long snapshotId, BinaryRow partition) {
168+
@Nullable Snapshot snapshot, BinaryRow partition) {
168169
List<IndexFileMeta> indexFiles =
169-
snapshotId == null
170+
snapshot == null
170171
? Collections.emptyList()
171-
: handler.scanEntries(snapshotId, DELETION_VECTORS_INDEX, partition)
172+
: handler.scanEntries(snapshot, DELETION_VECTORS_INDEX, partition)
172173
.stream()
173174
.map(IndexManifestEntry::indexFile)
174175
.collect(Collectors.toList());

paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.deletionvectors.append;
2020

21+
import org.apache.paimon.Snapshot;
2122
import org.apache.paimon.data.BinaryRow;
2223
import org.apache.paimon.deletionvectors.DeletionVector;
2324
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
@@ -53,21 +54,21 @@ public interface AppendDeletionFileMaintainer {
5354

5455
static BucketedAppendDeletionFileMaintainer forBucketedAppend(
5556
IndexFileHandler indexFileHandler,
56-
@Nullable Long snapshotId,
57+
@Nullable Snapshot snapshot,
5758
BinaryRow partition,
5859
int bucket) {
5960
// bucket should have only one deletion file, so here we should read old deletion vectors,
6061
// overwrite the entire deletion file of the bucket when writing deletes.
6162
DeletionVectorsMaintainer maintainer =
6263
new DeletionVectorsMaintainer.Factory(indexFileHandler)
63-
.createOrRestore(snapshotId, partition, bucket);
64+
.createOrRestore(snapshot, partition, bucket);
6465
return new BucketedAppendDeletionFileMaintainer(partition, bucket, maintainer);
6566
}
6667

6768
static UnawareAppendDeletionFileMaintainer forUnawareAppend(
68-
IndexFileHandler indexFileHandler, @Nullable Long snapshotId, BinaryRow partition) {
69+
IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot, BinaryRow partition) {
6970
Map<String, DeletionFile> deletionFiles =
70-
indexFileHandler.scanDVIndex(snapshotId, partition, UNAWARE_BUCKET);
71+
indexFileHandler.scanDVIndex(snapshot, partition, UNAWARE_BUCKET);
7172
return new UnawareAppendDeletionFileMaintainer(indexFileHandler, partition, deletionFiles);
7273
}
7374
}

paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777

7878
/** migrate iceberg table to paimon table. */
7979
public class IcebergMigrator implements Migrator {
80+
8081
private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);
8182

8283
private final ThreadPoolExecutor executor;

paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.index;
2020

2121
import org.apache.paimon.KeyValue;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.annotation.VisibleForTesting;
2324
import org.apache.paimon.data.BinaryRow;
2425
import org.apache.paimon.data.InternalRow;
@@ -44,14 +45,14 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
4445

4546
private HashIndexMaintainer(
4647
IndexFileHandler fileHandler,
47-
@Nullable Long snapshotId,
48+
@Nullable Snapshot snapshot,
4849
BinaryRow partition,
4950
int bucket) {
5051
this.fileHandler = fileHandler;
5152
IntHashSet hashcode = new IntHashSet();
52-
if (snapshotId != null) {
53+
if (snapshot != null) {
5354
Optional<IndexFileMeta> indexFile =
54-
fileHandler.scanHashIndex(snapshotId, partition, bucket);
55+
fileHandler.scanHashIndex(snapshot, partition, bucket);
5556
if (indexFile.isPresent()) {
5657
IndexFileMeta file = indexFile.get();
5758
hashcode = new IntHashSet((int) file.rowCount());
@@ -115,8 +116,8 @@ public Factory(IndexFileHandler handler) {
115116

116117
@Override
117118
public IndexMaintainer<KeyValue> createOrRestore(
118-
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
119-
return new HashIndexMaintainer(handler, snapshotId, partition, bucket);
119+
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
120+
return new HashIndexMaintainer(handler, snapshot, partition, bucket);
120121
}
121122
}
122123
}

paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ public DeletionVectorsIndexFile deletionVectorsIndex() {
7575
return this.deletionVectorsIndex;
7676
}
7777

78-
public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partition, int bucket) {
79-
List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, bucket);
78+
public Optional<IndexFileMeta> scanHashIndex(
79+
Snapshot snapshot, BinaryRow partition, int bucket) {
80+
List<IndexFileMeta> result = scan(snapshot, HASH_INDEX, partition, bucket);
8081
if (result.size() > 1) {
8182
throw new IllegalArgumentException(
8283
"Find multiple hash index files for one bucket: " + result);
@@ -85,11 +86,10 @@ public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partitio
8586
}
8687

8788
public Map<String, DeletionFile> scanDVIndex(
88-
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
89-
if (snapshotId == null) {
89+
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
90+
if (snapshot == null) {
9091
return Collections.emptyMap();
9192
}
92-
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
9393
String indexManifest = snapshot.indexManifest();
9494
if (indexManifest == null) {
9595
return Collections.emptyMap();
@@ -136,9 +136,9 @@ public List<IndexManifestEntry> scan(String indexType) {
136136
}
137137

138138
public List<IndexFileMeta> scan(
139-
long snapshotId, String indexType, BinaryRow partition, int bucket) {
139+
Snapshot snapshot, String indexType, BinaryRow partition, int bucket) {
140140
List<IndexFileMeta> result = new ArrayList<>();
141-
for (IndexManifestEntry file : scanEntries(snapshotId, indexType, partition)) {
141+
for (IndexManifestEntry file : scanEntries(snapshot, indexType, partition)) {
142142
if (file.bucket() == bucket) {
143143
result.add(file.indexFile());
144144
}
@@ -171,7 +171,7 @@ public List<IndexManifestEntry> scanEntries() {
171171
}
172172

173173
public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partition) {
174-
Long snapshot = snapshotManager.latestSnapshotId();
174+
Snapshot snapshot = snapshotManager.latestSnapshot();
175175
if (snapshot == null) {
176176
return Collections.emptyList();
177177
}
@@ -180,13 +180,8 @@ public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partitio
180180
}
181181

182182
public List<IndexManifestEntry> scanEntries(
183-
long snapshotId, String indexType, BinaryRow partition) {
184-
return scanEntries(snapshotId, indexType, Collections.singleton(partition));
185-
}
186-
187-
public List<IndexManifestEntry> scanEntries(
188-
long snapshot, String indexType, Set<BinaryRow> partitions) {
189-
return scanEntries(snapshotManager.snapshot(snapshot), indexType, partitions);
183+
Snapshot snapshot, String indexType, BinaryRow partition) {
184+
return scanEntries(snapshot, indexType, Collections.singleton(partition));
190185
}
191186

192187
public List<IndexManifestEntry> scanEntries(

0 commit comments

Comments
 (0)