Spark / SPARK-12477

[SQL] Tungsten projection fails for null values in array fields

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.5.2, 1.6.0
    • Fix Version/s: 1.5.3, 1.6.1, 2.0.0
    • Component/s: SQL
    • Labels: None

      Description

      Accessing a null element of an array field fails with a NullPointerException when Tungsten is enabled.
      The same query works in Spark 1.3.1, and in Spark 1.5 and later with Tungsten disabled.
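
      To compare the two modes in one spark-shell session, the Tungsten backend can be switched through the SQL configuration. This is a minimal sketch: it assumes the `spark.sql.tungsten.enabled` setting of the 1.5/1.6 line and the shell's predefined `sqlContext`. The report does not name the setting it used, so check the key against the SQLConf of your version.

```scala
// Run inside spark-shell, where `sqlContext` is already defined.
// Assumption: "spark.sql.tungsten.enabled" is the switch behind the
// "Tungsten disabled" / "Tungsten enabled" runs in this report.
sqlContext.setConf("spark.sql.tungsten.enabled", "false") // Tungsten disabled
// ... run the queries here and note the output ...
sqlContext.setConf("spark.sql.tungsten.enabled", "true")  // Tungsten enabled
// ... run the same queries again and compare ...
```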

      Example:

      // Array of String
      case class AS( as: Seq[String] )
      val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF
      dfAS.registerTempTable("T_AS")
      for (i <- 0 to 2) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))}
      

      With Tungsten disabled:

      0 = [a]
      1 = [null]
      2 = [b]
      

      With Tungsten enabled:

      0 = [a]
      15/12/22 09:32:50 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 15)
      java.lang.NullPointerException
          at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90)
          at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
          at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90)
          at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
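
      The top frame of the trace (`UTF8StringWriter.getSize`) suggests that the size of a string element is computed without first checking whether the element is null. The sketch below illustrates that failure mode and a guarded variant in plain Scala. `NullSafeSizeSketch`, `sizeOf`, `unguardedTotal` and `guardedTotal` are hypothetical names for illustration only; this is neither Spark's code nor the actual fix.

```scala
// Hypothetical stand-in for a variable-length field writer; not Spark's code.
object NullSafeSizeSketch {
  // Bytes needed for a non-null string, rounded up to an 8-byte word.
  // Calling this with null throws NullPointerException.
  def sizeOf(s: String): Int = {
    val numBytes = s.getBytes("UTF-8").length
    ((numBytes + 7) / 8) * 8
  }

  // Unguarded: every element is measured, so a null element fails.
  def unguardedTotal(elements: Seq[String]): Int =
    elements.map(sizeOf).sum

  // Guarded: null elements are skipped and contribute no bytes.
  def guardedTotal(elements: Seq[String]): Int =
    elements.map(e => if (e == null) 0 else sizeOf(e)).sum
}
```

      With the array from the example, `guardedTotal(Seq("a", null, "b"))` completes, while `unguardedTotal` on the same input throws a NullPointerException, matching the shape of the failure above.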
      

      ------------------------------
      More examples below.
      The following code works in Spark 1.3.1, and in Spark 1.5 and later with Tungsten disabled:

      // Array of String
      case class AS( as: Seq[String] )
      val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF
      dfAS.registerTempTable("T_AS")
      for (i <- 0 to 10) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))}
      
      // Array of Int
      case class AI( ai: Seq[Option[Int]] )
      val dfAI = sc.parallelize( Seq( AI ( Seq(Some(1),None,Some(2) ) ) ) ).toDF
      dfAI.registerTempTable("T_AI")
      for (i <- 0 to 10) { println(i + " = " + sqlContext.sql(s"select ai[$i] from T_AI").collect.mkString(","))}
      
      // Array of struct[Int,String]
      case class B(x: Option[Int], y: String)
      case class A( b: Seq[B] )
      val df1 = sc.parallelize( Seq( A ( Seq( B(Some(1),"a"),B(Some(2),"b"), B(None, "c"), B(Some(4),null), B(None,null), null ) ) ) ).toDF
      df1.registerTempTable("T1")
      val df2 = sc.parallelize( Seq( A ( Seq( B(Some(1),"a"),B(Some(2),"b"), B(None, "c"), B(Some(4),null), B(None,null), null ) ), A(null) ) ).toDF
      df2.registerTempTable("T2")
      for (i <- 0 to 10) { println(i + " = " + sqlContext.sql(s"select b[$i].x, b[$i].y from T1").collect.mkString(","))}
      for (i <- 0 to 10) { println(i + " = " + sqlContext.sql(s"select b[$i].x, b[$i].y from T2").collect.mkString(","))}
      
      // Struct[Int,String]
      case class C(b: B)
      val df3 = sc.parallelize( Seq( C ( B(Some(1),"test") ), C(null) ) ).toDF
      df3.registerTempTable("T3")
      sqlContext.sql("select b.x, b.y from T3").collect
      

      With Tungsten enabled, these queries fail with a NullPointerException.
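
      For reference, the result expected from `arr[i]` in the loops above can be modelled in plain Scala. This is a sketch of the expected semantics only, based on the Tungsten-disabled output (`[a]`, `[null]`, `[b]`). It additionally assumes that an out-of-range index and a null array both yield null, which the report does not show explicitly. `ExpectedArrayAccess` and `elementAt` are hypothetical names.

```scala
// Hypothetical model of the expected behaviour; None stands for SQL NULL.
object ExpectedArrayAccess {
  def elementAt[T](arr: Seq[T], i: Int): Option[T] =
    Option(arr)                // a null array gives None
      .flatMap(_.lift(i))      // an out-of-range index gives None
      .flatMap(Option(_))      // a null element gives None
}

// Usage: mirrors the "select as[$i] from T_AS" loop without Spark.
object ExpectedArrayAccessDemo extends App {
  val as = Seq("a", null, "b")
  for (i <- 0 to 4) {
    println(s"$i = ${ExpectedArrayAccess.elementAt(as, i)}")
  }
}
```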

            People

            • Assignee: Pierre Borckmans (pborck)
            • Reporter: Pierre Borckmans (pborck)