SPARK-27573

Skip partial aggregation when data is already partitioned (or collapse adjacent partial and final aggregates)


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

      Description

      When an aggregation requires a shuffle, Spark SQL performs separate partial and final aggregations:

      sql("select id % 100 as k, id as v from range(100000)")
        .groupBy("k")
        .sum("v")
        .explain
      
      == Physical Plan ==
      *(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
      +- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340), coordinator[target post-shuffle partition size: 67108864]
         +- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
            +- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
               +- *(1) Range (0, 100000, step=1, splits=10)
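
      To make the two phases in this plan concrete, here is a minimal plain-Scala sketch of the same computation. It is a model using ordinary collections, not Spark's actual implementation: the object name `TwoPhaseSum`, the modulo-based routing, and the choice of 4 shuffle partitions are assumptions made for illustration only.

```scala
// Plain-Scala model of partial + final aggregation (illustrative, not Spark code).
object TwoPhaseSum {
  type Row = (Long, Long) // (k, v)

  // Partial aggregation: each input partition sums its own rows per key.
  def partial(partition: Seq[Row]): Map[Long, Long] =
    partition.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }

  // Shuffle: route every partial result to the partition that owns its key.
  // Assumption: simple modulo routing stands in for Spark's hash partitioning.
  def shuffle(partials: Seq[Map[Long, Long]], numPartitions: Int): Seq[Seq[Row]] = {
    val all = partials.flatMap(_.toSeq)
    (0 until numPartitions).map { p =>
      all.filter { case (k, _) => (k % numPartitions).toInt == p }
    }
  }

  // Final aggregation: merge the partial sums for each key.
  def finalAgg(partition: Seq[Row]): Map[Long, Long] =
    partition.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }

  def run(): Map[Long, Long] = {
    val rows: Seq[Row] = (0L until 100000L).map(id => (id % 100, id))
    val inputPartitions = rows.grouped(10000).toSeq // 10 splits, as in the plan
    val partials = inputPartitions.map(partial)     // 100 rows out per split
    shuffle(partials, 4).map(finalAgg).reduce(_ ++ _)
  }
}
```

      In this model each split shrinks from 10000 rows to 100 before the shuffle, which is the case where partial aggregation pays off.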
      

      However, consider what happens if the dataset being aggregated is already pre-partitioned by the aggregate's grouping columns:

      sql("select id % 100 as k, id as v from range(100000)")
        .repartition(10, $"k")
        .groupBy("k")
        .sum("v")
        .explain
      
      == Physical Plan ==
      *(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
      +- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)], output=[k#50L, sum#63L])
         +- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
               +- *(1) Range (0, 100000, step=1, splits=10) 
      

      Here, we end up with back-to-back HashAggregate operators which are performed as part of the same stage.

      For certain aggregates (e.g. sum, count), this duplication is unnecessary: we could have just performed a total aggregation instead (since we already have all of the data co-located)!
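
      The claim can be checked on a small plain-Scala model. This is a sketch under stated assumptions, not Spark code: `CollapsedAggregate` is a hypothetical name, and modulo routing stands in for the hash partitioning that `repartition(10, $"k")` performs. Once every row for a key lives in one partition, running partial and then final aggregation gives the same answer as a single complete aggregation.

```scala
// Plain-Scala model comparing back-to-back aggregation with a single pass.
object CollapsedAggregate {
  type Row = (Long, Long) // (k, v)

  def sumByKey(rows: Seq[Row]): Map[Long, Long] =
    rows.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }

  // Rows already partitioned by k (assumption: modulo instead of a real hash).
  val partitioned: Seq[Seq[Row]] = {
    val rows = (0L until 100000L).map(id => (id % 100, id))
    (0 until 10).map(p => rows.filter(_._1 % 10 == p))
  }

  // Current plan shape: partial then final aggregation in the same partition.
  val twoStep: Map[Long, Long] =
    partitioned.map(p => sumByKey(sumByKey(p).toSeq)).reduce(_ ++ _)

  // Proposed shape: one complete aggregation per partition.
  val oneStep: Map[Long, Long] =
    partitioned.map(sumByKey).reduce(_ ++ _)
}
```

      For sum, the merge step happens to be the same function as the partial step; for count, the final step would sum the partial counts instead, but the equivalence argument is the same.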

      The duplicate aggregate is problematic in cases where the aggregate inputs and outputs are of the same order of magnitude (e.g. counting the number of duplicate records in a dataset where duplicates are extremely rare).
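
      One rough way to quantify this is the ratio of rows a partial aggregate emits to rows it consumes. The sketch below is plain Scala with made-up data; `PartialAggregationBenefit` and `reductionRatio` are hypothetical names, not Spark APIs. A ratio close to 1.0 means the partial step does nearly as much work as the final step while removing almost nothing.

```scala
// Plain-Scala estimate of how much a partial count-by-key shrinks its input.
object PartialAggregationBenefit {
  // Rows emitted by a partial aggregate divided by rows consumed.
  def reductionRatio[K](partition: Seq[K]): Double =
    partition.distinct.size.toDouble / partition.size

  // Hypothetical data: 10000 record ids containing a single duplicate.
  val rareDuplicates: Seq[Long] = (0L until 9999L) :+ 0L

  // Data shaped like the example in this issue: only 100 distinct keys.
  val fewKeys: Seq[Long] = (0L until 10000L).map(_ % 100)
}
```

      With these inputs, `reductionRatio(rareDuplicates)` is about 0.9999 while `reductionRatio(fewKeys)` is about 0.01, so only the second case benefits from aggregating before the final step.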

      My motivation for this optimization is similar to SPARK-1412: I know that partial aggregation doesn't help for my workload, so I wanted to somehow coerce Spark into skipping the ineffective partial aggregation and jumping directly to total aggregation. I thought that pre-partitioning would accomplish this, but doing so didn't achieve my goal due to the missing aggregation-collapsing (or partial-aggregate skipping) optimization.
