Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Versions: 3.1.1, 3.2.0
Description
If the return value of a UDF passed to transform_values is an AnyRef, the transformation returns identical new values for every map key (more precisely, each newly obtained value overwrites the values for all previously processed keys).
Consider the following examples:
{code:scala}
case class Bar(i: Int)
val square = udf((b: Bar) => b.i * b.i)
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)

+------------------------------+------------------------+
|map                           |map_square              |
+------------------------------+------------------------+
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
+------------------------------+------------------------+
{code}
vs
{code:scala}
case class Bar(i: Int)
case class BarSquare(i: Int)
val square = udf((b: Bar) => BarSquare(b.i * b.i))
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)

+------------------------------+------------------------------+
|map                           |map_square                    |
+------------------------------+------------------------------+
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
+------------------------------+------------------------------+
{code}
or even just this simpler one:
{code:scala}
case class Foo(s: String)
val reverse = udf((f: Foo) => f.s.reverse)
val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)

+------------------------------------+------------------------------+
|map                                 |map_reverse                   |
+------------------------------------+------------------------------+
|{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
+------------------------------------+------------------------------+
{code}
After stepping through org.apache.spark.sql.catalyst.expressions.TransformValues, it looks like something goes wrong while executing this line:
{code:scala}
resultValues.update(i, functionForEval.eval(inputRow))
{code}
To be more precise, it's all about functionForEval.eval(inputRow), because if you instrument the code like this:
{code:scala}
println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
val resultValue = functionForEval.eval(inputRow)
println(s"RESULT - $resultValue")
println(s"RESULTS PRIOR TO UPDATE - $resultValues")
resultValues.update(i, resultValue)
println(s"RESULTS AFTER UPDATE - $resultValues")
{code}
you'll see log output like:
{noformat}
RESULTS PRIOR TO EVALUATION - [null,null,null]
RESULT - [0,1]
RESULTS PRIOR TO UPDATE - [null,null,null]
RESULTS AFTER UPDATE - [[0,1],null,null]
------
RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
RESULT - [0,4]
RESULTS PRIOR TO UPDATE - [[0,4],null,null]
RESULTS AFTER UPDATE - [[0,4],[0,4],null]
------
RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
RESULT - [0,9]
RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]]
{noformat}
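The logs suggest that eval() returns the same mutable result object on every call, so storing it without a defensive copy makes every slot of the result array alias one instance; each subsequent evaluation then mutates all previously stored "results" in place (note how the slots already filled change before the update itself). A minimal sketch of that aliasing pattern, in plain Scala with no Spark involved (MutableResult and run are hypothetical names, standing in for the reused row and the evaluation loop):

```scala
object AliasingDemo {
  // Stand-in for the mutable result object that eval() reuses per call.
  final class MutableResult(var value: Int) {
    override def toString: String = s"[$value]"
  }

  // Squares the inputs 1, 2, 3 and stores each result in an array,
  // either storing the shared instance directly (the buggy pattern,
  // like resultValues.update(i, functionForEval.eval(inputRow)))
  // or copying it first (the fix).
  def run(copyBeforeStore: Boolean): String = {
    val shared = new MutableResult(0)
    val out = new Array[MutableResult](3)
    for ((x, i) <- Seq(1, 2, 3).zipWithIndex) {
      shared.value = x * x // "eval" mutates the shared object in place
      out(i) = if (copyBeforeStore) new MutableResult(shared.value) else shared
    }
    out.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    println(run(copyBeforeStore = false)) // all slots alias one object: [9],[9],[9]
    println(run(copyBeforeStore = true))  // defensive copies: [1],[4],[9]
  }
}
```

Without the copy, every array slot points at the same object, so the final mutation shows through all of them, exactly mirroring the [[0,9],[0,9],[0,9]] result above.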
Issue Links
- is duplicated by:
  - SPARK-34830 Some UDF calls inside transform are broken (Resolved)
  - SPARK-35371 Scala UDF returning string or complex type applied to array members returns wrong data (Resolved)
- relates to:
  - SPARK-32154 Use ExpressionEncoder for the return type of ScalaUDF to convert to catalyst type (Resolved)