Skip to content

Commit 84bd201

Browse files
committed
[Iceberg] Clean up unclosed resources from library
1 parent 52c090e commit 84bd201

File tree

3 files changed

+69
-48
lines changed

3 files changed

+69
-48
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/FilesTable.java

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,25 @@
2525
import com.facebook.presto.spi.ConnectorSession;
2626
import com.facebook.presto.spi.ConnectorTableMetadata;
2727
import com.facebook.presto.spi.FixedPageSource;
28+
import com.facebook.presto.spi.PrestoException;
2829
import com.facebook.presto.spi.SchemaTableName;
2930
import com.facebook.presto.spi.SystemTable;
3031
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
3132
import com.google.common.collect.ImmutableList;
3233
import com.google.common.collect.ImmutableMap;
3334
import io.airlift.slice.Slices;
3435
import org.apache.iceberg.DataFile;
36+
import org.apache.iceberg.FileScanTask;
3537
import org.apache.iceberg.Schema;
3638
import org.apache.iceberg.Table;
3739
import org.apache.iceberg.TableScan;
40+
import org.apache.iceberg.io.CloseableIterable;
3841
import org.apache.iceberg.transforms.Transforms;
3942
import org.apache.iceberg.types.Conversions;
4043
import org.apache.iceberg.types.Type;
4144
import org.apache.iceberg.types.Types;
4245

46+
import java.io.IOException;
4347
import java.nio.ByteBuffer;
4448
import java.util.List;
4549
import java.util.Map;
@@ -49,6 +53,7 @@
4953
import static com.facebook.presto.common.type.IntegerType.INTEGER;
5054
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
5155
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
56+
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
5257
import static com.facebook.presto.iceberg.IcebergUtil.getTableScan;
5358
import static com.facebook.presto.iceberg.util.PageListBuilder.forTable;
5459
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -121,53 +126,58 @@ private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table
121126
TableScan tableScan = getTableScan(TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
122127
Map<Integer, Type> idToTypeMap = getIdToTypeMap(icebergTable.schema());
123128

124-
tableScan.planFiles().forEach(fileScanTask -> {
125-
DataFile dataFile = fileScanTask.file();
126-
pagesBuilder.beginRow();
127-
pagesBuilder.appendInteger(dataFile.content().id());
128-
pagesBuilder.appendVarchar(dataFile.path().toString());
129-
pagesBuilder.appendVarchar(dataFile.format().name());
130-
pagesBuilder.appendBigint(dataFile.recordCount());
131-
pagesBuilder.appendBigint(dataFile.fileSizeInBytes());
132-
if (checkNonNull(dataFile.columnSizes(), pagesBuilder)) {
133-
pagesBuilder.appendIntegerBigintMap(dataFile.columnSizes());
129+
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
130+
for (FileScanTask fileScanTask : fileScanTasks) {
131+
DataFile dataFile = fileScanTask.file();
132+
pagesBuilder.beginRow();
133+
pagesBuilder.appendInteger(dataFile.content().id());
134+
pagesBuilder.appendVarchar(dataFile.path().toString());
135+
pagesBuilder.appendVarchar(dataFile.format().name());
136+
pagesBuilder.appendBigint(dataFile.recordCount());
137+
pagesBuilder.appendBigint(dataFile.fileSizeInBytes());
138+
if (checkNonNull(dataFile.columnSizes(), pagesBuilder)) {
139+
pagesBuilder.appendIntegerBigintMap(dataFile.columnSizes());
140+
}
141+
if (checkNonNull(dataFile.valueCounts(), pagesBuilder)) {
142+
pagesBuilder.appendIntegerBigintMap(dataFile.valueCounts());
143+
}
144+
if (checkNonNull(dataFile.nullValueCounts(), pagesBuilder)) {
145+
pagesBuilder.appendIntegerBigintMap(dataFile.nullValueCounts());
146+
}
147+
if (checkNonNull(dataFile.nanValueCounts(), pagesBuilder)) {
148+
pagesBuilder.appendIntegerBigintMap(dataFile.nanValueCounts());
149+
}
150+
if (checkNonNull(dataFile.lowerBounds(), pagesBuilder)) {
151+
pagesBuilder.appendIntegerVarcharMap(dataFile.lowerBounds().entrySet().stream()
152+
.filter(entry -> idToTypeMap.containsKey(entry.getKey()))
153+
.collect(toImmutableMap(
154+
Map.Entry<Integer, ByteBuffer>::getKey,
155+
entry -> Transforms.identity().toHumanString(idToTypeMap.get(entry.getKey()),
156+
Conversions.fromByteBuffer(idToTypeMap.get(entry.getKey()), entry.getValue())))));
157+
}
158+
if (checkNonNull(dataFile.upperBounds(), pagesBuilder)) {
159+
pagesBuilder.appendIntegerVarcharMap(dataFile.upperBounds().entrySet().stream()
160+
.filter(entry -> idToTypeMap.containsKey(entry.getKey()))
161+
.collect(toImmutableMap(
162+
Map.Entry<Integer, ByteBuffer>::getKey,
163+
entry -> Transforms.identity().toHumanString(idToTypeMap.get(entry.getKey()),
164+
Conversions.fromByteBuffer(idToTypeMap.get(entry.getKey()), entry.getValue())))));
165+
}
166+
if (checkNonNull(dataFile.keyMetadata(), pagesBuilder)) {
167+
pagesBuilder.appendVarbinary(Slices.wrappedBuffer(dataFile.keyMetadata()));
168+
}
169+
if (checkNonNull(dataFile.splitOffsets(), pagesBuilder)) {
170+
pagesBuilder.appendBigintArray(dataFile.splitOffsets());
171+
}
172+
if (checkNonNull(dataFile.equalityFieldIds(), pagesBuilder)) {
173+
pagesBuilder.appendIntegerArray(dataFile.equalityFieldIds());
174+
}
175+
pagesBuilder.endRow();
134176
}
135-
if (checkNonNull(dataFile.valueCounts(), pagesBuilder)) {
136-
pagesBuilder.appendIntegerBigintMap(dataFile.valueCounts());
137-
}
138-
if (checkNonNull(dataFile.nullValueCounts(), pagesBuilder)) {
139-
pagesBuilder.appendIntegerBigintMap(dataFile.nullValueCounts());
140-
}
141-
if (checkNonNull(dataFile.nanValueCounts(), pagesBuilder)) {
142-
pagesBuilder.appendIntegerBigintMap(dataFile.nanValueCounts());
143-
}
144-
if (checkNonNull(dataFile.lowerBounds(), pagesBuilder)) {
145-
pagesBuilder.appendIntegerVarcharMap(dataFile.lowerBounds().entrySet().stream()
146-
.filter(entry -> idToTypeMap.containsKey(entry.getKey()))
147-
.collect(toImmutableMap(
148-
Map.Entry<Integer, ByteBuffer>::getKey,
149-
entry -> Transforms.identity().toHumanString(idToTypeMap.get(entry.getKey()),
150-
Conversions.fromByteBuffer(idToTypeMap.get(entry.getKey()), entry.getValue())))));
151-
}
152-
if (checkNonNull(dataFile.upperBounds(), pagesBuilder)) {
153-
pagesBuilder.appendIntegerVarcharMap(dataFile.upperBounds().entrySet().stream()
154-
.filter(entry -> idToTypeMap.containsKey(entry.getKey()))
155-
.collect(toImmutableMap(
156-
Map.Entry<Integer, ByteBuffer>::getKey,
157-
entry -> Transforms.identity().toHumanString(idToTypeMap.get(entry.getKey()),
158-
Conversions.fromByteBuffer(idToTypeMap.get(entry.getKey()), entry.getValue())))));
159-
}
160-
if (checkNonNull(dataFile.keyMetadata(), pagesBuilder)) {
161-
pagesBuilder.appendVarbinary(Slices.wrappedBuffer(dataFile.keyMetadata()));
162-
}
163-
if (checkNonNull(dataFile.splitOffsets(), pagesBuilder)) {
164-
pagesBuilder.appendBigintArray(dataFile.splitOffsets());
165-
}
166-
if (checkNonNull(dataFile.equalityFieldIds(), pagesBuilder)) {
167-
pagesBuilder.appendIntegerArray(dataFile.equalityFieldIds());
168-
}
169-
pagesBuilder.endRow();
170-
});
177+
}
178+
catch (IOException e) {
179+
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "failed to read table files", e);
180+
}
171181

172182
return pagesBuilder.build();
173183
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.facebook.presto.spi.SplitWeight;
2626
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
2727
import com.google.common.collect.ImmutableList;
28+
import com.google.common.io.Closer;
2829
import org.apache.iceberg.AddedRowsScanTask;
2930
import org.apache.iceberg.ChangelogScanTask;
3031
import org.apache.iceberg.ContentScanTask;
@@ -35,6 +36,7 @@
3536
import org.apache.iceberg.PartitionSpec;
3637
import org.apache.iceberg.PartitionSpecParser;
3738
import org.apache.iceberg.Table;
39+
import org.apache.iceberg.io.CloseableIterable;
3840
import org.apache.iceberg.io.CloseableIterator;
3941

4042
import java.io.IOException;
@@ -59,6 +61,8 @@
5961
public class ChangelogSplitSource
6062
implements ConnectorSplitSource
6163
{
64+
private final Closer closer = Closer.create();
65+
private CloseableIterable<ChangelogScanTask> fileScanTaskIterable;
6266
private CloseableIterator<ChangelogScanTask> fileScanTaskIterator;
6367
private final IncrementalChangelogScan tableScan;
6468
private final double minimumAssignedSplitWeight;
@@ -77,7 +81,8 @@ public ChangelogSplitSource(
7781
this.columnHandles = getColumns(table.schema(), table.spec(), typeManager);
7882
this.tableScan = requireNonNull(tableScan, "tableScan is null");
7983
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
80-
this.fileScanTaskIterator = tableScan.planFiles().iterator();
84+
this.fileScanTaskIterable = closer.register(tableScan.planFiles());
85+
this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator());
8186
}
8287

8388
@Override
@@ -102,7 +107,11 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
102107
public void close()
103108
{
104109
try {
105-
fileScanTaskIterator.close();
110+
closer.close();
111+
// TODO: remove this once org.apache.iceberg.io.CloseableIterator's withClose
112+
// correctly releases the resources held by the iterator.
113+
fileScanTaskIterable = CloseableIterable.empty();
114+
fileScanTaskIterator = CloseableIterator.empty();
106115
}
107116
catch (IOException e) {
108117
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ private static void setupLogging()
274274
logging.setLevel("org.glassfish.jersey.internal.inject.Providers", ERROR);
275275
logging.setLevel("parquet.hadoop", WARN);
276276
logging.setLevel("org.apache.iceberg", WARN);
277+
logging.setLevel("com.facebook.airlift.bootstrap", WARN);
278+
logging.setLevel("org.apache.hadoop.io.compress", WARN);
277279
}
278280

279281
public static void main(String[] args)

0 commit comments

Comments (0)