  1. Spark
  2. SPARK-6550

Add PreAnalyzer to keep logical plan consistent across DataFrame



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.1, 1.4.0
    • Component/s: SQL
    • Labels: None



      In some cases, analysis replaces expressions in a logical plan with new ones, e.g. when handling self-joins. Expressions that are resolved against the analyzed plan then refer to the changed expression ids, not the original ids.

      However, DataFrame transformations such as groupBy and aggregation use the logical plan, not the analyzed plan, to construct the new DataFrame. In such cases, the expressions in the resulting DataFrame are inconsistent with its logical plan.

      The problems are as follows:

      1. Expression ids in the logical plan can be inconsistent if they are changed during analysis and some expressions are resolved after that

      When we try to run the following code:

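      // Assumes a SQLContext named sqlContext is in scope (e.g. in spark-shell)
      import sqlContext.implicits._
      val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
      val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").min("y.int")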

      Because groupBy and min resolve their expressions against the analyzed logical plan, their expression ids refer to the analyzed plan instead of the logical plan.

      So the logical plan of df2 looks like:

      'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
       'Join Inner, Some(('x.str = 'y.str))
        Subquery x
         Project [_1#0 AS int#2,_2#1 AS str#3]
          LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
        Subquery y
         Project [_1#0 AS int#2,_2#1 AS str#3]
          LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]

      As you can see, the expression ids in Aggregate differ from the expression ids in Subquery y. This is the first problem.

      2. df2 can't be executed

      The logical plan of df2 shown above can't be executed. Because the expression ids of Subquery y are modified for self-join handling during analysis, the analyzed plan of df2 becomes:

      Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
       Join Inner, Some((str#3 = str#8))
        Subquery x
         Project [_1#0 AS int#2,_2#1 AS str#3]
          LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
        Subquery y
         Project [_1#0 AS int#7,_2#1 AS str#8]
          LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]

      The expressions referenced in Aggregate do not match those in Subquery y. This is the second problem.
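
      The two problems can be reproduced in a toy model. The sketch below is not Catalyst code: Attr, Relation, Join, IdGen, analyze, resolve and missing are all hypothetical names, and analyze is only a stand-in for self-join handling that gives the right side fresh ids. The starting ids are chosen so the numbers mirror the plans above.

```scala
object StaleIdDemo {
  // Toy model, not Catalyst: an attribute is a name plus an expression id.
  final case class Attr(name: String, id: Int)
  final case class Relation(alias: String, output: Seq[Attr])
  final case class Join(left: Relation, right: Relation) {
    def output: Seq[Attr] = left.output ++ right.output
  }

  // Hands out increasing ids, like a global expression-id counter.
  final class IdGen(start: Int) {
    private var next = start
    def fresh(): Int = { val id = next; next += 1; id }
  }

  // Hypothetical stand-in for self-join handling during analysis: when both
  // sides expose the same ids, the right side is rewritten with fresh ones.
  def analyze(raw: Join, ids: IdGen): Join = {
    val leftIds = raw.left.output.map(_.id).toSet
    if (!raw.right.output.exists(a => leftIds.contains(a.id))) raw
    else {
      val renamed = raw.right.output.map(a => a.copy(id = ids.fresh()))
      raw.copy(right = raw.right.copy(output = renamed))
    }
  }

  // Resolve "alias.name" against the output of an analyzed join.
  def resolve(plan: Join, alias: String, name: String): Attr = {
    val rel = if (plan.left.alias == alias) plan.left else plan.right
    rel.output.find(_.name == name).get
  }

  // References that the given plan does not produce.
  def missing(refs: Seq[Attr], plan: Join): Seq[Attr] =
    refs.filterNot(r => plan.output.exists(_.id == r.id))

  def run(): (Seq[Attr], Seq[Attr]) = {
    val ids = new IdGen(4)
    val base = Seq(Attr("int", 2), Attr("str", 3))
    val raw = Join(Relation("x", base), Relation("y", base))

    // groupBy/min resolve against one analysis run (y becomes int#4, str#5).
    val firstRun = analyze(raw, ids)
    val refs = Seq(resolve(firstRun, "y", "str"), resolve(firstRun, "y", "int"))
    ids.fresh() // id consumed by the MIN(int) alias

    // Only `raw` is kept, so the next analysis run renames y again.
    val secondRun = analyze(raw, ids)
    (refs, missing(refs, secondRun))
  }

  def main(args: Array[String]): Unit = {
    val (refs, unresolved) = run()
    println(s"aggregate refs: $refs")
    println(s"unresolved after re-analysis: $unresolved")
  }
}
```

      In this model both aggregate references end up unresolved, because the second analysis run assigns ids to Subquery y that the aggregate never saw.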

      Proposed solution

      We propose adding a PreAnalyzer. When a logical plan rawPlan is given to SQLContext, it uses PreAnalyzer to modify the logical plan before assigning it to QueryExecution.logical. Later operations will then be based on the pre-analyzed logical plan instead of the original rawPlan.
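
      A minimal sketch of the idea, again in a toy model rather than Spark's actual classes. It assumes the pre-analysis step is the same id-deduplication rule that analysis applies later; preAnalyze, IdGen and this simplified QueryExecution are hypothetical, and the real PreAnalyzer may do more or work differently.

```scala
object PreAnalyzeDemo {
  // Toy model, not Catalyst: an attribute is a name plus an expression id.
  final case class Attr(name: String, id: Int)
  final case class Relation(alias: String, output: Seq[Attr])
  final case class Join(left: Relation, right: Relation) {
    def output: Seq[Attr] = left.output ++ right.output
  }

  final class IdGen(start: Int) {
    private var next = start
    def fresh(): Int = { val id = next; next += 1; id }
  }

  // Hypothetical pre-analysis rule: give the right side fresh ids only when
  // it shares ids with the left side; otherwise return the plan unchanged.
  def preAnalyze(plan: Join, ids: IdGen): Join = {
    val leftIds = plan.left.output.map(_.id).toSet
    if (!plan.right.output.exists(a => leftIds.contains(a.id))) plan
    else {
      val renamed = plan.right.output.map(a => a.copy(id = ids.fresh()))
      plan.copy(right = plan.right.copy(output = renamed))
    }
  }

  // Simplified QueryExecution: `logical` stores the pre-analyzed plan, so
  // every later operation starts from ids that are already distinct.
  final class QueryExecution(rawPlan: Join, ids: IdGen) {
    val logical: Join = preAnalyze(rawPlan, ids)
    // Later analysis applies the same rule and finds nothing left to rename.
    def analyzed: Join = preAnalyze(logical, ids)
  }

  def run(): (Boolean, Seq[Attr]) = {
    val ids = new IdGen(4)
    val base = Seq(Attr("int", 2), Attr("str", 3))
    val qe = new QueryExecution(Join(Relation("x", base), Relation("y", base)), ids)

    // A later operator picks y.str and y.int from the stored logical plan.
    val refs = qe.logical.right.output.reverse
    val unresolved = refs.filterNot(r => qe.analyzed.output.exists(_.id == r.id))
    (qe.analyzed == qe.logical, unresolved)
  }

  def main(args: Array[String]): Unit = {
    val (stable, unresolved) = run()
    println(s"analysis leaves the stored plan unchanged: $stable")
    println(s"unresolved references: $unresolved")
  }
}
```

      The point of the design is that the id rewrite happens once, before the plan is stored, so expressions resolved later and the plan they are attached to agree on the same ids.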




            marmbrus Michael Armbrust
            viirya L. C. Hsieh