Description
Currently exchange reuse doesn't work across all subquery levels.
Here is an example query:
SELECT (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key), a.key FROM testData AS a JOIN testData AS b ON b.key = a.key
where the plan is:
*(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13] : +- Subquery scalar-subquery#240, [id=#193] : +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246]) : +- Exchange SinglePartition, true, [id=#189] : +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251]) : +- *(5) Project [key#13] : +- *(5) SortMergeJoin [key#13], [key#243], Inner : :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(key#13, 5), true, [id=#145] : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0 : +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145] +- *(5) SortMergeJoin [key#13], [key#241], Inner :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(key#13, 5), true, [id=#205] : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : +- Scan[obj#12] +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0 +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#205]
but it could be improved as here:
*(5) Project [Subquery scalar-subquery#240, [id=#211] AS scalarsubquery()#247, key#13] : +- Subquery scalar-subquery#240, [id=#211] : +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246]) : +- Exchange SinglePartition, true, [id=#207] : +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251]) : +- *(5) Project [key#13] : +- *(5) SortMergeJoin [key#13], [key#243], Inner : :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(key#13, 5), true, [id=#145] : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0 : +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145] +- *(5) SortMergeJoin [key#13], [key#241], Inner :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0 : +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145] +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0 +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]
Attachments
Issue Links
- is duplicated by
-
SPARK-32041 Exchange reuse won't work in cases when DPP, subqueries are involved
- Resolved
- links to