Skip to content

Commit d866158

Browse files
authored
[core] Introduce gentle lookup compaction mode to reduce overall compaction frequency (#5178)
1 parent ca52728 commit d866158

File tree

17 files changed

+545
-30
lines changed

17 files changed

+545
-30
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,18 @@
435435
<td>Integer</td>
436436
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
437437
</tr>
438+
<tr>
439+
<td><h5>lookup-compact</h5></td>
440+
<td style="word-wrap: break-word;">RADICAL</td>
441+
<td><p>Enum</p></td>
442+
<td>Lookup compact mode used for lookup compaction.<br /><br />Possible values:<ul><li>"RADICAL"</li><li>"GENTLE"</li></ul></td>
443+
</tr>
444+
<tr>
445+
<td><h5>lookup-compact.max-interval</h5></td>
446+
<td style="word-wrap: break-word;">(none)</td>
447+
<td>Integer</td>
448+
<td>The max interval for a gentle mode lookup compaction to be triggered. For every interval, a forced lookup compaction will be performed to flush L0 files to higher level. This option is only valid when lookup-compact mode is gentle.</td>
449+
</tr>
438450
<tr>
439451
<td><h5>lookup-wait</h5></td>
440452
<td style="word-wrap: break-word;">true</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,6 +1523,21 @@ public class CoreOptions implements Serializable {
15231523
.withDescription(
15241524
"When need to lookup, commit will wait for compaction by lookup.");
15251525

1526+
/**
 * Option {@code lookup-compact}: selects the lookup compaction mode. The declared default is
 * {@link LookupCompactMode#RADICAL}.
 */
public static final ConfigOption<LookupCompactMode> LOOKUP_COMPACT =
1527+
key("lookup-compact")
1528+
.enumType(LookupCompactMode.class)
1529+
.defaultValue(LookupCompactMode.RADICAL)
1530+
.withDescription("Lookup compact mode used for lookup compaction.");
1531+
1532+
/**
 * Option {@code lookup-compact.max-interval}: no default value is declared here. When it is
 * unset, {@link #lookupCompactMaxInterval()} substitutes twice the sorted-run compaction
 * trigger, and that accessor never returns less than the trigger itself.
 */
public static final ConfigOption<Integer> LOOKUP_COMPACT_MAX_INTERVAL =
1533+
key("lookup-compact.max-interval")
1534+
.intType()
1535+
.noDefaultValue()
1536+
.withDescription(
1537+
"The max interval for a gentle mode lookup compaction to be triggered. For every interval, "
1538+
+ "a forced lookup compaction will be performed to flush L0 files to higher level. "
1539+
+ "This option is only valid when lookup-compact mode is gentle.");
1540+
15261541
public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
15271542
key("delete-file.thread-num")
15281543
.intType()
@@ -2524,6 +2539,23 @@ public boolean prepareCommitWaitCompaction() {
25242539
return options.get(LOOKUP_WAIT);
25252540
}
25262541

2542+
public boolean statefulLookup() {
2543+
return needLookup()
2544+
&& (!options.get(LOOKUP_WAIT) || LookupCompactMode.GENTLE.equals(lookupCompact()));
2545+
}
2546+
2547+
public LookupCompactMode lookupCompact() {
2548+
return options.get(LOOKUP_COMPACT);
2549+
}
2550+
2551+
public int lookupCompactMaxInterval() {
2552+
Integer maxInterval = options.get(LOOKUP_COMPACT_MAX_INTERVAL);
2553+
if (maxInterval == null) {
2554+
maxInterval = MathUtils.multiplySafely(numSortedRunCompactionTrigger(), 2);
2555+
}
2556+
return Math.max(numSortedRunCompactionTrigger(), maxInterval);
2557+
}
2558+
25272559
public boolean asyncFileWrite() {
25282560
return options.get(ASYNC_FILE_WRITE);
25292561
}
@@ -3285,4 +3317,16 @@ public static OrderType of(String orderType) {
32853317
throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
32863318
}
32873319
}
3320+
3321+
/** Modes that decide how aggressively lookup compaction picks up newly written files. */
public enum LookupCompactMode {

    /** New files are compacted radically, using the ForceUpLevel0Compaction strategy. */
    RADICAL,

    /** New files are compacted gently, using the UniversalCompaction strategy. */
    GENTLE
}
32883332
}

paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,15 @@ public static int addSafely(int a, int b) {
110110
return Integer.MAX_VALUE;
111111
}
112112
}
113+
114+
/**
115+
* Safely multiply the given int value by another int value, ensuring that no overflow occurs.
116+
*/
117+
public static int multiplySafely(int a, int b) {
118+
try {
119+
return Math.multiplyExact(a, b);
120+
} catch (ArithmeticException e) {
121+
return Integer.MAX_VALUE;
122+
}
123+
}
113124
}

paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ Optional<CompactResult> getCompactionResult(boolean blocking)
5252
/** Cancel currently running compaction task. */
5353
void cancelCompaction();
5454

55-
/** Check if a compaction is in progress, or if a compaction result remains to be fetched. */
55+
/**
 * Check if a compaction is in progress, or if a compaction result remains to be fetched, or if
 * a compaction should be triggered later.
 *
 * @return {@code true} if any of the conditions above holds
 */
5659
boolean isCompacting();
5760
}

paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private long newSequenceNumber() {
142142
}
143143

144144
@VisibleForTesting
145-
CompactManager compactManager() {
145+
public CompactManager compactManager() {
146146
return compactManager;
147147
}
148148

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,6 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
4040
return pick;
4141
}
4242

43-
// collect all level 0 files
44-
int candidateCount = 0;
45-
for (int i = candidateCount; i < runs.size(); i++) {
46-
if (runs.get(i).level() > 0) {
47-
break;
48-
}
49-
candidateCount++;
50-
}
51-
52-
return candidateCount == 0
53-
? Optional.empty()
54-
: Optional.of(
55-
universal.pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
43+
return universal.forcePickL0(numLevels, runs);
5644
}
5745
}

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
6363
@Nullable private final CompactionMetrics.Reporter metricsReporter;
6464
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
6565
private final boolean lazyGenDeletionFile;
66+
private final boolean needLookup;
6667

6768
public MergeTreeCompactManager(
6869
ExecutorService executor,
@@ -74,7 +75,8 @@ public MergeTreeCompactManager(
7475
CompactRewriter rewriter,
7576
@Nullable CompactionMetrics.Reporter metricsReporter,
7677
@Nullable DeletionVectorsMaintainer dvMaintainer,
77-
boolean lazyGenDeletionFile) {
78+
boolean lazyGenDeletionFile,
79+
boolean needLookup) {
7880
this.executor = executor;
7981
this.levels = levels;
8082
this.strategy = strategy;
@@ -85,6 +87,7 @@ public MergeTreeCompactManager(
8587
this.metricsReporter = metricsReporter;
8688
this.dvMaintainer = dvMaintainer;
8789
this.lazyGenDeletionFile = lazyGenDeletionFile;
90+
this.needLookup = needLookup;
8891

8992
MetricUtils.safeCall(this::reportMetrics, LOG);
9093
}
@@ -240,6 +243,11 @@ public Optional<CompactResult> getCompactionResult(boolean blocking)
240243
return result;
241244
}
242245

246+
@Override
247+
public boolean isCompacting() {
248+
return super.isCompacting() || (needLookup && !levels().level0().isEmpty());
249+
}
250+
243251
private void reportMetrics() {
244252
if (metricsReporter != null) {
245253
metricsReporter.reportLevel0FileCount(levels.level0().size());
@@ -254,4 +262,9 @@ public void close() throws IOException {
254262
MetricUtils.safeCall(metricsReporter::unregister, LOG);
255263
}
256264
}
265+
266+
@VisibleForTesting
267+
public CompactStrategy getStrategy() {
268+
return strategy;
269+
}
257270
}

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.time.Duration;
3232
import java.util.List;
3333
import java.util.Optional;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435

3536
/**
3637
* Universal Compaction Style is a compaction style, targeting the use cases requiring lower write
@@ -50,6 +51,9 @@ public class UniversalCompaction implements CompactStrategy {
5051
@Nullable private final Long opCompactionInterval;
5152
@Nullable private Long lastOptimizedCompaction;
5253

54+
@Nullable private final Integer maxLookupCompactInterval;
55+
@Nullable private final AtomicInteger lookupCompactTriggerCount;
56+
5357
/**
 * Creates a strategy without an optimized-compaction interval; delegates to the four-argument
 * constructor with {@code null} as the interval.
 */
public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
5458
// null means no optimized-compaction interval is supplied
this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
5559
}
@@ -59,11 +63,22 @@ public UniversalCompaction(
5963
int sizeRatio,
6064
int numRunCompactionTrigger,
6165
@Nullable Duration opCompactionInterval) {
66+
this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, opCompactionInterval, null);
67+
}
68+
69+
public UniversalCompaction(
70+
int maxSizeAmp,
71+
int sizeRatio,
72+
int numRunCompactionTrigger,
73+
@Nullable Duration opCompactionInterval,
74+
@Nullable Integer maxLookupCompactInterval) {
6275
this.maxSizeAmp = maxSizeAmp;
6376
this.sizeRatio = sizeRatio;
6477
this.numRunCompactionTrigger = numRunCompactionTrigger;
6578
this.opCompactionInterval =
6679
opCompactionInterval == null ? null : opCompactionInterval.toMillis();
80+
this.maxLookupCompactInterval = maxLookupCompactInterval;
81+
this.lookupCompactTriggerCount = new AtomicInteger(0);
6782
}
6883

6984
@Override
@@ -107,9 +122,44 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
107122
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
108123
}
109124

125+
// 4 checking if a forced L0 compact should be triggered
126+
if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {
127+
lookupCompactTriggerCount.getAndIncrement();
128+
if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {
129+
if (LOG.isDebugEnabled()) {
130+
LOG.debug(
131+
"Universal compaction due to max lookup compaction interval {}.",
132+
maxLookupCompactInterval);
133+
}
134+
return forcePickL0(numLevels, runs);
135+
} else {
136+
if (LOG.isDebugEnabled()) {
137+
LOG.debug(
138+
"Skip universal compaction due to lookup compaction trigger count {} is less than the max interval {}.",
139+
lookupCompactTriggerCount.get(),
140+
maxLookupCompactInterval);
141+
}
142+
}
143+
}
144+
110145
return Optional.empty();
111146
}
112147

148+
Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {
149+
// collect all level 0 files
150+
int candidateCount = 0;
151+
for (int i = candidateCount; i < runs.size(); i++) {
152+
if (runs.get(i).level() > 0) {
153+
break;
154+
}
155+
candidateCount++;
156+
}
157+
158+
return candidateCount == 0
159+
? Optional.empty()
160+
: Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
161+
}
162+
113163
@VisibleForTesting
114164
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
115165
if (runs.size() < numRunCompactionTrigger) {

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ Function<WriterContainer<T>, Boolean> createConflictAwareWriterCleanChecker(
290290
//
291291
// Condition 2: No compaction is in progress. That is, no more changelog will be
292292
// produced.
293+
//
294+
// Condition 3: The writer has no postponed compaction like gentle lookup compaction.
293295
return writerContainer ->
294296
writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
295297
&& !writerContainer.writer.isCompacting();

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,16 +196,7 @@ protected MergeTreeWriter createWriter(
196196
writerFactoryBuilder.build(partition, bucket, options);
197197
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
198198
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
199-
UniversalCompaction universalCompaction =
200-
new UniversalCompaction(
201-
options.maxSizeAmplificationPercent(),
202-
options.sortedRunSizeRatio(),
203-
options.numSortedRunCompactionTrigger(),
204-
options.optimizedCompactionInterval());
205-
CompactStrategy compactStrategy =
206-
options.needLookup()
207-
? new ForceUpLevel0Compaction(universalCompaction)
208-
: universalCompaction;
199+
CompactStrategy compactStrategy = createCompactStrategy(options);
209200
CompactManager compactManager =
210201
createCompactManager(
211202
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
@@ -232,6 +223,32 @@ public boolean bufferSpillable() {
232223
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
233224
}
234225

226+
private CompactStrategy createCompactStrategy(CoreOptions options) {
227+
if (options.needLookup()) {
228+
if (CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
229+
return new ForceUpLevel0Compaction(
230+
new UniversalCompaction(
231+
options.maxSizeAmplificationPercent(),
232+
options.sortedRunSizeRatio(),
233+
options.numSortedRunCompactionTrigger(),
234+
options.optimizedCompactionInterval()));
235+
} else if (CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
236+
return new UniversalCompaction(
237+
options.maxSizeAmplificationPercent(),
238+
options.sortedRunSizeRatio(),
239+
options.numSortedRunCompactionTrigger(),
240+
options.optimizedCompactionInterval(),
241+
options.lookupCompactMaxInterval());
242+
}
243+
}
244+
245+
return new UniversalCompaction(
246+
options.maxSizeAmplificationPercent(),
247+
options.sortedRunSizeRatio(),
248+
options.numSortedRunCompactionTrigger(),
249+
options.optimizedCompactionInterval());
250+
}
251+
235252
private CompactManager createCompactManager(
236253
BinaryRow partition,
237254
int bucket,
@@ -264,7 +281,8 @@ private CompactManager createCompactManager(
264281
? null
265282
: compactionMetrics.createReporter(partition, bucket),
266283
dvMaintainer,
267-
options.prepareCommitWaitCompaction());
284+
options.prepareCommitWaitCompaction(),
285+
options.needLookup());
268286
}
269287
}
270288

0 commit comments

Comments
 (0)