  1. Spark
  2. SPARK-35371

Scala UDF returning string or complex type applied to array members returns wrong data


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 3.1.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      When a Scala UDF that returns a string or a complex type (struct) is applied to the members of an array, every element of the resulting array holds the UDF result for the last array member.

      Example code:

      import org.apache.spark.sql.{Column, SparkSession}
      import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
      
      val sparkBuilder: SparkSession.Builder = SparkSession.builder()
        .master("local[*]")
        .appName("Udf Bug Demo")
        .config("spark.ui.enabled", "false")
        .config("spark.debug.maxToStringFields", 100)
      
      val spark: SparkSession = sparkBuilder
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.driver.host", "127.0.0.1")
        .getOrCreate()
      
      import spark.implicits._
      
      case class Foo(num: Int, s: String)
      
      val src  = Seq(
        (1, 2, Array(1, 2, 3)),
        (2, 2, Array(2, 2, 2)),
        (3, 4, Array(3, 4, 3, 4))
      ).toDF("A", "B", "C")
      
      val udfStringName = "UdfString"
      val udfIntName = "UdfInt"
      val udfStructName = "UdfStruct"
      
      val udfString = udf((num: Int) => {
        (num + 1).toString
      })
      spark.udf.register(udfStringName, udfString)
      
      val udfInt = udf((num: Int) => {
        num + 1
      })
      spark.udf.register(udfIntName, udfInt)
      
      val udfStruct = udf((num: Int) => {
        Foo(num + 1, (num + 1).toString)
      })
      spark.udf.register(udfStructName, udfStruct)
      
      
      val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
      val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
      val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
      
      val cA = callUDF(udfStringName, col("A"))
      val cB = callUDF(udfStringName, col("B"))
      val cCString: Column = transform(col("C"), lambdaString)
      val cCInt: Column = transform(col("C"), lambdaInt)
      val cCStruc: Column = transform(col("C"), lambdaStruct)
      val dest = src.withColumn("AStr", cA)
        .withColumn("BStr", cB)
        .withColumn("CString (Wrong)", cCString)
        .withColumn("CInt (OK)", cCInt)
        .withColumn("CStruct (Wrong)", cCStruc)
      
      dest.show(false)
      dest.printSchema()
      

      Expected:

      +---+---+------------+----+----+---------------+------------+--------------------------------+
      |A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct                         |
      +---+---+------------+----+----+---------------+------------+--------------------------------+
      |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
      |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
      |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
      +---+---+------------+----+----+---------------+------------+--------------------------------+
      

      Got:

      +---+---+------------+----+----+---------------+------------+--------------------------------+
      |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (OK)   |CStruct (Wrong)                 |
      +---+---+------------+----+----+---------------+------------+--------------------------------+
      |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
      |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
      |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
      +---+---+------------+----+----+---------------+------------+--------------------------------+
      

      Observations

      • Works correctly on Spark 3.0.2.
      • When the UDF is registered as a Java UDF, it works as expected.
      • The UDF is called the expected number of times, regardless of whether it is marked as deterministic or non-deterministic.
      • When debugged, the correct value is at first saved into the result array, but processing each subsequent item also overwrites the previously stored results. The final array is therefore filled with the value for the last item.
      • When the UDF returns NULL/None, that element does not "overwrite" the prior array values, nor is it "overwritten" by subsequent non-NULL values. See the following UDF implementation:
        val udfString = udf((num: Int) => {
          if (num == 3) {
            None
          } else {
            Some((num + 1).toString)
          }
        })
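
      The overwriting described in the debugging observation has the shape of a reference-aliasing problem. The following is a minimal plain-Scala sketch of that general pattern only; it does not use Spark and is not taken from Spark's internals. `ReusedBox`, `AliasingSketch`, and its methods are hypothetical names introduced purely for illustration, and the assumption that a reused mutable result object is involved is not confirmed by this report.

      ```scala
      import scala.collection.mutable.ArrayBuffer

      // Hypothetical stand-in for a mutable result holder that a producer reuses.
      final class ReusedBox(var value: String)

      object AliasingSketch {
        // One shared instance, mutated on every call (assumption for illustration).
        private val shared = new ReusedBox("")

        // Returns the SAME object each time; only its content changes.
        def produce(num: Int): ReusedBox = {
          shared.value = (num + 1).toString
          shared
        }

        // Stores references without copying, so every slot aliases `shared`
        // and ends up showing the value written by the last call.
        def collectWithoutCopy(input: Seq[Int]): Seq[String] = {
          val slots = ArrayBuffer.empty[ReusedBox]
          input.foreach(n => slots += produce(n))
          slots.map(_.value).toList
        }

        // Reads the immutable String out before the next call mutates the box.
        def collectWithCopy(input: Seq[Int]): Seq[String] =
          input.toList.map(n => produce(n).value)
      }
      ```

      With the input `Seq(1, 2, 3)`, `collectWithoutCopy` yields `"4", "4", "4"` while `collectWithCopy` yields `"2", "3", "4"`, which mirrors the Got and Expected rows for the first record. Primitive values such as `Int` are stored by value rather than by reference, which would be consistent with the `CInt` column being unaffected.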
        

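
      The report does not show how the Java UDF mentioned in the observations was registered. One possible way, sketched here under the assumption that implementing the `org.apache.spark.sql.api.java.UDF1` interface and passing an explicit return type is what was meant, is shown below. `JavaUdfSketch` and the name `UdfJavaString` are hypothetical, and whether this variant avoids the problem on a given Spark version should be verified rather than assumed.

      ```scala
      import org.apache.spark.sql.{Column, SparkSession}
      import org.apache.spark.sql.api.java.UDF1
      import org.apache.spark.sql.functions.{callUDF, col, transform}
      import org.apache.spark.sql.types.StringType

      object JavaUdfSketch {
        // Hypothetical name, chosen so it does not clash with "UdfString" from the report.
        val udfJavaStringName = "UdfJavaString"

        def register(spark: SparkSession): Unit = {
          // Implements the Java functional interface instead of wrapping a Scala closure in udf(...).
          val javaStyleUdf = new UDF1[Integer, String] {
            override def call(num: Integer): String = (num.intValue() + 1).toString
          }
          // The Java-style overload needs the return type spelled out.
          spark.udf.register(udfJavaStringName, javaStyleUdf, StringType)
        }

        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .master("local[*]")
            .appName("Java UDF sketch")
            .getOrCreate()
          import spark.implicits._

          register(spark)

          // Same shape as column C in the report, reduced to one row.
          val src = Seq((1, Array(1, 2, 3))).toDF("A", "C")
          val lambdaJavaString = (forCol: Column) => callUDF(udfJavaStringName, forCol)
          src.withColumn("CString", transform(col("C"), lambdaJavaString)).show(false)

          spark.stop()
        }
      }
      ```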
         


            People

              Assignee: Unassigned
              Reporter: David Benedeki
              Votes: 9
              Watchers: 3
