SPARK-37784: CodeGenerator.addBufferedState() does not properly handle UDTs



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.1.3, 3.0.4, 3.2.1, 3.3.0
    • Component/s: SQL


      The CodeGenerator.addBufferedState() method does not properly handle UDTs: it pattern-matches on a data type to determine whether copy() or clone() operations need to be performed, but the current pattern match does not handle UDTs and instead falls through to the default case, which causes values to be stored without copying. This is problematic if the UDT's underlying data type requires copying (i.e. the UDT is internally represented using an array, struct, map, or string type).

      This issue impacts queries which use sort-merge join where UDTs appear as part of join keys.

      I discovered this while investigating a query which failed with segfaults. I managed to shrink my original query down to the following reproduction (which uses Spark's built-in Vector UDT):

      import org.apache.spark.ml.linalg.Vectors
      val df = spark.createDataFrame(Seq(
          (Vectors.dense(1.0), Vectors.dense(1.0)),
          (Vectors.dense(1.0), Vectors.dense(2.0))
        )).toDF("key", "value")
      sql("set spark.sql.adaptive.enabled = false")
      sql("set spark.sql.autoBroadcastJoinThreshold = -1")
      sql("set spark.sql.shuffle.partitions = 1")
      df.join(df, "key").explain("codegen")
      df.join(df, "key").show()

      When run with off-heap memory enabled, this failed with a segfault; the native stack trace was:

      Stack: [0x00007f518a7b5000,0x00007f518abb6000],  sp=0x00007f518abb32e0,  free space=4088k
      Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
      J 12956 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x00007f5e5ec2809e [0x00007f5e5ec28060+0x3e]
      j  org.apache.spark.unsafe.array.ByteArrayMethods.arrayEquals(Ljava/lang/Object;JLjava/lang/Object;JJ)Z+135
      j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.equals(Ljava/lang/Object;)Z+44
      j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_compareStruct_0(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I+16
      j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_findNextInnerJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+107
      j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V+393
      j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
      j  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z+4

      Please note that this particular reproduction does not fail in all environments (I can't reproduce it on my laptop, for example, or on certain EC2 instance types).
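      For intuition, the hazard is the classic one with reused mutable row buffers: if a value pulled from a reused buffer is stored without a defensive copy, every buffered slot ends up aliasing the same backing object. Here is a pure-Scala analogy (hypothetical code, not Spark internals; the Reused class and all names are made up for illustration):

      ```scala
      // Hypothetical analogy, not Spark code: an iterator that reuses one
      // mutable buffer, the way Spark's unsafe-row iterators reuse a row.
      final class Reused(var value: Int)

      object AliasingDemo {
        def main(args: Array[String]): Unit = {
          val buffer = new Reused(0)
          // Each next() overwrites the same object in place.
          val iter = Iterator(1, 2, 3).map { v => buffer.value = v; buffer }

          // Buffering references without copying: every saved entry aliases
          // `buffer`, so they all observe the last value written (3).
          val savedNoCopy = iter.toList
          println(savedNoCopy.map(_.value))   // List(3, 3, 3)

          // Buffering defensive copies preserves each element's value.
          val iter2 = Iterator(1, 2, 3).map { v => buffer.value = v; buffer }
          val savedWithCopy = iter2.map(r => new Reused(r.value)).toList
          println(savedWithCopy.map(_.value)) // List(1, 2, 3)
        }
      }
      ```

      With UnsafeRow the consequences are worse than stale values: the buffered row may point into memory that has since been freed or reused, which is how the segfault above arises.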

      Here is an annotated excerpt from the generated code which shows the source of the problem:

      /* 223 */   private boolean smj_findNextJoinRows_0(
      /* 224 */     scala.collection.Iterator streamedIter,
      /* 225 */     scala.collection.Iterator bufferedIter) {
      /* 226 */     smj_streamedRow_0 = null;
      /* 227 */     int comp = 0;
      /* 228 */     while (smj_streamedRow_0 == null) {
      /* 229 */       if (!streamedIter.hasNext()) return false;
      /* 230 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
      /* 231 */       boolean smj_isNull_0 = smj_streamedRow_0.isNullAt(0);
      // smj_value_0 is a value retrieved from a streamed row:
      //                              |
      //                              V
      /* 232 */       InternalRow smj_value_0 = smj_isNull_0 ?
      /* 233 */       null : (smj_streamedRow_0.getStruct(0, 4));
      // This value is stored in smj_mutableStateArray_0[0] without
      // copying (even though it's a struct, not an atomic type):
      /* 265 */           smj_mutableStateArray_0[0] = smj_value_1;
      /* 266 */         }

      I believe the fix for this bug is fairly simple: we just need to modify CodeGenerator.addBufferedState() so that it uses a UDT's underlying sqlType when determining whether value copying is needed.
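      A minimal sketch of that shape of fix (assumed names, not the actual merged patch; the real change lives inside addBufferedState()'s code generation): unwrap UserDefinedType to its sqlType before pattern-matching, so a UDT backed by a struct, array, map, or string takes the same copying path the bare type would.

      ```scala
      import org.apache.spark.sql.types._

      // Hypothetical helper mirroring the copy/clone dispatch inside
      // CodeGenerator.addBufferedState(); needsCopy is a made-up name.
      def needsCopy(dataType: DataType): Boolean = dataType match {
        // The fix: look through a UDT to the type it is stored as.
        case udt: UserDefinedType[_] => needsCopy(udt.sqlType)
        // Buffer-backed representations must be defensively copied
        // (copy() for structs/arrays/maps, clone() for UTF8String).
        case _: StructType | _: ArrayType | _: MapType | StringType => true
        // Primitive/atomic values can be stored directly.
        case _ => false
      }
      ```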

      I've labeled this as a correctness issue because a "missing copying" bug can theoretically lead to wrong query results, not just crashes, although I haven't been able to contrive a test case demonstrating a wrong result due to this bug.




            Assignee: joshrosen (Josh Rosen)
            Reporter: joshrosen (Josh Rosen)