Spark / SPARK-42937

Join with subquery in condition can fail with wholestage codegen and adaptive execution disabled


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.2, 3.4.0
    • Fix Version/s: 3.3.3, 3.4.1
    • Component/s: SQL
    • Labels: None

    Description

      The left outer join below fails with an error:

      create or replace temp view v1 as
      select * from values
      (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
      (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
      (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
      as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10);
      
      create or replace temp view v2 as
      select * from values
      (1, 2),
      (3, 8),
      (7, 9)
      as v2(a, b);
      
      create or replace temp view v3 as
      select * from values
      (3),
      (8)
      as v3(col1);
      
      set spark.sql.codegen.maxFields=10; -- lower maxFields from the default 100 so wholestage codegen is disabled for this join
      set spark.sql.adaptive.enabled=false;
      
      select *
      from v1
      left outer join v2
      on key = a
      and key in (select col1 from v3);
      

      The join fails during predicate codegen:

      23/03/27 12:24:12 WARN Predicate: Expr codegen error and falling back to interpreter mode
      java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
      	at scala.Predef$.require(Predef.scala:281)
      	at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
      	at org.apache.spark.sql.execution.InSubqueryExec.doGenCode(subquery.scala:156)
      	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
      	at scala.Option.getOrElse(Option.scala:189)
      	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
      	at scala.collection.immutable.List.map(List.scala:293)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:41)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.generate(GeneratePredicate.scala:33)
      	at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:73)
      	at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:70)
      	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
      	at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:86)
      	at org.apache.spark.sql.execution.joins.HashJoin.boundCondition(HashJoin.scala:146)
      	at org.apache.spark.sql.execution.joins.HashJoin.boundCondition$(HashJoin.scala:140)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition$lzycompute(BroadcastHashJoinExec.scala:40)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition(BroadcastHashJoinExec.scala:40)
      

      It fails again after fallback to interpreter mode:

      23/03/27 12:24:12 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
      java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
      	at scala.Predef$.require(Predef.scala:281)
      	at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
      	at org.apache.spark.sql.execution.InSubqueryExec.eval(subquery.scala:151)
      	at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
      	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2(HashJoin.scala:146)
      	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2$adapted(HashJoin.scala:146)
      	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$outerJoin$1(HashJoin.scala:205)
      

      Both the predicate codegen and the evaluation fail for the same reason: PlanSubqueries creates InSubqueryExec with shouldBroadcast=false. The driver waits for the subquery to finish, but it's the executor that uses the results of the subquery (for predicate codegen or evaluation). Because shouldBroadcast is set to false, the result is stored in a transient field (InSubqueryExec#result), so the result of the subquery is not serialized when the InSubqueryExec instance is sent to the executor.
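
      The serialization behavior can be reproduced in isolation. Below is a minimal sketch in plain Scala (not Spark's actual class; the names are invented for illustration) showing that a value held only in a @transient field is present before serialization but gone after the instance is deserialized, which is the state the executor ends up with:

      import java.io._

      // Stand-in for the subquery result holder: the computed result lives only
      // in a @transient field, so Java serialization drops it.
      class SubqueryResultHolder extends Serializable {
        @transient private var result: Array[Int] = _

        def updateResult(values: Array[Int]): Unit = { result = values }

        // Mirrors the failing require: the result must be present before use.
        def prepareResult(): Array[Int] = {
          require(result != null, "subquery has not finished")
          result
        }
      }

      object TransientDemo {
        // Serialization round trip, standing in for shipping the instance to an executor.
        private def roundTrip[T <: AnyRef](obj: T): T = {
          val bytes = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(bytes)
          out.writeObject(obj)
          out.close()
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[T]
        }

        def main(args: Array[String]): Unit = {
          val holder = new SubqueryResultHolder
          holder.updateResult(Array(3, 8))   // "driver" stores the finished subquery result
          holder.prepareResult()             // works on the driver
          val onExecutor = roundTrip(holder) // simulate sending the instance to an executor
          onExecutor.prepareResult()         // throws: requirement failed, the transient result was lost
        }
      }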

      When wholestage codegen is enabled, the predicate codegen happens on the driver, so the subquery's result is available. When adaptive execution is enabled, PlanAdaptiveSubqueries always sets shouldBroadcast=true, so the subquery's result is available on the executor, if needed.
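
      Accordingly, leaving either setting at its default avoids the problem. As a hedged sketch (the local SparkSession and the object name here are just for illustration), the repro runs cleanly if adaptive execution stays enabled, and likewise if spark.sql.codegen.maxFields is left at its default of 100 so wholestage codegen stays on:

      import org.apache.spark.sql.SparkSession

      object Spark42937Workaround {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").appName("SPARK-42937").getOrCreate()

          // Same lowered maxFields as in the repro, but adaptive execution stays enabled,
          // so PlanAdaptiveSubqueries broadcasts the subquery result to the executors.
          spark.conf.set("spark.sql.codegen.maxFields", "10")
          spark.conf.set("spark.sql.adaptive.enabled", "true")

          Seq(
            """create or replace temp view v1 as select * from values
              |(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2), (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
              |as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10)""".stripMargin,
            "create or replace temp view v2 as select * from values (1, 2), (3, 8), (7, 9) as v2(a, b)",
            "create or replace temp view v3 as select * from values (3), (8) as v3(col1)"
          ).foreach(spark.sql)

          spark.sql(
            """select *
              |from v1
              |left outer join v2
              |on key = a
              |and key in (select col1 from v3)""".stripMargin).show()
        }
      }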

          People

            Assignee: Bruce Robbins (bersprockets)
            Reporter: Bruce Robbins (bersprockets)
            Votes: 0
            Watchers: 3
