We noticed that the codegen mechanism in SortMergeJoinExec causes job failures in some cases. The simple example below demonstrates the issue.
The query joins two relations with a join condition that contains a Hive UDF (i.e., base64) inside an OR predicate:
SELECT ca_zip FROM customer, customer_address WHERE customer.c_current_addr_sk = customer_address.ca_address_sk AND (base64(ca_zip) = '85669' OR customer.c_birth_month > 2)
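For reference, the query can be reproduced from a Hive-enabled SparkSession roughly as follows. This is a minimal sketch, not the exact setup we used: in particular, we assume the Hive UDFBase64 class is registered explicitly under the name base64 so that it is planned as a HiveSimpleUDF (as in the plan below) rather than Spark's built-in base64 expression, and the customer / customer_address tables are assumed to already exist in the metastore.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SortMergeJoinCodegenRepro")
  .enableHiveSupport()
  .getOrCreate()

// Assumption: register Hive's UDFBase64 so base64(...) resolves to a HiveSimpleUDF.
spark.sql("CREATE TEMPORARY FUNCTION base64 AS 'org.apache.hadoop.hive.ql.udf.UDFBase64'")

val df = spark.sql("""
  SELECT ca_zip
  FROM customer, customer_address
  WHERE customer.c_current_addr_sk = customer_address.ca_address_sk
    AND (base64(ca_zip) = '85669' OR customer.c_birth_month > 2)
""")

df.explain()   // prints the physical plan shown below
df.show()      // triggers execution, which fails as described below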
Physical plan before execution:
*Project [ca_zip#27]
+- *SortMergeJoin [c_current_addr_sk#4], [ca_address_sk#18], Inner, ((HiveSimpleUDF#Base64(ca_zip#27) = 85669) || (c_birth_month#12 > 2))
   :- *Sort [c_current_addr_sk#4 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c_current_addr_sk#4, 200)
   :     +- *Filter isnotnull(c_current_addr_sk#4)
   :        +- HiveTableScan [c_current_addr_sk#4, c_birth_month#12], MetastoreRelation test, customer
   +- *Sort [ca_address_sk#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ca_address_sk#18, 200)
         +- *Filter isnotnull(ca_address_sk#18)
            +- HiveTableScan [ca_address_sk#18, ca_zip#27], MetastoreRelation test, customer_address
By default, the query fails and throws the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times ...
Caused by: java.lang.NegativeArraySizeException
  at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
  at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:821)
  at java.lang.String.valueOf(String.java:2994)
  at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
  at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:359)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:357)
  at scala.collection.AbstractTraversable.addString(Traversable.scala:104)
  at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:323)
  at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
  at scala.collection.TraversableLike$class.toString(TraversableLike.scala:600)
  at scala.collection.SeqLike$class.toString(SeqLike.scala:682)
  at scala.collection.AbstractSeq.toString(Seq.scala:41)
  at java.lang.String.valueOf(String.java:2994)
  at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
  at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
  at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$eval$1.apply(hiveUDFs.scala:179)
  at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
  at org.apache.spark.sql.hive.HiveSimpleUDF.logInfo(hiveUDFs.scala:130)
  at org.apache.spark.sql.hive.HiveSimpleUDF.eval(hiveUDFs.scala:179)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  ...
However, when we disable codegen (i.e., set spark.sql.codegen.wholeStage=false and spark.sql.codegen=false), the query runs successfully.
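As a workaround, codegen can be turned off for the session before running the query. A minimal sketch, assuming a SparkSession named spark:

// Workaround sketch: disable whole-stage codegen (and the legacy expression
// codegen flag) for the current session, then rerun the query.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.codegen", "false")

// The same settings can also be applied via SQL:
//   spark.sql("SET spark.sql.codegen.wholeStage=false")
//   spark.sql("SET spark.sql.codegen=false")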