
Partial aggregate push down via push_partial_aggregation_through_join is not working as expected #26394

@lrao-stripe

Description

We recently upgraded Trino from version 414 to 469. After the upgrade, a query that previously completed in 4-5 seconds takes close to an hour on the new version.

Example query, run after SET SESSION push_partial_aggregation_through_join = true (which I believe is already enabled by default in 469):

select
  a.colA,
  b.colB as alias_for_colB,
  sum(a.colC) as sum_of_colC
from
  db1.table_a a
  join db2.table_b b on b.colE = a.colD
group by
  1,
  2

where db1.table_a and db2.table_b are Iceberg tables.

Running EXPLAIN on this query gives the following plans on each version:

469 Query Plan

Query Plan
------------------------------------------------------------------------------------------
 Trino version: 469
 Fragment 0 [HASH]
     Output layout: [colA, colB, sum]
     Output partitioning: SINGLE []
     Output[columnNames = [colA, alias_for_colB, sum_of_colC]]
     │   Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   alias_for_colB := colB
     │   sum_of_colC := sum
     └─ Aggregate[type = FINAL, keys = [colA, colB]]
        │   Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   sum := sum(sum_104)
        └─ LocalExchange[partitioning = HASH, arguments = [colA::varchar, colB::varchar]]
           │   Layout: [colA:varchar, colB:varchar, sum_104:varbinary]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [1]]
                  Layout: [colA:varchar, colB:varchar, sum_104:varbinary]

 Fragment 1 [HASH]
     Output layout: [colA, colB, sum_104]
     Output partitioning: HASH [colA, colB]
     Aggregate[type = PARTIAL, keys = [colA, colB]]
     │   Layout: [colA:varchar, colB:varchar, sum_104:varbinary]
     │   sum_104 := sum(colC)
     └─ InnerJoin[criteria = (colD = colE), distribution = PARTITIONED]
        │   Layout: [colA:varchar, colC:decimal(38,18), colB:varchar]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   Distribution: PARTITIONED
        │   dynamicFilterAssignments = {colE -> #df_XYZ}
        ├─ RemoteSource[sourceFragmentIds = [2]]
        │      Layout: [colD:varchar, colA:varchar, colC:decimal(38,18)]
        └─ LocalExchange[partitioning = HASH, arguments = [colE::varchar]]
           │   Layout: [colE:varchar, colB:varchar]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [colE:varchar, colB:varchar]

 Fragment 2 [SOURCE]
     Output layout: [colD, colA, colC]
     Output partitioning: HASH [colD]
     ScanFilter[table = iceberg:db1.table_a$data@..., dynamicFilters = {colD = #df_XYZ}]
         Layout: [colD:varchar, colA:varchar, colC:decimal(38,18)]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         colC := 114:colC:decimal(38,18)
         colA := 69:colA:varchar
         colD := 32:colD:varchar

 Fragment 3 [SOURCE]
     Output layout: [colE, colB]
     Output partitioning: HASH [colE]
     TableScan[table = iceberg:db2.table_b$data@...]
         Layout: [colE:varchar, colB:varchar]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         colB := 24:colB:varchar
         colE := 1:colE:varchar

414 Query Plan

 Trino version: 414
 Fragment 0 [HASH]
     Output layout: [colA, colB, sum]
     Output partitioning: SINGLE []
     Output[columnNames = [colA, alias_for_colB, sum_of_colC]]
     │   Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   alias_for_colB := colB
     │   sum_of_colC := sum
     └─ Project[]
        │   Layout: [colA:varchar, colB:varchar, sum:decimal(38,18)]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        └─ Aggregate[type = FINAL, keys = [colA, colB], hash = [$hashvalue]]
           │   Layout: [colA:varchar, colB:varchar, $hashvalue:bigint, sum:decimal(38,18)]
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
           │   sum := sum("sum_100")
           └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["colA", "colB"]]
              │   Layout: [colA:varchar, colB:varchar, sum_100:varbinary, $hashvalue:bigint]
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
              └─ RemoteSource[sourceFragmentIds = [1]]
                     Layout: [colA:varchar, colB:varchar, sum_100:varbinary, $hashvalue_101:bigint]

 Fragment 1 [HASH]
     Output layout: [colA, colB, sum_100, $hashvalue_108]
     Output partitioning: HASH [colA, colB][$hashvalue_108]
     Project[]
     │   Layout: [colA:varchar, sum_100:varbinary, colB:varchar, $hashvalue_108:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
     │   $hashvalue_108 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("colA"), 0)), COALESCE("$operator$hash_code"("colB"), 0))
     └─ InnerJoin[criteria = ("colD" = "colE"), hash = [$hashvalue_102, $hashvalue_105], distribution = PARTITIONED]
        │   Layout: [colA:varchar, sum_100:varbinary, colB:varchar]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   Distribution: PARTITIONED
        │   dynamicFilterAssignments = {colE -> #df_ABC}
        ├─ RemoteSource[sourceFragmentIds = [2]]
        │      Layout: [colA:varchar, colD:varchar, sum_100:varbinary, $hashvalue_102:bigint]
        └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_105], arguments = ["colE"]]
           │   Layout: [colE:varchar, colB:varchar, $hashvalue_105:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [colE:varchar, colB:varchar, $hashvalue_106:bigint]

 Fragment 2 [SOURCE]
     Output layout: [colA, colD, sum_100, $hashvalue_104]
     Output partitioning: HASH [colD][$hashvalue_104]
     Project[]
     │   Layout: [colA:varchar, colD:varchar, sum_100:varbinary, $hashvalue_104:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
     │   $hashvalue_104 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("colD"), 0))
     └─ Aggregate[type = PARTIAL, keys = [colA, colD], hash = [$hashvalue_103]]
        │   Layout: [colA:varchar, colD:varchar, $hashvalue_103:bigint, sum_100:varbinary]
        │   sum_100 := sum("colC")
        └─ ScanFilterProject[table = iceberg:db1.table_a$data@..., dynamicFilters = {"colD" = #df_ABC}]
               Layout: [colD:varchar, colA:varchar, colC:decimal(38,18), $hashvalue_103:bigint]
               Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
               $hashvalue_103 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("colA"), 0)), COALESCE("$operator$hash_code"("colD"), 0))
               colD := 32:colD:varchar
               colC := 114:colC:decimal(38,18)
               colA := 69:colA:varchar

 Fragment 3 [SOURCE]
     Output layout: [colE, colB, $hashvalue_107]
     Output partitioning: HASH [colE][$hashvalue_107]
     ScanProject[table = iceberg:db2.table_b$data@...]
         Layout: [colE:varchar, colB:varchar, $hashvalue_107:bigint]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         $hashvalue_107 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("colE"), 0))
         colB := 24:colB:varchar
         colE := 1:colE:varchar
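To illustrate why the plan shape matters, here is a hypothetical Python model of the two shapes, using in-memory lists of (colA, colD, colC) rows for table_a and (colE, colB) rows for table_b. The function names and data are assumptions for illustration only, not Trino internals. aggregate_then_join mirrors the 414 plan (PARTIAL sum keyed on colA, colD below the join, FINAL sum keyed on colA, colB above it); join_then_aggregate mirrors the 469 plan (join first, aggregate afterwards). Because sum is decomposable and the partial grouping keeps the join key colD, both should give the same totals, but the pushed-down shape feeds at most one row per distinct (colA, colD) pair into the join.

```python
from collections import defaultdict
from decimal import Decimal


def aggregate_then_join(table_a, table_b):
    """Hypothetical model of the 414 plan shape.

    PARTIAL sum(colC) by (colA, colD) below the join, InnerJoin on
    colD = colE, then FINAL sum by (colA, colB).
    Returns (result dict, number of partial rows fed into the join).
    """
    # PARTIAL aggregation on the table_a side (Fragment 2 in the 414 plan).
    # colD stays in the grouping keys so the join can still be evaluated.
    partial = defaultdict(Decimal)
    for col_a, col_d, col_c in table_a:
        partial[(col_a, col_d)] += col_c

    # Build side of the join: colE -> list of colB values (Fragment 3).
    build = defaultdict(list)
    for col_e, col_b in table_b:
        build[col_e].append(col_b)

    # Join the partial states, then FINAL aggregation by (colA, colB).
    final = defaultdict(Decimal)
    for (col_a, col_d), partial_sum in partial.items():
        for col_b in build.get(col_d, ()):
            final[(col_a, col_b)] += partial_sum
    return dict(final), len(partial)


def join_then_aggregate(table_a, table_b):
    """Hypothetical model of the 469 plan shape: join every table_a row
    first, then sum(colC) by (colA, colB)."""
    build = defaultdict(list)
    for col_e, col_b in table_b:
        build[col_e].append(col_b)
    sums = defaultdict(Decimal)
    for col_a, col_d, col_c in table_a:
        for col_b in build.get(col_d, ()):
            sums[(col_a, col_b)] += col_c
    return dict(sums)


if __name__ == "__main__":
    # Many table_a rows share the same (colA, colD) pair.
    table_a = [("x", "k1", Decimal("1.5"))] * 1000 + [("y", "k2", Decimal("4"))]
    table_b = [("k1", "b1"), ("k1", "b2"), ("k2", "b2")]
    pushed, join_rows = aggregate_then_join(table_a, table_b)
    # Both plan shapes produce the same totals.
    assert pushed == join_then_aggregate(table_a, table_b)
    print(join_rows)  # 2 partial rows reach the join instead of 1001
```

In this toy data, 1,001 table_a rows collapse to 2 partial rows before the join; how much the real query would benefit depends on how many distinct (colA, colD) pairs table_a actually contains.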

Specifically, in 414 I see an Aggregate[type = PARTIAL] step performed before the InnerJoin, which is what I'd expect given that push_partial_aggregation_through_join is enabled; in 469, the PARTIAL aggregation happens only after the join. Is there something else we're missing (a session property or optimizer config) that would let later versions optimize this query the same way?
