Details
Description
We noticed that the codegen mechanism in SortMergeJoinExec causes job failures in some cases. The simple example below demonstrates the issue.
The query joins two relations with a join condition that contains a Hive UDF (i.e., base64) inside an OR predicate:
SELECT ca_zip
FROM customer, customer_address
WHERE customer.c_current_addr_sk = customer_address.ca_address_sk
  AND (base64(ca_zip) = '85669' OR customer.c_birth_month > 2)
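For context, a rough reproduction sketch in Scala is shown below. The table definitions are hypothetical stand-ins that keep only the columns the query references; in our environment customer and customer_address are Hive Metastore tables and base64 resolves to the Hive UDF (HiveSimpleUDF), so a temp-view version like this may plan the built-in base64 instead and should be treated as illustrative only.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sortmergejoin-codegen-repro")
  .enableHiveSupport()   // so base64 can resolve against the Hive function registry
  .getOrCreate()
import spark.implicits._

// Hypothetical toy rows covering only the columns referenced by the query.
Seq((1, 3), (2, 1)).toDF("c_current_addr_sk", "c_birth_month")
  .createOrReplaceTempView("customer")
Seq((1, "85669"), (2, "90210")).toDF("ca_address_sk", "ca_zip")
  .createOrReplaceTempView("customer_address")

spark.sql(
  """SELECT ca_zip
    |FROM customer, customer_address
    |WHERE customer.c_current_addr_sk = customer_address.ca_address_sk
    |  AND (base64(ca_zip) = '85669' OR customer.c_birth_month > 2)""".stripMargin
).show()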
Physical plan before execution:
*Project ca_zip#27
+- *SortMergeJoin c_current_addr_sk#4, ca_address_sk#18, Inner, ((HiveSimpleUDF#Base64(ca_zip#27) = 85669) || (c_birth_month#12 > 2))
:- *Sort c_current_addr_sk#4 ASC NULLS FIRST, false, 0
: +- Exchange hashpartitioning(c_current_addr_sk#4, 200)
: +- *Filter isnotnull(c_current_addr_sk#4)
: +- HiveTableScan c_current_addr_sk#4, c_birth_month#12, MetastoreRelation test, customer
+- *Sort ca_address_sk#18 ASC NULLS FIRST, false, 0
+- Exchange hashpartitioning(ca_address_sk#18, 200)
+- *Filter isnotnull(ca_address_sk#18)
+- HiveTableScan ca_address_sk#18, ca_zip#27, MetastoreRelation test, customer_address
By default, the query fails and throws the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times
...
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:821)
    at java.lang.String.valueOf(String.java:2994)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
    at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:359)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:357)
    at scala.collection.AbstractTraversable.addString(Traversable.scala:104)
    at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:323)
    at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
    at scala.collection.TraversableLike$class.toString(TraversableLike.scala:600)
    at scala.collection.SeqLike$class.toString(SeqLike.scala:682)
    at scala.collection.AbstractSeq.toString(Seq.scala:41)
    at java.lang.String.valueOf(String.java:2994)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
    at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
    at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.sql.hive.HiveSimpleUDF.logInfo(hiveUDFs.scala:130)
    at org.apache.spark.sql.hive.HiveSimpleUDF.eval(hiveUDFs.scala:179)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    ...
However, when we disable codegen (i.e., spark.sql.codegen.wholeStage=false, spark.sql.codegen=false), the query runs successfully.
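For anyone hitting the same failure, a workaround sketch is to turn those two settings off at runtime (the spark-submit form is equivalent):

// Workaround sketch: disable codegen before running the query.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.codegen", "false")

// Equivalent at submit time:
//   spark-submit --conf spark.sql.codegen.wholeStage=false --conf spark.sql.codegen=false ...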