Spark / SPARK-38386

Combine compatible scalar subqueries


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: Optimizer
    • Labels: None
    • Patch

    Description

      The idea for this issue originated from https://github.com/NVIDIA/spark-rapids/issues/4186

      Currently, Spark SQL executes each uncorrelated scalar subquery as an independent Spark job, so a query with many uncorrelated scalar subqueries generates many Spark jobs. This scenario can be optimized at the logical plan level: we can combine the subquery plans of compatible scalar subqueries into fused subquery plans and let multiple scalar subqueries share them. Combining compatible scalar subqueries cuts the cost of subquery jobs, because the common parts of compatible subquery plans (scans and filters) are computed once and reused.
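A toy model of the combining step, written in Python purely for illustration. `ScalarAgg`, `FusedPlan`, `combine`, and `execute` are hypothetical names, not Spark's Catalyst API. The sketch assumes that two scalar subqueries are compatible when they aggregate over the same relation with no filter, which is a simplification of the compatibility check a real optimizer rule would need. Each subquery ends up pointing at one output column of a shared fused plan, and each fused plan scans its relation once.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# Hypothetical model of an uncorrelated scalar subquery of the form
# "SELECT <func>(<column>) FROM <relation>" (no filter, for simplicity).
@dataclass(frozen=True)
class ScalarAgg:
    relation: str  # scanned relation, e.g. "t"
    func: str      # "min" or "max" in this sketch
    column: str    # aggregated column

# One fused subquery plan: a single Aggregate over one relation scan.
@dataclass
class FusedPlan:
    relation: str
    aggs: List[Tuple[str, str]] = field(default_factory=list)  # (func, column)

def combine(subqueries: List[ScalarAgg]) -> Tuple[List[FusedPlan], Dict[ScalarAgg, Tuple[int, int]]]:
    """Merge subqueries over the same relation into one fused plan per relation.

    Returns the fused plans and, for every subquery, the (plan index,
    output index) that it reads its value from.
    """
    plans: List[FusedPlan] = []
    plan_for_relation: Dict[str, int] = {}
    refs: Dict[ScalarAgg, Tuple[int, int]] = {}
    for sq in subqueries:
        if sq.relation not in plan_for_relation:
            plan_for_relation[sq.relation] = len(plans)
            plans.append(FusedPlan(sq.relation))
        p = plan_for_relation[sq.relation]
        agg = (sq.func, sq.column)
        if agg not in plans[p].aggs:  # identical aggregates are computed once
            plans[p].aggs.append(agg)
        refs[sq] = (p, plans[p].aggs.index(agg))
    return plans, refs

def execute(plan: FusedPlan, rows: List[Dict[str, int]]) -> List[Optional[int]]:
    """Evaluate all aggregates of a fused plan in one pass over its rows."""
    result: List[Optional[int]] = [None] * len(plan.aggs)
    for row in rows:  # a single scan shared by every aggregate
        for k, (func, col) in enumerate(plan.aggs):
            v = row[col]
            if result[k] is None:
                result[k] = v
            else:
                result[k] = min(result[k], v) if func == "min" else max(result[k], v)
    return result  # None mirrors SQL NULL for an empty relation

# Usage with made-up data: three subqueries over t, one over s.
tables = {
    "t": [{"x": 4, "y": 7}, {"x": 1, "y": 9}, {"x": 6, "y": 2}],
    "s": [{"z": 3}, {"z": 8}],
}
subqueries = [
    ScalarAgg("t", "min", "x"),
    ScalarAgg("t", "max", "x"),
    ScalarAgg("t", "max", "y"),
    ScalarAgg("s", "min", "z"),
]
plans, refs = combine(subqueries)
outputs = [execute(p, tables[p.relation]) for p in plans]
values = {sq: outputs[p][k] for sq, (p, k) in refs.items()}
print(len(plans), [p.aggs for p in plans])
```

Here the three subqueries over `t` share one fused plan (one scan of `t`), while the subquery over `s` stays in its own plan because it reads a different relation.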

       

      Here is an example to demonstrate the basic idea of combining compatible scalar subqueries:

      SELECT SUM(i)
      FROM t
      WHERE l > (SELECT MIN(l2) FROM t)
      AND l2 < (SELECT MAX(l) FROM t)
      AND i2 <> (SELECT MAX(i2) FROM t)
      AND i2 <> (SELECT MIN(i2) FROM t) 

      The optimized logical plan of the above query looks like this:

      Aggregate [sum(i)]
      +- Project [i]
         +- Filter (((l > scalar-subquery#1) AND (l2 < scalar-subquery#2)) AND (NOT (i2 = scalar-subquery#3) AND NOT (i2 = scalar-subquery#4)))
            :  :- Aggregate [min(l2)]
            :  :  +- Project [l2]
            :  :     +- Relation [l,l2,i,i2]
            :  :- Aggregate [max(l)]
            :  :  +- Project [l]
            :  :     +- Relation [l,l2,i,i2]
            :  :- Aggregate [max(i2)]
            :  :  +- Project [i2]
            :  :     +- Relation [l,l2,i,i2]
            :  +- Aggregate [min(i2)]
            :     +- Project [i2]
            :        +- Relation [l,l2,i,i2]
            +- Relation [l,l2,i,i2]

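      After combining the compatible scalar subqueries, the logical plan becomes the following. All four shared scalar subqueries refer to the same fused Aggregate, and each one reads a different output column of it: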

      Aggregate [sum(i)]
      +- Project [i]
         +- Filter (((l > shared-scalar-subquery#1) AND (l2 < shared-scalar-subquery#2)) AND (NOT (i2 = shared-scalar-subquery#3) AND NOT (i2 = shared-scalar-subquery#4)))
            :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
            :  :  +- Project [l2,l,i2]
            :  :     +- Relation [l,l2,i,i2]
            :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
            :  :  +- Project [l2,l,i2]
            :  :     +- Relation [l,l2,i,i2]
            :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
            :  :  +- Project [l2,l,i2]
            :  :     +- Relation [l,l2,i,i2]
            :  +- Aggregate [min(l2),max(l),max(i2),min(i2)]
            :     +- Project [l2,l,i2]
            :        +- Relation [l,l2,i,i2]
            +- Relation [l,l2,i,i2]

       

      There are four scalar subqueries in this query. Although they are not semantically equal, they all read from the same relation. Therefore, we can merge all of them into a unified Aggregate that reuses the common scan (relation).
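The equivalence behind this merge can be checked by hand. The sketch below uses Python's built-in `sqlite3` module, not Spark, and made-up sample rows; it only illustrates that computing the four aggregates once, in a single-row derived table (the hypothetical alias `agg`), and reading each value from a different column of that row gives the same answer as the original query with four independent scalar subqueries.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (l INTEGER, l2 INTEGER, i INTEGER, i2 INTEGER)")
# Made-up sample rows, only for illustration.
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?, ?)",
    [(5, 3, 10, 2), (1, 4, 20, 3), (7, 8, 30, 5), (6, 2, 40, 4), (9, 1, 50, 3)],
)

# Original form: four independent scalar subqueries, each scanning t.
original = """
SELECT SUM(i) FROM t
WHERE l > (SELECT MIN(l2) FROM t)
  AND l2 < (SELECT MAX(l) FROM t)
  AND i2 <> (SELECT MAX(i2) FROM t)
  AND i2 <> (SELECT MIN(i2) FROM t)
"""

# Fused form: one aggregate over t computes all four values in a single scan,
# and each predicate reads a different column of that one-row result.
fused = """
SELECT SUM(t.i)
FROM t CROSS JOIN (
  SELECT MIN(l2) AS min_l2, MAX(l) AS max_l, MAX(i2) AS max_i2, MIN(i2) AS min_i2
  FROM t
) AS agg
WHERE t.l > agg.min_l2
  AND t.l2 < agg.max_l
  AND t.i2 <> agg.max_i2
  AND t.i2 <> agg.min_i2
"""

original_result = conn.execute(original).fetchone()[0]
fused_result = conn.execute(fused).fetchone()[0]
conn.close()
print(original_result, fused_result)
```

The cross join is only a convenient way to express the manual rewrite in portable SQL. The optimization proposed in this issue keeps the scalar subquery references in the Filter and shares one fused subquery plan among them, as in the combined plan above.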

       

       

          People

            Assignee: Unassigned
            Reporter: Alfred Xu (sperlingxx)
            Votes: 0
            Watchers: 2
