[SPARK-22223] ObjectHashAggregate introduces unnecessary shuffle


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Optimizer, SQL
    • Labels: None
    • Environment: Spark 2.2.0 and following.
      spark.sql.execution.useObjectHashAggregateExec = true

    Description

      Since Spark 2.2, a groupBy followed by collect_list introduces an unnecessary shuffle when the data is already partitioned on a subset of the groupBy keys. In that case the existing partitioning already colocates all rows of each group, so no further exchange should be required.

      For example:

      // Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
      import org.apache.spark.sql.functions.{col, collect_list, expr}
      
      // Read the data and repartition it by "Country".
      val retailDF = spark.sql("SELECT * FROM online_retail")
        .repartition(col("Country"))
      
      // Group the data and collect, nesting one struct level per groupBy.
      val aggregatedDF = retailDF
        .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
        .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
        .agg(collect_list("Good").as("Goods"))
        .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
        .groupBy("Country", "CustomerID")
        .agg(collect_list("Invoice").as("Invoices"))
        .withColumn("Customer", expr("(CustomerID, Invoices)"))
        .groupBy("Country")
        .agg(collect_list("Customer").as("Customers"))
      
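      The physical plans below can be reproduced by printing the plan of the final DataFrame (a minimal sketch, assuming the aggregatedDF defined above):
      
      // Print the physical plan Spark will execute for the aggregation.
      aggregatedDF.explain()
      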

      With ObjectHashAggregateExec enabled (the default), one gets the following physical plan, in which an Exchange is inserted before each of the three aggregations even though the data is already partitioned by "Country":

      == Physical Plan ==
      ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
      +- Exchange hashpartitioning(Country#14, 200)
         +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
            +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
               +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
                  +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
                     +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                        +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                           +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                              +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
                                 +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                                    +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                       +- Exchange hashpartitioning(Country#14, 200)
                                          +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
      

      With Spark 2.1.0, or with ObjectHashAggregateExec disabled, one gets a more efficient plan that reuses the existing partitioning by "Country" and adds only a single sort:

      == Physical Plan ==
      SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
      +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
         +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
            +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
               +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                  +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                     +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                        +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                           +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
                              +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                 +- Exchange hashpartitioning(Country#14, 200)
                                    +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
      

      In this example, a quick run in a Databricks notebook took around 16 s with ObjectHashAggregateExec manually disabled, versus around 25 s with it enabled.
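
      As a workaround, ObjectHashAggregateExec can be disabled per session through the configuration key listed in the Environment section above (a minimal sketch; this forces the sort-based fallback shown in the second plan):
      
      // Fall back to sort-based aggregation for the current session.
      spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")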

      The use of ObjectHashAggregateExec in groupBy was introduced by SPARK-17949.


    People

      Assignee: L. C. Hsieh (viirya)
      Reporter: Michele Costantino Soccio (mcs)
      Votes: 0
      Watchers: 5

