Spark / SPARK-40382

Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: SQL
    • Labels: None

    Description

      In RewriteDistinctAggregates, when grouping distinct aggregate expressions by their function children, we should treat semantically equivalent children as the same.

      This proposed change can reduce the number of projections in the Expand operator that the rule adds to a plan. In some cases, it eliminates the need for an Expand operator altogether.
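      The grouping idea can be illustrated with a toy sketch. The expression classes, `canonicalize`, and the grouping helpers are hypothetical stand-ins, not Catalyst's API. The sketch puts the operands of `+` in a deterministic order, so `b + 1` and `1 + b` share one canonical form and land in the same distinct group. Catalyst's own canonicalization differs in detail.

```scala
// Hypothetical toy model, not Catalyst: distinct aggregates are grouped by the
// canonical form of their child, so b + 1 and 1 + b fall into one group.
object CanonicalGrouping {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Canonicalize bottom-up. Because + is commutative, its operands are put in
  // a deterministic order (here, by their string form).
  def canonicalize(e: Expr): Expr = e match {
    case Add(l, r) =>
      val (cl, cr) = (canonicalize(l), canonicalize(r))
      if (cl.toString.compareTo(cr.toString) <= 0) Add(cl, cr) else Add(cr, cl)
    case leaf => leaf
  }

  // A distinct aggregate call: function name plus its single child.
  final case class DistinctAgg(func: String, child: Expr)

  // Grouping by the child exactly as written (the behaviour before the change).
  def groupSyntactically(aggs: Seq[DistinctAgg]): Map[Expr, Seq[DistinctAgg]] =
    aggs.groupBy(_.child)

  // Grouping by the canonical child (the proposed behaviour).
  def groupSemantically(aggs: Seq[DistinctAgg]): Map[Expr, Seq[DistinctAgg]] =
    aggs.groupBy(agg => canonicalize(agg.child))

  def main(args: Array[String]): Unit = {
    val b = Attr("b")
    val aggs = Seq(
      DistinctAgg("count", Add(b, Lit(1))), // count(distinct b + 1)
      DistinctAgg("avg", Add(Lit(1), b))    // avg(distinct 1 + b)
    )
    println(s"syntactic groups: ${groupSyntactically(aggs).size}") // 2
    println(s"semantic groups:  ${groupSemantically(aggs).size}")  // 1
  }
}
```

      Each distinct group needs its own Expand projection. Collapsing two groups into one is therefore what removes a projection.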

      Example: In the following query, the Expand operator produces 3*n rows (where n is the number of input rows). It has one projection for the regular aggregate sum(c) and one each for the distinct-aggregate children b + 1 and 1 + b.

      create or replace temp view v1 as
      select * from values
      (1, 2, 3.0),
      (1, 3, 4.0),
      (2, 4, 2.5),
      (2, 3, 1.0)
      v1(a, b, c);
      
      select
        a,
        count(distinct b + 1),
        avg(distinct 1 + b) filter (where c > 0),
        sum(c)
      from
        v1
      group by a;
      

      The Expand operator has three projections (each producing a row for each incoming row):

      [a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
      [a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
      [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
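      To see where the 3*n comes from, here is a toy model of Expand. The names are hypothetical; this is not Spark's operator. Each input row is run through every projection. The three projections mirror the ones above, applied to the four rows of v1. For simplicity, c is passed through as a Double instead of UnscaledValue(c).

```scala
// Hypothetical toy model of Expand, not Spark's operator: every input row is
// run through every projection, so the output has (projections * rows) rows.
object ExpandBefore {
  final case class Row(a: Int, b: Int, c: Double)

  def expand(rows: Seq[Row], projections: Seq[Row => Vector[Any]]): Seq[Vector[Any]] =
    rows.flatMap(r => projections.map(p => p(r)))

  // The four rows of v1 (c simplified to Double).
  val v1 = Seq(Row(1, 2, 3.0), Row(1, 3, 4.0), Row(2, 4, 2.5), Row(2, 3, 1.0))

  // Columns: a, b + 1, 1 + b, gid, c > 0, c; null where a column is unused.
  val projections: Seq[Row => Vector[Any]] = Seq(
    r => Vector(r.a, null, null, 0, null, r.c),          // #1 regular aggregation
    r => Vector(r.a, r.b + 1, null, 1, null, null),      // #2 distinct b + 1
    r => Vector(r.a, null, 1 + r.b, 2, r.c > 0.0, null)  // #3 distinct 1 + b
  )

  def main(args: Array[String]): Unit = {
    val out = expand(v1, projections)
    println(out.size) // 12 = 3 projections * 4 input rows
    out.foreach(println)
  }
}
```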
      

      In reality, the Expand only needs one projection for 1 + b and b + 1, because they are semantically equivalent.

      With the proposed change, the Expand operator's projections look like this:

      [a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
      [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
      

      With one fewer projection, Expand produces 2*n rows instead of 3*n rows, and the query still returns the correct result.
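      The sketch below makes the same toy-model assumptions: hypothetical names, and c as a Double rather than a decimal. It applies the two projections above to v1, then aggregates per value of a. Each aggregate reads its input only from rows with the matching gid, and both distinct aggregates share the b + 1 column. The result is compared with a direct computation. The sketch collapses the aggregation that follows Expand into a single pass and is only meant to show the data flow.

```scala
// Hypothetical toy model, not Spark's code: the two-projection Expand above,
// followed by a simplified aggregation keyed on a and filtered by gid.
object ExpandAfter {
  final case class Row(a: Int, b: Int, c: Double)
  // Expanded row: a, shared b + 1 column, gid, c > 0 (filter input), c.
  final case class Out(a: Int, bPlus1: Option[Int], gid: Int,
                       cond: Option[Boolean], c: Option[Double])
  final case class Result(a: Int, countDistinct: Int, avgDistinct: Double, sumC: Double)

  val v1 = Seq(Row(1, 2, 3.0), Row(1, 3, 4.0), Row(2, 4, 2.5), Row(2, 3, 1.0))

  val projections: Seq[Row => Out] = Seq(
    r => Out(r.a, None, 0, None, Some(r.c)),               // #1 regular aggregation
    r => Out(r.a, Some(r.b + 1), 1, Some(r.c > 0.0), None) // #2 b + 1 and 1 + b
  )

  def expand(rows: Seq[Row]): Seq[Out] = rows.flatMap(r => projections.map(p => p(r)))

  // Every aggregate reads only the rows tagged with its gid; both distinct
  // aggregates read the same b + 1 column.
  def viaExpand(rows: Seq[Row]): Seq[Result] =
    expand(rows).groupBy(_.a).toSeq.sortBy(_._1).map { case (a, outs) =>
      val distinctRows = outs.filter(_.gid == 1)
      val counted = distinctRows.flatMap(_.bPlus1).distinct
      val averaged = distinctRows.filter(_.cond.contains(true)).flatMap(_.bPlus1).distinct
      val sumC = outs.filter(_.gid == 0).flatMap(_.c).sum
      Result(a, counted.size, averaged.sum.toDouble / averaged.size, sumC)
    }

  // Reference answer computed directly from the input rows, without Expand.
  def direct(rows: Seq[Row]): Seq[Result] =
    rows.groupBy(_.a).toSeq.sortBy(_._1).map { case (a, rs) =>
      val counted = rs.map(_.b + 1).distinct
      val averaged = rs.filter(_.c > 0.0).map(r => 1 + r.b).distinct
      Result(a, counted.size, averaged.sum.toDouble / averaged.size, rs.map(_.c).sum)
    }

  def main(args: Array[String]): Unit = {
    println(expand(v1).size)             // 8 = 2 projections * 4 input rows
    println(viaExpand(v1) == direct(v1)) // true
    viaExpand(v1).foreach(println)
  }
}
```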

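      In the case where all distinct aggregates have semantically equivalent children and none of them has a FILTER clause, the Expand operator is not needed at all. In the example above, Expand remains because the avg aggregate has a FILTER clause.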
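      The cases can be summarized as a small counting function. This is a simplified, hypothetical model, not Spark's source. It assumes the rewrite applies only when there is more than one distinct group or some distinct aggregate has a FILTER clause. It also assumes the rule adds one projection per distinct group, plus one for the regular aggregates if there are any. For brevity, the canonical child keys are passed in as plain strings.

```scala
// Simplified, hypothetical model of the rewrite's projection count; not
// Spark's source. A result of 0 means no Expand is added.
object ExpandSize {
  // A distinct aggregate: a canonical key for its children, plus whether it has
  // a FILTER clause.
  final case class DistinctAgg(canonicalChildren: String, hasFilter: Boolean)

  def projections(distinctAggs: Seq[DistinctAgg], hasRegularAggs: Boolean): Int = {
    val groups = distinctAggs.map(_.canonicalChildren).distinct.size
    // Assumption: a single distinct group with no FILTER is left to the planner.
    val needsRewrite = groups > 1 || distinctAggs.exists(_.hasFilter)
    if (!needsRewrite) 0
    else groups + (if (hasRegularAggs) 1 else 0) // one per group, plus regular aggs
  }

  def main(args: Array[String]): Unit = {
    // The issue's example after the change: b + 1 and 1 + b share one key.
    println(projections(Seq(DistinctAgg("b + 1", false), DistinctAgg("b + 1", true)), hasRegularAggs = true)) // 2
    // Benchmark f1: all three children are equivalent to id1 + 1.
    println(projections(Seq.fill(3)(DistinctAgg("id1 + 1", false)), hasRegularAggs = false)) // 0
  }
}
```

      Under these assumptions, the model gives 3 projections for the example before the change and 2 after. For the three benchmark queries, it gives 0, 2 and 3.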

      Consider the following benchmark:

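          // Assumes this runs inside a Spark SqlBasedBenchmark subclass, which
          // provides runBenchmark, output, and the Dataset noop() helper.
          runBenchmark("distinct aggregates") {
            val N = 20 << 22 // 83,886,080 rows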
      
            val benchmark = new Benchmark("distinct aggregates", N, output = output)
            spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
              .createOrReplaceTempView("test")
      
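            // All three children are equivalent to id1 + 1 (ID1 resolves to id1
            // under the default case-insensitive analysis), so no Expand is needed.
            def f1(): Unit = spark.sql(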
              """
                select
                  k,
                  sum(distinct id1 + 1),
                  count(distinct 1 + id1),
                  avg(distinct 1 + ID1)
                from
                  test
                group by k""").noop()
      
            benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
              f1()
            }
      
            // Two distinct groups: {id1 + 1, 1 + id1} and {2 + ID1}.
            def f2(): Unit = spark.sql(
              """
                select
                  k,
                  sum(distinct id1 + 1),
                  count(distinct 1 + id1),
                  avg(distinct 2 + ID1)
                from
                  test
                group by k""").noop()
      
            benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
              f2()
            }
      
            // Three distinct groups: id1 + 1, 3 + id1 and 2 + ID1.
            def f3(): Unit = spark.sql(
              """
                select
                  k,
                  sum(distinct id1 + 1),
                  count(distinct 3 + id1),
                  avg(distinct 2 + ID1)
                from
                  test
                group by k""").noop()
      
            benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
              f3()
            }
            benchmark.run()
          }
       

      Before the change:

      [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
      [info] ------------------------------------------------------------------------------------------------------------------------
      [info] all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
      [info] some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
      [info] none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
      

      After the proposed change:

      [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
      [info] ------------------------------------------------------------------------------------------------------------------------
      [info] all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
      [info] some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
      [info] none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
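      The Rate and Per Row columns follow from N = 20 << 22 and the best time. The short check below recomputes them, assuming both columns are derived from the best time (the printed values are consistent with that). The helper names are hypothetical.

```scala
// Recomputes the Rate and Per Row columns from N and Best Time. Assumes, as the
// printed numbers suggest, that both columns are derived from the best time.
object BenchmarkMath {
  val N: Long = 20L << 22 // 83,886,080 rows, as in the benchmark

  def rateMPerSec(bestMs: Double): Double = (N / 1e6) / (bestMs / 1000.0)
  def nsPerRow(bestMs: Double): Double = bestMs * 1e6 / N

  def main(args: Array[String]): Unit = {
    // (case, best time before, best time after), copied from the tables above
    val cases = Seq(
      ("all semantically equivalent", 14721.0, 3658.0),
      ("some semantically equivalent", 14569.0, 9124.0),
      ("none semantically equivalent", 14408.0, 14601.0))
    for ((name, before, after) <- cases)
      println(f"$name%-30s ${rateMPerSec(after)}%5.1f M/s ${nsPerRow(after)}%6.1f ns/row  speedup ${before / after}%.2fx")
  }
}
```

      For the all-equivalent case, the best time drops by roughly 4x (14721 ms to 3658 ms). The none-equivalent case is essentially unchanged (14408 ms vs 14601 ms).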
      

          People

            Assignee: Bruce Robbins (bersprockets)
            Reporter: Bruce Robbins (bersprockets)
            Votes: 0
            Watchers: 3
