Skip to content

Commit d16bf86

Browse files
committed
[Iceberg] Use remainingPredicate in filter stats calc
This change combines the TableLayoutHandle's remaining predicate and the pushed-down predicate when calculating statistics for the call to the Iceberg connector's `getTableStatistics` in order to attempt to get more accurate statistics estimates. The logic mimics the Hive connector. Explanation: There were two issues with the previous implementation: 1. Non-Hive implementations never calculated statistics using the FilterStatsCalculatorService. 2. The Hive implementation did not consider the `remainingPredicate` field when using the FilterStatsCalculatorService. This commit remedies both issues with some small refactoring and changes to the arguments the connector passes to the `FilterStatsCalculatorService`. The FilterStatsCalculatorService attempts to refine the stats calculations returned from a connector by considering the full table statistics and any predicates applied to the scan in order to reduce the total row count and other statistics like NDVs. When filter pushdown is enabled on the Iceberg connector, only some parts of the filter can be pushed into the table scan. The rest is represented by the layout handle's `remainingPredicate` field. The Hive connector implementation creates a new binary expression using an AND of the `remainingPredicate` and `domainPredicate` fields. This change updates the implementation to do the same.
1 parent 72c07cc commit d16bf86

File tree

7 files changed

+183
-71
lines changed

7 files changed

+183
-71
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
5050
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
5151
import com.facebook.presto.spi.function.StandardFunctionResolution;
52+
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
5253
import com.facebook.presto.spi.relation.RowExpression;
5354
import com.facebook.presto.spi.relation.RowExpressionService;
5455
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -154,7 +155,8 @@
154155
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
155156
import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta;
156157
import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns;
157-
import static com.facebook.presto.iceberg.util.StatisticsUtil.combineSelectedAndPredicateColumns;
158+
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateBaseTableStatistics;
159+
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateStatisticsConsideringLayout;
158160
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
159161
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
160162
import static com.google.common.base.Verify.verify;
@@ -172,28 +174,32 @@
172174
public abstract class IcebergAbstractMetadata
173175
implements ConnectorMetadata
174176
{
177+
private static final Logger log = Logger.get(IcebergAbstractMetadata.class);
178+
175179
protected final TypeManager typeManager;
176180
protected final JsonCodec<CommitTaskData> commitTaskCodec;
177181
protected final NodeVersion nodeVersion;
178182
protected final RowExpressionService rowExpressionService;
183+
protected final FilterStatsCalculatorService filterStatsCalculatorService;
179184
protected Transaction transaction;
180185

181186
private final StandardFunctionResolution functionResolution;
182187
private final ConcurrentMap<SchemaTableName, Table> icebergTables = new ConcurrentHashMap<>();
183-
private static final Logger log = Logger.get(IcebergAbstractMetadata.class);
184188

185189
public IcebergAbstractMetadata(
186190
TypeManager typeManager,
187191
StandardFunctionResolution functionResolution,
188192
RowExpressionService rowExpressionService,
189193
JsonCodec<CommitTaskData> commitTaskCodec,
190-
NodeVersion nodeVersion)
194+
NodeVersion nodeVersion,
195+
FilterStatsCalculatorService filterStatsCalculatorService)
191196
{
192197
this.typeManager = requireNonNull(typeManager, "typeManager is null");
193198
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
194199
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
195200
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
196201
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
202+
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
197203
}
198204

199205
protected final Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
@@ -727,19 +733,8 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
727733
@Override
728734
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
729735
{
730-
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
731-
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
732-
733-
List<IcebergColumnHandle> handles = combineSelectedAndPredicateColumns(
734-
columnHandles.stream()
735-
.map(IcebergColumnHandle.class::cast)
736-
.collect(toImmutableList()),
737-
tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast));
738-
return TableStatisticsMaker.getTableStatistics(session, typeManager,
739-
tableLayoutHandle
740-
.map(IcebergTableLayoutHandle.class::cast)
741-
.map(IcebergTableLayoutHandle::getValidPredicate),
742-
constraint, handle, icebergTable, handles);
736+
TableStatistics baseStatistics = calculateBaseTableStatistics(this, typeManager, session, (IcebergTableHandle) tableHandle, tableLayoutHandle, columnHandles, constraint);
737+
return calculateStatisticsConsideringLayout(filterStatsCalculatorService, rowExpressionService, baseStatistics, session, tableLayoutHandle);
743738
}
744739

745740
@Override

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package com.facebook.presto.iceberg;
1515

1616
import com.facebook.airlift.json.JsonCodec;
17-
import com.facebook.presto.common.predicate.TupleDomain;
1817
import com.facebook.presto.common.type.Type;
1918
import com.facebook.presto.common.type.TypeManager;
2019
import com.facebook.presto.hive.HdfsContext;
@@ -51,13 +50,10 @@
5150
import com.facebook.presto.spi.ViewNotFoundException;
5251
import com.facebook.presto.spi.function.StandardFunctionResolution;
5352
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
54-
import com.facebook.presto.spi.relation.RowExpression;
5553
import com.facebook.presto.spi.relation.RowExpressionService;
56-
import com.facebook.presto.spi.relation.VariableReferenceExpression;
5754
import com.facebook.presto.spi.security.PrestoPrincipal;
5855
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
5956
import com.facebook.presto.spi.statistics.ColumnStatisticType;
60-
import com.facebook.presto.spi.statistics.ColumnStatistics;
6157
import com.facebook.presto.spi.statistics.ComputedStatistics;
6258
import com.facebook.presto.spi.statistics.TableStatisticType;
6359
import com.facebook.presto.spi.statistics.TableStatistics;
@@ -126,13 +122,13 @@
126122
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
127123
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
128124
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
129-
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize;
125+
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateBaseTableStatistics;
126+
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateStatisticsConsideringLayout;
130127
import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
131128
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
132129
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
133130
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
134131
import static com.facebook.presto.spi.security.PrincipalType.USER;
135-
import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.LOW;
136132
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
137133
import static com.google.common.base.Verify.verify;
138134
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -141,7 +137,6 @@
141137
import static java.util.Collections.emptyList;
142138
import static java.util.Locale.ENGLISH;
143139
import static java.util.Objects.requireNonNull;
144-
import static java.util.function.Function.identity;
145140
import static org.apache.iceberg.TableMetadata.newTableMetadata;
146141
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
147142
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
@@ -154,7 +149,6 @@ public class IcebergHiveMetadata
154149
private final ExtendedHiveMetastore metastore;
155150
private final HdfsEnvironment hdfsEnvironment;
156151
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
157-
private final FilterStatsCalculatorService filterStatsCalculatorService;
158152
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;
159153

160154
public IcebergHiveMetadata(
@@ -168,10 +162,9 @@ public IcebergHiveMetadata(
168162
FilterStatsCalculatorService filterStatsCalculatorService,
169163
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig)
170164
{
171-
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion);
165+
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
172166
this.metastore = requireNonNull(metastore, "metastore is null");
173167
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
174-
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
175168
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
176169
}
177170

@@ -455,48 +448,16 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
455448
{
456449
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
457450
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
458-
TableStatistics icebergStatistics = super.getTableStatistics(session, tableHandle, tableLayoutHandle, columnHandles, constraint);
451+
TableStatistics icebergStatistics = calculateBaseTableStatistics(this, typeManager, session, (IcebergTableHandle) tableHandle, tableLayoutHandle, columnHandles, constraint);
459452
EnumSet<ColumnStatisticType> mergeFlags = getHiveStatisticsMergeStrategy(session);
460-
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
461-
TupleDomain<VariableReferenceExpression> predicate = layoutHandle.getValidPredicate()
462-
.transform(columnHandle -> new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType()));
463-
RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
464-
TableStatistics mergedStatistics = Optional.of(mergeFlags)
465-
.filter(set -> !set.isEmpty())
466-
.map(flags -> {
467-
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
468-
return mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec());
469-
})
470-
.orElse(icebergStatistics);
471-
TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
472-
.setRowCount(mergedStatistics.getRowCount());
473-
Set<ColumnHandle> fullColumnSet = icebergStatistics.getColumnStatistics().keySet();
474-
for (ColumnHandle colHandle : fullColumnSet) {
475-
IcebergColumnHandle icebergHandle = (IcebergColumnHandle) colHandle;
476-
if (mergedStatistics.getColumnStatistics().containsKey(icebergHandle)) {
477-
ColumnStatistics stats = mergedStatistics.getColumnStatistics().get(icebergHandle);
478-
filteredStatsBuilder.setColumnStatistics(icebergHandle, stats);
479-
}
480-
}
481-
return filterStatsCalculatorService.filterStats(
482-
calculateAndSetTableSize(filteredStatsBuilder).setConfidenceLevel(LOW).build(),
483-
translatedPredicate,
484-
session,
485-
fullColumnSet.stream()
486-
.map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
487-
identity(),
488-
IcebergColumnHandle::getName)),
489-
fullColumnSet
490-
.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
491-
IcebergColumnHandle::getName,
492-
IcebergColumnHandle::getType)));
493-
}).orElseGet(() -> {
494-
if (!mergeFlags.isEmpty()) {
495-
PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
496-
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeFlags, icebergTable.spec());
497-
}
498-
return icebergStatistics;
499-
});
453+
TableStatistics mergedStatistics = Optional.of(mergeFlags)
454+
.filter(set -> !set.isEmpty())
455+
.map(flags -> {
456+
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
457+
return mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec());
458+
})
459+
.orElse(icebergStatistics);
460+
return calculateStatisticsConsideringLayout(filterStatsCalculatorService, rowExpressionService, mergedStatistics, session, tableLayoutHandle);
500461
}
501462

502463
@Override

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.spi.PrestoException;
2828
import com.facebook.presto.spi.SchemaTableName;
2929
import com.facebook.presto.spi.function.StandardFunctionResolution;
30+
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
3031
import com.facebook.presto.spi.relation.RowExpressionService;
3132
import org.apache.hadoop.fs.Path;
3233
import org.apache.iceberg.PartitionSpec;
@@ -80,9 +81,10 @@ public IcebergNativeMetadata(
8081
RowExpressionService rowExpressionService,
8182
JsonCodec<CommitTaskData> commitTaskCodec,
8283
CatalogType catalogType,
83-
NodeVersion nodeVersion)
84+
NodeVersion nodeVersion,
85+
FilterStatsCalculatorService filterStatsCalculatorService)
8486
{
85-
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion);
87+
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
8688
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
8789
this.catalogType = requireNonNull(catalogType, "catalogType is null");
8890
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.hive.NodeVersion;
1919
import com.facebook.presto.spi.connector.ConnectorMetadata;
2020
import com.facebook.presto.spi.function.StandardFunctionResolution;
21+
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
2122
import com.facebook.presto.spi.relation.RowExpressionService;
2223

2324
import javax.inject.Inject;
@@ -34,6 +35,7 @@ public class IcebergNativeMetadataFactory
3435
final StandardFunctionResolution functionResolution;
3536
final RowExpressionService rowExpressionService;
3637
final NodeVersion nodeVersion;
38+
final FilterStatsCalculatorService filterStatsCalculatorService;
3739

3840
@Inject
3941
public IcebergNativeMetadataFactory(
@@ -43,7 +45,8 @@ public IcebergNativeMetadataFactory(
4345
StandardFunctionResolution functionResolution,
4446
RowExpressionService rowExpressionService,
4547
JsonCodec<CommitTaskData> commitTaskCodec,
46-
NodeVersion nodeVersion)
48+
NodeVersion nodeVersion,
49+
FilterStatsCalculatorService filterStatsCalculatorService)
4750
{
4851
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
4952
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -53,10 +56,11 @@ public IcebergNativeMetadataFactory(
5356
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
5457
requireNonNull(config, "config is null");
5558
this.catalogType = config.getCatalogType();
59+
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
5660
}
5761

5862
public ConnectorMetadata create()
5963
{
60-
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion);
64+
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion, filterStatsCalculatorService);
6165
}
6266
}

0 commit comments

Comments
 (0)