Skip to content

Commit b1ccfb8

Browse files
authored
[core] Enables buffer spill when targetFileSize is greater than write buffer size. (#5121)
1 parent c4dbf7f commit b1ccfb8

File tree

6 files changed

+61
-6
lines changed

6 files changed

+61
-6
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@
10481048
<td><h5>write-buffer-spillable</h5></td>
10491049
<td style="word-wrap: break-word;">(none)</td>
10501050
<td>Boolean</td>
1051-
<td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
1051+
<td>Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.</td>
10521052
</tr>
10531053
<tr>
10541054
<td><h5>write-manifest-cache</h5></td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public class CoreOptions implements Serializable {
479479
.booleanType()
480480
.noDefaultValue()
481481
.withDescription(
482-
"Whether the write buffer can be spillable. Enabled by default when using object storage.");
482+
"Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.");
483483

484484
public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
485485
key("write-buffer-for-append")
@@ -1957,9 +1957,14 @@ public long writeBufferSize() {
19571957
return options.get(WRITE_BUFFER_SIZE).getBytes();
19581958
}
19591959

1960-
public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
1960+
public boolean writeBufferSpillable(
1961+
boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
19611962
// if not streaming mode, we turn spillable on by default.
1962-
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
1963+
return options.getOptional(WRITE_BUFFER_SPILLABLE)
1964+
.orElse(
1965+
usingObjectStore
1966+
|| !isStreaming
1967+
|| targetFileSize(hasPrimaryKey) > writeBufferSize());
19631968
}
19641969

19651970
public MemorySize writeBufferSpillDiskSize() {

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected RecordWriter<InternalRow> createWriter(
125125
pathFactory.createDataFilePathFactory(partition, bucket),
126126
restoreIncrement,
127127
options.useWriteBufferForAppend() || forceBufferSpill,
128-
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode)
128+
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
129129
|| forceBufferSpill,
130130
options.fileCompression(),
131131
options.spillCompressOptions(),

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ protected MergeTreeWriter createWriter(
229229

230230
@VisibleForTesting
231231
public boolean bufferSpillable() {
232-
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
232+
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
233233
}
234234

235235
private CompactManager createCompactManager(

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,54 @@ public void testNoBuffer() throws Exception {
409409
writer.close();
410410
}
411411

412+
@Test
413+
public void tesWriteBufferSpillAutoEnabled() {
414+
HashMap<String, String> map = new HashMap<>();
415+
// This is the default behavior,no object store and streaming mode.
416+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
417+
.isFalse();
418+
419+
// Using object store.
420+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true, false))
421+
.isTrue();
422+
423+
// Batch mode.
424+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
425+
.isTrue();
426+
427+
// Append only table.
428+
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
429+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
430+
.isTrue();
431+
432+
// Primary key table.
433+
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
434+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, true))
435+
.isTrue();
436+
437+
// targetFileSize is greater than write buffer size.
438+
map.clear();
439+
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
440+
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
441+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
442+
.isTrue();
443+
444+
// target-file-size is smaller than write-buffer-size.
445+
map.clear();
446+
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
447+
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
448+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
449+
.isFalse();
450+
451+
// Set to false manually.
452+
map.clear();
453+
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
454+
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
455+
map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
456+
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
457+
.isFalse();
458+
}
459+
412460
@Test
413461
public void testMultipleFlush() throws Exception {
414462
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void testPrimaryKeyTableMetrics() throws Exception {
8787
Options options = new Options();
8888
options.set("bucket", "1");
8989
options.set("write-buffer-size", "256 b");
90+
options.set("write-buffer-spillable", "false");
9091
options.set("page-size", "32 b");
9192

9293
FileStoreTable table =
@@ -332,6 +333,7 @@ public void testNumWritersMetric() throws Exception {
332333
Options options = new Options();
333334
options.set("bucket", "1");
334335
options.set("write-buffer-size", "256 b");
336+
options.set("write-buffer-spillable", "false");
335337
options.set("page-size", "32 b");
336338

337339
FileStoreTable fileStoreTable =

0 commit comments

Comments
 (0)