[SPARK-38667] Optimizer generates error when using inner join along with sequence

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Resolved
    • Affects Version/s: 3.1.2
    • Fix Version/s: None
    • Component/s: Optimizer
    • Labels: None

    Description

      This issue occurred in a more complex scenario, so I've broken it down into a simple case.

      Steps to reproduce: run the following example. It should complete without errors, but instead it throws java.lang.IllegalArgumentException: Illegal sequence boundaries: 4 to 2 by 1.

      package com.example
      
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._
      
      object SparkIssue {
          def main(args: Array[String]): Unit = {
      
              val spark = SparkSession
                  .builder()
                  .master("local[*]")
                  .getOrCreate()
      
              val dfA = spark
                  .createDataFrame(Seq((1, 1), (2, 4)))
                  .toDF("a1", "a2")
      
              val dfB = spark
                  .createDataFrame(Seq((1, 5), (2, 2)))
                  .toDF("b1", "b2")
      
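              // The join on a1 = b1 yields the rows (a2 = 1, b2 = 5) and (a2 = 4, b2 = 2).
              // The where clause should drop the second row, so sequence(4, 2, 1)
              // should never be evaluated.
              dfA.join(dfB, dfA("a1") === dfB("b1"), "inner")
                  .where(col("a2") < col("b2"))
                  .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1))))
                  .show()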
      
              spark.stop()
      
          }
      }
      

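      When I look at the Optimized Logical Plan, I can see that the Filter has been merged into the Inner Join condition, together with an additional predicate, size(sequence(a2, b2, 1)) > 0, that checks for an empty sequence. That predicate comes first in the join condition, so it is evaluated before a2 < b2. For the joined row with a2 = 4 and b2 = 2, which the Filter would have discarded, sequence(4, 2, 1) is evaluated anyway and throws the exception.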

      == Parsed Logical Plan ==
      'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), None)) AS x#24]
      +- Filter (a2#5 < b2#13)
         +- Join Inner, (a1#4 = b1#12)
            :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
            :  +- LocalRelation [_1#0, _2#1]
            +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
               +- LocalRelation [_1#8, _2#9]
      
      == Analyzed Logical Plan ==
      a1: int, a2: int, b1: int, b2: int, x: int
      Project [a1#4, a2#5, b1#12, b2#13, x#25]
      +- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, [x#25]
         +- Filter (a2#5 < b2#13)
            +- Join Inner, (a1#4 = b1#12)
               :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
               :  +- LocalRelation [_1#0, _2#1]
               +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
                  +- LocalRelation [_1#8, _2#9]
      
      == Optimized Logical Plan ==
      Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, [x#25]
      +- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12))
         :- LocalRelation [a1#4, a2#5]
         +- LocalRelation [b1#12, b2#13]
      
      == Physical Plan ==
      Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, a2#5, b1#12, b2#13], false, [x#25]
      +- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, ((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND (a2#5 < b2#13)), false
         :- *(1) LocalTableScan [a1#4, a2#5]
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#15]
            +- LocalTableScan [b1#12, b2#13]
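
The evaluation order visible in the optimized plan above can be modelled in plain Scala, without Spark. The sketch below is a simplified illustration, not Spark's implementation: the SequenceOrderSketch object, its sequence helper, and the two predicate functions are hypothetical names, and the boundary check is an assumption modelled on the error message in this report. It also assumes that the conjuncts of the join condition are evaluated left to right with short-circuiting.

```scala
// Hypothetical plain-Scala model of the failure; not Spark code.
object SequenceOrderSketch {

  // Assumed stand-in for sequence(start, stop, step) on integers:
  // rejects boundaries that the step can never reach.
  def sequence(start: Int, stop: Int, step: Int): Seq[Int] = {
    val legal =
      (step > 0 && start <= stop) ||
      (step < 0 && start >= stop) ||
      (step == 0 && start == stop)
    if (!legal) {
      throw new IllegalArgumentException(
        s"Illegal sequence boundaries: $start to $stop by $step")
    }
    if (step == 0) Seq(start) else start to stop by step
  }

  // (a2, b2) pairs of the two rows produced by the join on a1 = b1.
  val joined: Seq[(Int, Int)] = Seq((1, 5), (4, 2))

  // Order seen in the optimized plan: the size check runs first.
  def sizeCheckFirst(a2: Int, b2: Int): Boolean =
    sequence(a2, b2, 1).nonEmpty && a2 < b2

  // Order the query expresses: a2 < b2 runs first, so && short-circuits
  // and sequence is never called for the row (4, 2).
  def filterFirst(a2: Int, b2: Int): Boolean =
    a2 < b2 && sequence(a2, b2, 1).nonEmpty

  def main(args: Array[String]): Unit = {
    // Only the row (1, 5) survives.
    println(joined.filter { case (a2, b2) => filterFirst(a2, b2) })

    // The row (4, 2) triggers the IllegalArgumentException.
    println(scala.util.Try(joined.filter { case (a2, b2) => sizeCheckFirst(a2, b2) }))
  }
}
```

In this model the two predicates are logically equivalent for every row on which both can be evaluated. They differ only in whether sequence is ever called with boundaries that the original Filter would have excluded.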
      

       

       

       

People

    • Assignee: Unassigned
    • Reporter: Lars (wetzer)