#### Description

In RewriteDistinctAggregates, when grouping aggregate expressions by function children, we should treat children that are semantically equivalent as the same.

This proposed change potentially reduces the number of projections in the Expand operator added to a plan. In some cases, it may eliminate the need for an Expand operator.

Example: In the following query, the Expand operator creates 3*n rows (where n is the number of incoming rows) because it has a projection for function children b + 1, 1 + b and c.

create or replace temp view v1 as select * from values (1, 2, 3.0), (1, 3, 4.0), (2, 4, 2.5), (2, 3, 1.0) v1(a, b, c); select a, count(distinct b + 1), avg(distinct 1 + b) filter (where c > 0), sum(c) from v1 group by a;

The Expand operator has three projections (each producing a row for each incoming row):

[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation) [a#87, (b#88 + 1), null, 1, null, null], <== projection #2 (for distinct aggregation of b + 1) [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)

In reality, the Expand only needs one projection for 1 + b and b + 1, because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:

[a#67, null, 0, null, UnscaledValue(c#69)], <== projection #1 (for regular aggregations) [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]], <== projection #2 (for distinct aggregation on b + 1 and 1 + b)

With one less projection, Expand produces n*2 rows instead of n*3 rows, but still produces the correct result.

In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.

Assume this benchmark:

runBenchmark("distinct aggregates") { val N = 20 << 22 val benchmark = new Benchmark("distinct aggregates", N, output = output) spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1") .createOrReplaceTempView("test") def f1(): Unit = spark.sql( """ select k, sum(distinct id1 + 1), count(distinct 1 + id1), avg(distinct 1 + ID1) from test group by k""").noop() benchmark.addCase("all semantically equivalent", numIters = 2) { _ => f1() } def f2(): Unit = spark.sql( """ select k, sum(distinct id1 + 1), count(distinct 1 + id1), avg(distinct 2 + ID1) from test group by k""").noop() benchmark.addCase("some semantically equivalent", numIters = 2) { _ => f2() } def f3(): Unit = spark.sql( """ select k, sum(distinct id1 + 1), count(distinct 3 + id1), avg(distinct 2 + ID1) from test group by k""").noop() benchmark.addCase("none semantically equivalent", numIters = 2) { _ => f3() } benchmark.run() }

Before the change:

[info] distinct aggregates: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] all semantically equivalent 14721 14859 195 5.7 175.5 1.0X [info] some semantically equivalent 14569 14572 5 5.8 173.7 1.0X [info] none semantically equivalent 14408 14488 113 5.8 171.8 1.0X

After the proposed change:

[info] distinct aggregates: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] all semantically equivalent 3658 3692 49 22.9 43.6 1.0X [info] some semantically equivalent 9124 9214 127 9.2 108.8 0.4X [info] none semantically equivalent 14601 14777 250 5.7 174.1 0.3X