[SPARK-22223] ObjectHashAggregate introduces unnecessary shuffle

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Optimizer
    • Labels: None
    • Environment:

      Spark 2.2.0 and later.
      spark.sql.execution.useObjectHashAggregateExec = true

    Description

      Since Spark 2.2, groupBy followed by collect_list triggers unnecessary shuffles when the data is already partitioned on a subset of the groupBy keys, i.e. on looser criteria than the current groupBy.

      For example:

      //Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
      import org.apache.spark.sql.functions.{col, collect_list, expr}

      //Read the data and repartition it by "Country".
      val retailDF = spark.sql("SELECT * FROM online_retail")
        .repartition(col("Country"))

      //Group at ever coarser granularity, collecting each level into structs.
      //Every groupBy key set below contains "Country", so the repartition above
      //should already satisfy the required distribution for all three aggregations.
      val aggregatedDF = retailDF
        .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
        .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
        .agg(collect_list("Good").as("Goods"))
        .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
        .groupBy("Country", "CustomerID")
        .agg(collect_list("Invoice").as("Invoices"))
        .withColumn("Customer", expr("(CustomerID, Invoices)"))
        .groupBy("Country")
        .agg(collect_list("Customer").as("Customers"))
      

      Without disabling ObjectHashAggregate, aggregatedDF.explain() shows the following physical plan, with three Exchange nodes in addition to the explicit repartition:

      == Physical Plan ==
      ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
      +- Exchange hashpartitioning(Country#14, 200)
         +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
            +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
               +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
                  +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
                     +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                        +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                           +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                              +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
                                 +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                                    +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                       +- Exchange hashpartitioning(Country#14, 200)
                                          +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
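
      One way to confirm the extra shuffles programmatically (a hypothetical helper, not part of the original report) is to count the Exchange operators in the executed plan:

      // Hypothetical diagnostic, assuming Spark 2.2 internals.
      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.execution.exchange.Exchange

      def countShuffles(df: DataFrame): Int =
        df.queryExecution.executedPlan.collect { case e: Exchange => e }.size

      countShuffles(aggregatedDF)  // 4 here: the explicit repartition plus three extra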
      

      With Spark 2.1.0, or when ObjectHashAggregate is disabled, one gets a more efficient plan containing only the explicit repartition's Exchange:

      == Physical Plan ==
      SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
      +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
         +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
            +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
               +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                  +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                     +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                        +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                           +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
                              +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                 +- Exchange hashpartitioning(Country#14, 200)
                                    +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
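
      The single Exchange here is the explicit repartition, and it suffices: hash partitioning on Country already satisfies the clustered distribution required by every groupBy above it, since rows that agree on all grouping keys necessarily agree on Country. A minimal sketch of this property, using Spark 2.2 Catalyst internals with illustrative attributes:

      import org.apache.spark.sql.catalyst.expressions.AttributeReference
      import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
      import org.apache.spark.sql.types.StringType

      // Illustrative attributes standing in for the real columns.
      val country = AttributeReference("Country", StringType)()
      val customerId = AttributeReference("CustomerID", StringType)()

      // Partitioning on Country alone satisfies a distribution clustered on
      // (Country, CustomerID): equal keys imply equal Country, hence same partition.
      val byCountry = HashPartitioning(Seq(country), numPartitions = 200)
      println(byCountry.satisfies(ClusteredDistribution(Seq(country, customerId))))  // true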
      

      In this example, a quick run in a Databricks notebook took about 16 s with ObjectHashAggregate manually disabled, versus about 25 s with it enabled.
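
      The flag from the Environment section above controls this behaviour. A minimal way to reproduce the comparison (set it before building aggregatedDF, since a Dataset caches its physical plan):

      spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")
      // ...rebuild aggregatedDF exactly as above, then:
      aggregatedDF.explain()  // shows the SortAggregate plan without the extra shuffles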

      The use of ObjectHashAggregate for such groupBy aggregations was introduced by SPARK-17949.
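
      A plausible explanation for the symptom, offered here as an assumption rather than a confirmed diagnosis, is that ObjectHashAggregateExec does not propagate its child's output partitioning, so the planner treats its output as arbitrarily distributed and inserts an Exchange before the next aggregate. This can be checked directly against Spark 2.2:

      // Compare the partitioning each ObjectHashAggregateExec reports
      // with what its child reports; a mismatch would explain the Exchanges.
      import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec

      aggregatedDF.queryExecution.executedPlan.foreach {
        case agg: ObjectHashAggregateExec =>
          println(s"aggregate reports ${agg.outputPartitioning}, " +
                  s"child reports ${agg.child.outputPartitioning}")
        case _ =>
      }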


    People

    • Assignee: Liang-Chi Hsieh (viirya)
    • Reporter: Michele Costantino Soccio (mcs)
    • Votes: 0
    • Watchers: 5
