Description
Since Spark 2.2, groupBy followed by collect_list introduces an unnecessary shuffle when the output of the previous step is already partitioned on a subset of the current groupBy keys, i.e. a coarser partitioning that still satisfies the grouping requirement.
For example:
import org.apache.spark.sql.functions.{col, expr, collect_list}

// Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
// Read the data and repartition by "Country"
val retailDF = spark.sql("Select * from online_retail")
  .repartition(col("Country"))

// Group the data and collect.
val aggregatedDF = retailDF
  .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
  .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
  .agg(collect_list("Good").as("Goods"))
  .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
  .groupBy("Country", "CustomerID")
  .agg(collect_list("Invoice").as("Invoices"))
  .withColumn("Customer", expr("(CustomerID, Invoices)"))
  .groupBy("Country")
  .agg(collect_list("Customer").as("Customers"))
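The physical plans shown below can be inspected directly from the aggregated DataFrame; the snippet that follows is only an illustration of how such plans are obtained (the exact action used for the timing comparison is not stated here):

// Print the physical plan chosen by the planner for the query above
aggregatedDF.explain()

// An illustrative action that actually executes the query
aggregatedDF.count()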
Without disabling ObjectHashAggregate, one gets the following physical plan:
== Physical Plan ==
ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
+- Exchange hashpartitioning(Country#14, 200)
   +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
      +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
         +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
            +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
               +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
                  +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
                     +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                        +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
                           +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                              +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                                 +- Exchange hashpartitioning(Country#14, 200)
                                    +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
With Spark 2.1.0, or when ObjectHashAggregate is disabled, one gets a more efficient plan in which only the initial Exchange on Country remains: because Country appears in every subsequent groupBy key set, the existing partitioning already satisfies the required distribution and no further shuffles are needed:
== Physical Plan ==
SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
+- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
   +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
      +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
         +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
            +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
               +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
                  +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
                     +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
                        +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
                           +- Exchange hashpartitioning(Country#14, 200)
                              +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
In this example, a quick run in a Databricks notebook showed an execution time of about 16s with ObjectHashAggregate manually disabled, versus about 25s with it enabled.
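For reference, a minimal sketch of how ObjectHashAggregate can be disabled for such a comparison, assuming the standard spark.sql.execution.useObjectHashAggregateExec configuration key and that it is set before the query is planned:

// Disable ObjectHashAggregateExec so the planner falls back to SortAggregate
// (set this before building/explaining the DataFrame so planning picks it up)
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")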
The use of ObjectHashAggregate for groupBy was introduced by SPARK-17949.
Issue Links
- is duplicated by SPARK-22276 Unnecessary repartitioning (Resolved)