Spark / SPARK-22042

ReorderJoinPredicates can break when child's partitioning is not decided

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels: None

    Description

      When `ReorderJoinPredicates` asks its children for their `outputPartitioning`, the children may not be fully constructed yet, because the child subtree still has to go through other planner rules.
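      The order dependence can be sketched with a toy model. This is a minimal sketch, not Spark's API: `ToyPlan`, `RuleOrderDemo`, `inspectingRule` and the simplified `ensureRequirements` function are hypothetical names. It only assumes what the stack trace in this issue shows, namely that preparation rules are applied one after another with `foldLeft`, so a rule placed before the exchange-inserting step sees a tree without exchanges.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy plan: it only tracks whether exchange operators were inserted yet.
// This is NOT Spark's SparkPlan API.
final case class ToyPlan(exchangesInserted: Boolean)

object RuleOrderDemo {
  type Rule = ToyPlan => ToyPlan

  // Hypothetical stand-in for EnsureRequirements: inserts the required exchanges.
  val ensureRequirements: Rule = plan => plan.copy(exchangesInserted = true)

  // Hypothetical stand-in for a rule that inspects its children's partitioning.
  // It records whether exchanges were already present when it ran.
  def inspectingRule(observed: ArrayBuffer[Boolean]): Rule = { plan =>
    observed += plan.exchangesInserted
    plan
  }

  // Each rule receives the output of the previous one, in list order.
  def prepare(plan: ToyPlan, rules: Seq[Rule]): ToyPlan =
    rules.foldLeft(plan)((current, rule) => rule(current))

  def main(args: Array[String]): Unit = {
    // Inspecting rule runs first: it sees a plan without exchanges.
    val before = ArrayBuffer.empty[Boolean]
    prepare(ToyPlan(exchangesInserted = false), Seq(inspectingRule(before), ensureRequirements))
    println(before) // ArrayBuffer(false)

    // Inspecting rule runs after the exchanges are inserted.
    val after = ArrayBuffer.empty[Boolean]
    prepare(ToyPlan(exchangesInserted = false), Seq(ensureRequirements, inspectingRule(after)))
    println(after) // ArrayBuffer(true)
  }
}
```

      Only the position of the inspecting rule in the list changes between the two runs, which is the kind of ordering problem this issue describes.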

      In this particular case, the child is a `SortMergeJoinExec`. Since the required `Exchange` operators are not in place yet (`EnsureRequirements` runs after `ReorderJoinPredicates`), the partitioning of the join's children has not been decided, so the two sides can report different numbers of partitions. This breaks while creating the `PartitioningCollection` here: https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69
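      A simplified model of the failure, written without any Spark dependency, may help. `Partitioning`, `HashPartitioning`, `UnknownPartitioning`, `PartitioningCollection`, `Scan`, `SortMergeJoin` and `ShuffleExchange` below are hypothetical stand-ins, not the real Catalyst classes. The sketch assumes that the inner-join partitioning is a collection of both children's partitionings guarded by a `require` on equal `numPartitions`, as the error message in this issue indicates. The values 8 (bucket count from the repro), 3 (an arbitrary file-scan partition count) and 200 (Spark's default `spark.sql.shuffle.partitions`) are illustrative.

```scala
import scala.util.Try

// Hypothetical, simplified stand-ins for Spark's physical Partitioning classes.
sealed trait Partitioning { def numPartitions: Int }
final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
final case class UnknownPartitioning(numPartitions: Int) extends Partitioning

final case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
  // Same invariant as the one reported in the error message of this issue.
  require(
    partitionings.map(_.numPartitions).distinct.length == 1,
    "PartitioningCollection requires all of its partitionings have the same numPartitions.")
  def numPartitions: Int = partitionings.head.numPartitions
}

// Hypothetical toy physical plan nodes.
sealed trait Plan { def outputPartitioning: Partitioning }
final case class Scan(name: String, outputPartitioning: Partitioning) extends Plan
final case class ShuffleExchange(partitioning: Partitioning, child: Plan) extends Plan {
  def outputPartitioning: Partitioning = partitioning
}
final case class SortMergeJoin(left: Plan, right: Plan) extends Plan {
  // Inner join: output is partitioned like both children at once.
  def outputPartitioning: Partitioning =
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
}

object PartitioningDemo {
  def main(args: Array[String]): Unit = {
    val bucketed = Scan("bucketed_table", HashPartitioning(Seq("j", "k"), 8))
    val plain = Scan("table1", UnknownPartitioning(3))

    // Before exchanges are added: 8 vs 3 partitions, so the require check throws.
    val unprepared = SortMergeJoin(bucketed, plain)
    println(Try(unprepared.outputPartitioning)) // a Failure wrapping IllegalArgumentException

    // After exchanges on the join key, both sides agree on the partition count.
    val prepared = SortMergeJoin(
      ShuffleExchange(HashPartitioning(Seq("i"), 200), bucketed),
      ShuffleExchange(HashPartitioning(Seq("i"), 200), plain))
    println(prepared.outputPartitioning.numPartitions) // 200
  }
}
```

      The check itself is reasonable once exchanges exist; the problem in the model is only that `outputPartitioning` is queried before the children have been made consistent.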

      Small repro:

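      // Assumes a spark-shell session, where `spark` is the active SparkSession.
      import spark.implicits._
      
      // Keep both joins from being broadcast so they are planned as sort-merge joins.
      spark.sql("SET spark.sql.autoBroadcastJoinThreshold=0")
      
      val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
      df.write.format("parquet").saveAsTable("table1")
      df.write.format("parquet").saveAsTable("table2")
      // Bucketed on (j, k), while both joins below are on column i.
      df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
      
      spark.sql("""
        SELECT *
        FROM (
          SELECT a.i, a.j, a.k
          FROM bucketed_table a
          JOIN table1 b
          ON a.i = b.i
        ) c
        JOIN table2
        ON c.i = table2.i
      """).explain()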
      

      This fails with:

      java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:324)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
        at org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82)
        at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91)
        at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76)
        at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:147)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:78)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
        at org.apache.spark.sql.Dataset.explain(Dataset.scala:464)
        at org.apache.spark.sql.Dataset.explain(Dataset.scala:477)
        ... 60 elided
      

          People

            Assignee: Tejas Patil (tejasp)
            Reporter: Tejas Patil (tejasp)
            Votes: 0
            Watchers: 6