Spark / SPARK-22223

ObjectHashAggregate introduces unnecessary shuffle


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Optimizer, SQL
    • Labels: None
    • Environment: Spark 2.2.0 and following, with
      spark.sql.execution.useObjectHashAggregateExec = true

    Description

      Since Spark 2.2, groupBy followed by collect_list introduces an unnecessary shuffle when the data is already partitioned on a subset of the current groupBy keys, i.e. the existing partitioning is looser than the grouping, so every group already resides entirely within a single partition.

      For example:

      import org.apache.spark.sql.functions.{col, collect_list, expr}

      // Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data

      // Read the data and repartition by "Country"
      val retailDF = spark.sql("Select * from online_retail")
        .repartition(col("Country"))

      // Group the data at increasingly coarse levels, collecting nested structs at each step
      val aggregatedDF = retailDF
        .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
        .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
        .agg(collect_list("Good").as("Goods"))
        .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
        .groupBy("Country", "CustomerID")
        .agg(collect_list("Invoice").as("Invoices"))
        .withColumn("Customer", expr("(CustomerID, Invoices)"))
        .groupBy("Country")
        .agg(collect_list("Customer").as("Customers"))


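      The physical plans below can be printed directly from the driver; a minimal sketch, assuming the aggregatedDF defined above:

      // Print the physical plan chosen by the planner for the aggregation above
      aggregatedDF.explain()
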
      Without disabling ObjectHashAggregate, one gets the following physical plan:

      == Physical Plan ==
      ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
      +- Exchange hashpartitioning(Country#14, 200)
         +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
            +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
               +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
                  +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
                     +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                        +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                           +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                              +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
                                 +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                                    +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                       +- Exchange hashpartitioning(Country#14, 200)
                                          +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
      

      With Spark 2.1.0, or when ObjectHashAggregate is disabled, one gets a more efficient plan:

      == Physical Plan ==
      SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
      +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
         +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
            +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
               +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                  +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                     +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                        +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                           +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
                              +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                 +- Exchange hashpartitioning(Country#14, 200)
                                    +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
      

      In this example, a quick run in a Databricks notebook took around 16 s with ObjectHashAggregate manually disabled, versus around 25 s with ObjectHashAggregate enabled.
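
      For reference, ObjectHashAggregate can be switched off at runtime through the flag listed in the Environment field above; a minimal sketch, assuming a SparkSession named spark:

      // Fall back to the sort-based aggregate by disabling ObjectHashAggregateExec
      spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")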

      The use of ObjectHashAggregate in groupBy was introduced with SPARK-17949.

    People

      Assignee: L. C. Hsieh (viirya)
      Reporter: Michele Costantino Soccio (mcs)
      Votes: 0
      Watchers: 5
