We recently performed a Trino version upgrade from 414 to 469. After the upgrade, we find that a query that previously completed in 4-5 seconds now takes close to an hour on the new version.
Example query, run with set session push_partial_aggregation_through_join = true;
(which I believe is enabled by default in 469). A sketch for double-checking the session property follows the query below.
select
a.colA,
b.colB as alias_for_colB,
sum(a.colC) as sum_of_colC
from
db1.table_a a
join db2.table_b b on b.colE = a.colD
group by
1,
2
where db1.table_a and db2.table_b are Iceberg tables.
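For completeness, this is how I would verify the property on the cluster before re-running the query (standard Trino statements, nothing specific to our setup):

SHOW SESSION LIKE 'push_partial_aggregation_through_join';   -- confirm current and default value
SET SESSION push_partial_aggregation_through_join = true;    -- force it on for the session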
Running EXPLAIN on both versions produces the following query plans.
469 Query Plan
Query Plan
------------------------------------------------------------------------------------------
Trino version: 469
Fragment 0 [HASH]
Output layout: [colA, colB, sum]
Output partitioning: SINGLE []
Output[columnNames = [colA, alias_for_colB, sum_of_colC]]
│ Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ alias_for_colB := colB
│ sum_of_colC := sum
└─ Aggregate[type = FINAL, keys = [colA, colB]]
│ Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ sum := sum(sum_104)
└─ LocalExchange[partitioning = HASH, arguments = [colA::varchar, colB::varchar]]
│ Layout: [colA:varchar, colB:varchar, sum_104:varbinary]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [1]]
Layout: [colA:varchar, colB:varchar, sum_104:varbinary]
Fragment 1 [HASH]
Output layout: [colA, colB, sum_104]
Output partitioning: HASH [colA, colB]
Aggregate[type = PARTIAL, keys = [colA, colB]]
│ Layout: [colA:varchar, colB:varchar, sum_104:varbinary]
│ sum_104 := sum(colC)
└─ InnerJoin[criteria = (colD = colE), distribution = PARTITIONED]
│ Layout: [colA:varchar, colC:decimal(38,18), colB:varchar]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {colE -> #df_XYZ}
├─ RemoteSource[sourceFragmentIds = [2]]
│ Layout: [colD:varchar, colA:varchar, colC:decimal(38,18)]
└─ LocalExchange[partitioning = HASH, arguments = [colE::varchar]]
│ Layout: [colE:varchar, colB:varchar]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [colE:varchar, colB:varchar]
Fragment 2 [SOURCE]
Output layout: [colD, colA, colC]
Output partitioning: HASH [colD]
ScanFilter[table = iceberg:db1.table_a$data@..., dynamicFilters = {colD = #df_XYZ}]
Layout: [colD:varchar, colA:varchar, colC:decimal(38,18)]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
colC := 114:colC:decimal(38,18)
colA := 69:colA:varchar
colD := 32:colD:varchar
Fragment 3 [SOURCE]
Output layout: [colE, colB]
Output partitioning: HASH [colE]
TableScan[table = iceberg:db2.table_b$data@...]
Layout: [colE:varchar, colB:varchar]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
colB := 24:colB:varchar
colE := 1:colE:varchar
414 Query Plan
Trino version: 414
Fragment 0 [HASH]
Output layout: [colA, colB, sum]
Output partitioning: SINGLE []
Output[columnNames = [colA, alias_for_colB, sum_of_colC]]
│ Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ alias_for_colB := colB
│ sum_of_colC := sum
└─ Project[]
│ Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ Aggregate[type = FINAL, keys = [colA, colB], hash = [$hashvalue]]
│ Layout: [colA:varchar, colB:varchar, $hashvalue:bigint, sum:decimal(38,18)]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ sum := sum("sum_100")
└─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["colA", "colB"]]
│ Layout: [colA:varchar, colB:varchar, sum_100:varbinary, $hashvalue:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [1]]
Layout: [colA:varchar, colB:varchar, sum_100:varbinary, $hashvalue_101:bigint]
Fragment 1 [HASH]
Output layout: [colA, colB, sum_100, $hashvalue_108]
Output partitioning: HASH [colA, colB][$hashvalue_108]
Project[]
│ Layout: [colA:varchar, sum_100:varbinary, colB:varchar, $hashvalue_108:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
│ $hashvalue_108 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("colA"), 0)), COALESCE("$operator$hash_code"("colB"), 0))
└─ InnerJoin[criteria = ("colD" = "colE"), hash = [$hashvalue_102, $hashvalue_105], distribution = PARTITIONED]
│ Layout: [colA:varchar, sum_100:varbinary, colB:varchar]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {colE -> #df_ABC}
├─ RemoteSource[sourceFragmentIds = [2]]
│ Layout: [colA:varchar, colD:varchar, sum_100:varbinary, $hashvalue_102:bigint]
└─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_105], arguments = ["colE"]]
│ Layout: [colE:varchar, colB:varchar, $hashvalue_105:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [colE:varchar, colB:varchar, $hashvalue_106:bigint]
Fragment 2 [SOURCE]
Output layout: [colA, colD, sum_100, $hashvalue_104]
Output partitioning: HASH [colD][$hashvalue_104]
Project[]
│ Layout: [colA:varchar, colD:varchar, sum_100:varbinary, $hashvalue_104:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
│ $hashvalue_104 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("colD"), 0))
└─ Aggregate[type = PARTIAL, keys = [colA, colD], hash = [$hashvalue_103]]
│ Layout: [colA:varchar, colD:varchar, $hashvalue_103:bigint, sum_100:varbinary]
│ sum_100 := sum("colC")
└─ ScanFilterProject[table = iceberg:db1.table_a$data@..., dynamicFilters = {"colD" = #df_ABC}]
Layout: [colD:varchar, colA:varchar, colC:decimal(38,18), $hashvalue_103:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
$hashvalue_103 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("colA"), 0)), COALESCE("$operator$hash_code"("colD"), 0))
colD := 32:colD:varchar
colC := 114:colC:decimal(38,18)
colA := 69:colA:varchar
Fragment 3 [SOURCE]
Output layout: [colE, colB, $hashvalue_107]
Output partitioning: HASH [colE][$hashvalue_107]
ScanProject[table = iceberg:db2.table_b$data@...]
Layout: [colE:varchar, colB:varchar, $hashvalue_107:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
$hashvalue_107 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("colE"), 0))
colB := 24:colB:varchar
colE := 1:colE:varchar
Specifically, in 414 I see that there is an Aggregate[type = PARTIAL] performed prior to the inner join (Fragment 2), which is what I'd expect given that push_partial_aggregation_through_join is enabled. In 469 the partial aggregation instead happens only after the join (Fragment 1). Is there something else we're missing (a session property or optimizer config) that would help us optimize this query on the later version?
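One thing I notice in both plans is that every estimate is rows: ? (?), i.e. the optimizer has no table statistics to work with. If the pushdown decision is cost-based in newer versions (I have not confirmed that), the missing stats could be why the partial aggregation no longer moves below the join. A sketch of what I would try next, assuming the Iceberg tables simply have not been analyzed:

ANALYZE db1.table_a;   -- collect row counts / NDVs for the cost-based optimizer
ANALYZE db2.table_b;

SET SESSION push_partial_aggregation_through_join = true;
EXPLAIN
select a.colA, b.colB as alias_for_colB, sum(a.colC) as sum_of_colC
from db1.table_a a
join db2.table_b b on b.colE = a.colD
group by 1, 2;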