Spark / SPARK-34829

transform_values returns identical values when used with a UDF that returns a reference type


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.1, 3.2.0
    • Fix Version/s: 3.1.2, 3.2.0
    • Component/s: SQL

    Description

      If the return value of a UDF passed to transform_values is an AnyRef, the transformation produces identical new values for every map key (more precisely, each newly computed value overwrites the values for all previously processed keys).

      Consider the following examples:

      case class Bar(i: Int)
      val square = udf((b: Bar) => b.i * b.i)
      val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
      df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
      +------------------------------+------------------------+
      |map                           |map_square              |
      +------------------------------+------------------------+
      |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
      +------------------------------+------------------------+
      

      vs.

      case class Bar(i: Int)
      case class BarSquare(i: Int)
      val square = udf((b: Bar) => BarSquare(b.i * b.i))
      val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
      df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
      +------------------------------+------------------------------+
      |map                           |map_square                    |
      +------------------------------+------------------------------+
      |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
      +------------------------------+------------------------------+
      

      or even just this one:

      case class Foo(s: String)
      val reverse = udf((f: Foo) => f.s.reverse)
      val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
      df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)
      +------------------------------------+------------------------------+
      |map                                 |map_reverse                   |
      +------------------------------------+------------------------------+
      |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
      +------------------------------------+------------------------------+
      

      After playing with org.apache.spark.sql.catalyst.expressions.TransformValues, it looks like something goes wrong when executing this line:

      resultValues.update(i, functionForEval.eval(inputRow))

      To be more precise, the problem lies in functionForEval.eval(inputRow), because if you instrument the code like this:

      println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
      val resultValue = functionForEval.eval(inputRow)
      println(s"RESULT - $resultValue")
      println(s"RESULTS PRIOR TO UPDATE - $resultValues")
      resultValues.update(i, resultValue)
      println(s"RESULTS AFTER UPDATE - $resultValues")

      you'll see something like this in the logs:

      RESULTS PRIOR TO EVALUATION - [null,null,null]
      RESULT - [0,1]
      RESULTS PRIOR TO UPDATE - [null,null,null]
      RESULTS AFTER UPDATE - [[0,1],null,null]
      ------
      RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
      RESULT - [0,4]
      RESULTS PRIOR TO UPDATE - [[0,4],null,null]
      RESULTS AFTER UPDATE - [[0,4],[0,4],null]
      ------
      RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
      RESULT - [0,9]
      RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
      RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]]
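
      Notice that the values already stored in resultValues change before update is even called (e.g. [[0,1],null,null] becomes [[0,4],null,null]). That is the signature of every slot holding a reference to one reused mutable row. As a minimal, Spark-free sketch of that aliasing pattern (MutableResult, eval, and copyOf below are illustrative stand-ins, not actual Spark classes):

      ```scala
      // Sketch of the aliasing pattern suggested by the log above:
      // the evaluator reuses one mutable buffer for every call, the way a
      // projection reuses its output row.
      object AliasingSketch {
        final class MutableResult(var value: Int) {
          def copyOf: MutableResult = new MutableResult(value)
          override def toString: String = s"[$value]"
        }

        // One buffer shared across all evaluations.
        private val buffer = new MutableResult(0)
        def eval(i: Int): MutableResult = { buffer.value = i * i; buffer }

        // Storing the returned reference directly: all slots alias `buffer`,
        // so the last evaluation "wins" everywhere.
        def transformNoCopy(keys: Seq[Int]): Seq[MutableResult] =
          keys.map(eval)

        // Defensive copy per evaluation: each slot keeps its own value.
        def transformWithCopy(keys: Seq[Int]): Seq[MutableResult] =
          keys.map(k => eval(k).copyOf)

        def main(args: Array[String]): Unit = {
          println(transformNoCopy(Seq(1, 2, 3)).mkString(", "))   // [9], [9], [9]
          println(transformWithCopy(Seq(1, 2, 3)).mkString(", ")) // [1], [4], [9]
        }
      }
      ```

      The remedy sketched by transformWithCopy (copying the evaluation result before storing it) follows the general Catalyst convention of copying reused rows before retaining references to them.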
      

       

      People

        Assignee: Peter Toth (petertoth)
        Reporter: Pavel Chernikov (ChernikovP)