Spark / SPARK-21441

Incorrect codegen in SortMergeJoinExec results in failures in some cases

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0, 2.1.1, 2.2.0
    • Fix Version/s: 2.1.2, 2.2.1, 2.3.0
    • Component/s: SQL
    • Labels:
      None

      Description

      We noticed that the codegen mechanism in SortMergeJoinExec causes job failures in some cases. The simple example below demonstrates the issue.

      The query joins two relations with a join condition that contains a Hive UDF (i.e., base64) inside OR predicates.

      SELECT ca_zip
      FROM customer, customer_address
      WHERE customer.c_current_addr_sk = customer_address.ca_address_sk
      AND (base64(ca_zip) = '85669' OR customer.c_birth_month > 2)
      

      Physical plan before execution:

      *Project ca_zip#27
      +- *SortMergeJoin c_current_addr_sk#4, ca_address_sk#18, Inner, ((HiveSimpleUDF#Base64(ca_zip#27) = 85669) || (c_birth_month#12 > 2))
      :- *Sort c_current_addr_sk#4 ASC NULLS FIRST, false, 0
      : +- Exchange hashpartitioning(c_current_addr_sk#4, 200)
      : +- *Filter isnotnull(c_current_addr_sk#4)
      : +- HiveTableScan c_current_addr_sk#4, c_birth_month#12, MetastoreRelation test, customer
      +- *Sort ca_address_sk#18 ASC NULLS FIRST, false, 0
      +- Exchange hashpartitioning(ca_address_sk#18, 200)
      +- *Filter isnotnull(ca_address_sk#18)
      +- HiveTableScan ca_address_sk#18, ca_zip#27, MetastoreRelation test, customer_address

      By default, the query fails, throwing the following exception:

      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times
      ...............................................................................................
      Caused by: java.lang.NegativeArraySizeException
      	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
      	at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:821)
      	at java.lang.String.valueOf(String.java:2994)
      	at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
      	at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:359)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:357)
      	at scala.collection.AbstractTraversable.addString(Traversable.scala:104)
      	at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:323)
      	at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
      	at scala.collection.TraversableLike$class.toString(TraversableLike.scala:600)
      	at scala.collection.SeqLike$class.toString(SeqLike.scala:682)
      	at scala.collection.AbstractSeq.toString(Seq.scala:41)
      	at java.lang.String.valueOf(String.java:2994)
      	at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
      	at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
      	at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
      	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
      	at org.apache.spark.sql.hive.HiveSimpleUDF.logInfo(hiveUDFs.scala:130)
      	at org.apache.spark.sql.hive.HiveSimpleUDF.eval(hiveUDFs.scala:179)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	...................................................................
      
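      The NegativeArraySizeException in UTF8String.getBytes points at a corrupted string length: the generated join code evaluates the UDF against a row buffer that has already been advanced, so the decoded size can come out negative. The following minimal Java sketch (hypothetical names, not the actual generated code) illustrates that failure mode:

      ```java
      // Minimal illustration (hypothetical, not Spark source) of the failure mode:
      // the generated code decodes a length word from a row buffer that has
      // already been overwritten, so UTF8String.getBytes() ends up trying to
      // allocate a byte array with a negative size.
      public class NegativeSizeDemo {
          // Stands in for the "numBytes" decoded from a stale UnsafeRow word.
          static int decodeStaleLength() {
              return -1; // corrupted: a valid buffer never yields a negative length
          }

          public static void main(String[] args) {
              try {
                  // Mirrors the allocation inside UTF8String.getBytes()
                  byte[] bytes = new byte[decodeStaleLength()];
                  System.out.println("copied " + bytes.length + " bytes");
              } catch (NegativeArraySizeException e) {
                  // The same exception class surfaced in the stack trace above.
                  System.out.println(e.getClass().getSimpleName());
              }
          }
      }
      ```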

      However, when we disable codegen (i.e., spark.sql.codegen.wholeStage=false and spark.sql.codegen=false), the query works well.
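      As a workaround until the fix lands, the same two flags from above can be set directly from the SQL shell:

      ```sql
      -- Workaround: fall back to interpreted evaluation
      SET spark.sql.codegen.wholeStage=false;
      SET spark.sql.codegen=false;
      ```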

        People

        • Assignee: Feng Zhu (donnyzone)
        • Reporter: Feng Zhu (donnyzone)
        • Votes: 0
        • Watchers: 5
