Spark / SPARK-34829

transform_values returns identical values when used with a UDF that returns a reference type


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.1, 3.2.0
    • Fix Version/s: 3.1.2, 3.2.0
    • Component/s: SQL
    • Labels:

      Description

      If the return value of a UDF passed to transform_values is an AnyRef (a reference type, such as a case class or a String), the transformation returns the same new value for every map key. More precisely, each newly computed value overwrites the values of all previously processed keys.

      Consider the following examples:

      import org.apache.spark.sql.functions.{col, transform_values, udf}
      import spark.implicits._
      case class Bar(i: Int)
      val square = udf((b: Bar) => b.i * b.i)
      val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
      df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
      +------------------------------+------------------------+
      |map                           |map_square              |
      +------------------------------+------------------------+
      |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
      +------------------------------+------------------------+
      

      vs 

      case class Bar(i: Int)
      case class BarSquare(i: Int)
      val square = udf((b: Bar) => BarSquare(b.i * b.i))
      val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
      df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
      +------------------------------+------------------------------+
      |map                           |map_square                    |
      +------------------------------+------------------------------+
      |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
      +------------------------------+------------------------------+
      

      Or even this simpler one:

      case class Foo(s: String)
      val reverse = udf((f: Foo) => f.s.reverse)
      val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
      df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)
      +------------------------------------+------------------------------+
      |map                                 |map_reverse                   |
      +------------------------------------+------------------------------+
      |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
      +------------------------------------+------------------------------+
      

      After experimenting with org.apache.spark.sql.catalyst.expressions.TransformValues, it looks like the problem occurs while executing this line:

      resultValues.update(i, functionForEval.eval(inputRow))

      More precisely, the problem lies in functionForEval.eval(inputRow), which becomes visible if you instrument the loop like this:

      println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
      val resultValue = functionForEval.eval(inputRow)
      println(s"RESULT - $resultValue")
      println(s"RESULTS PRIOR TO UPDATE - $resultValues")
      resultValues.update(i, resultValue)
      println(s"RESULTS AFTER UPDATE - $resultValues")

      The logs then show something like:

      RESULTS PRIOR TO EVALUATION - [null,null,null]
      RESULT - [0,1]
      RESULTS PRIOR TO UPDATE - [null,null,null]
      RESULTS AFTER UPDATE - [[0,1],null,null]
      ------
      RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
      RESULT - [0,4]
      RESULTS PRIOR TO UPDATE - [[0,4],null,null]
      RESULTS AFTER UPDATE - [[0,4],[0,4],null]
      ------
      RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
      RESULT - [0,9]
      RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
      RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]]
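      In the trace, the value already stored for the first key changes from [0,1] to [0,4] before update is called for the second key. This suggests that the evaluated result is a single mutable object that gets overwritten on every call, so resultValues ends up holding several references to the same object. The sketch below models that aliasing in plain Scala without Spark. MutableResult, squareReused and transformValues are hypothetical stand-ins (not Spark APIs) for the reused UDF result, the lambda evaluation and the TransformValues loop. Storing a copy of each result instead of the shared reference restores distinct values. This illustrates the general remedy of copying a reused result and does not necessarily reflect how the actual fix was implemented.

```scala
// Hypothetical stand-in for a mutable result object that is reused between calls.
final class MutableResult(var value: Int) {
  def copy(): MutableResult = new MutableResult(value)
  override def toString: String = s"Result($value)"
}

object AliasingDemo {
  // A single instance shared by every evaluation, like a function that
  // writes each result into the same buffer instead of allocating a new one.
  private val shared = new MutableResult(0)

  // Hypothetical model of the lambda evaluation (functionForEval.eval).
  def squareReused(i: Int): MutableResult = {
    shared.value = i * i
    shared
  }

  // Models the loop around resultValues.update(i, ...): evaluate, then store.
  // With copyResult = false every slot aliases `shared`; with true each slot
  // gets its own snapshot of the value computed for that key.
  def transformValues(values: Seq[Int], copyResult: Boolean): Array[MutableResult] = {
    val resultValues = new Array[MutableResult](values.length)
    for (i <- values.indices) {
      val r = squareReused(values(i))
      resultValues.update(i, if (copyResult) r.copy() else r)
    }
    resultValues
  }

  def main(args: Array[String]): Unit = {
    // Aliased: every slot shows the last computed value.
    println(transformValues(Seq(1, 2, 3), copyResult = false).mkString(","))
    // Copied: each slot keeps its own value.
    println(transformValues(Seq(1, 2, 3), copyResult = true).mkString(","))
  }
}
```

      Running main prints Result(9),Result(9),Result(9) for the aliased variant and Result(1),Result(4),Result(9) for the copying one, mirroring the second example's {1 -> {9}, 2 -> {9}, 3 -> {9}} versus the expected {1 -> {1}, 2 -> {4}, 3 -> {9}}.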
      

    People

    • Assignee: Peter Toth (petertoth)
    • Reporter: Pavel Chernikov (ChernikovP)