Spark / SPARK-29375

Exchange reuse across all subquery levels


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.2.0
    • Component/s: SQL
    • Labels: None

    Description

Currently, exchange reuse does not work across all subquery levels. Here is an example query:

      SELECT
       (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
       a.key
      FROM testData AS a
      JOIN testData AS b ON b.key = a.key

      where the plan is:

      *(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13]
      :  +- Subquery scalar-subquery#240, [id=#193]
      :     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
      :        +- Exchange SinglePartition, true, [id=#189]
      :           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
      :              +- *(5) Project [key#13]
      :                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
      :                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
      :                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
      :                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
      :                    :        +- Scan[obj#12]
      :                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
      :                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
      +- *(5) SortMergeJoin [key#13], [key#241], Inner
         :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(key#13, 5), true, [id=#205]
         :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
         :        +- Scan[obj#12]
         +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
            +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#205]
      

but it could be improved to:

      *(5) Project [Subquery scalar-subquery#240, [id=#211] AS scalarsubquery()#247, key#13]
      :  +- Subquery scalar-subquery#240, [id=#211]
      :     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
      :        +- Exchange SinglePartition, true, [id=#207]
      :           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
      :              +- *(5) Project [key#13]
      :                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
      :                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
      :                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
      :                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
      :                    :        +- Scan[obj#12]
      :                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
      :                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
      +- *(5) SortMergeJoin [key#13], [key#241], Inner
         :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
         :  +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145]
         +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
            +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]
      
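The improved plan above amounts to sharing a single map of canonicalized exchanges across the whole plan, subqueries included, so that an Exchange in the main query can be replaced by a ReusedExchange pointing at an identical Exchange inside a subquery (here, id=#145). A minimal Python sketch of that idea follows; all class and function names are hypothetical and are not Spark's internal API:

```python
# Conceptual sketch of cross-subquery exchange reuse (hypothetical names,
# not Spark internals). A plan node has child plans and nested subquery
# plans; Exchange nodes with equal canonical forms are deduplicated into
# ReusedExchange references to the first occurrence.
from dataclasses import dataclass, field

@dataclass
class Plan:
    name: str
    children: list = field(default_factory=list)
    subqueries: list = field(default_factory=list)  # nested subquery plans

@dataclass
class Exchange(Plan):
    def canonical(self):
        # A canonical form would ignore expression ids; here we just use
        # the node name plus the child node names as a stand-in.
        return (self.name, tuple(c.name for c in self.children))

@dataclass
class ReusedExchange(Plan):
    target: Plan = None  # the Exchange being reused

def reuse_exchanges(plan, seen=None):
    """Walk the whole plan with ONE shared map of canonicalized exchanges.
    Sharing the map across subquery boundaries is what enables reuse
    across all subquery levels."""
    if seen is None:
        seen = {}
    # Subqueries are visited first, so their exchanges enter the map first
    # and the main query's identical exchanges become ReusedExchange nodes.
    plan.subqueries = [reuse_exchanges(s, seen) for s in plan.subqueries]
    plan.children = [reuse_exchanges(c, seen) for c in plan.children]
    if isinstance(plan, Exchange):
        key = plan.canonical()
        if key in seen:
            return ReusedExchange(name="ReusedExchange", target=seen[key])
        seen[key] = plan
    return plan
```

With two structurally identical exchanges, one inside a subquery and one in the main plan, the pass keeps the subquery's Exchange and rewrites the main plan's copy into a ReusedExchange pointing at it, mirroring the improved plan shown above.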

People

    Assignee: Peter Toth
    Reporter: Peter Toth