Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 3.1.1
- Fix Version/s: None
- Component/s: None
Description
When applying a UDF that returns a string or a complex type (e.g. a struct) to array members via transform, every element of the resulting array holds the UDF result for the last array member.
Example code:
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{callUDF, col, transform, udf}

val sparkBuilder: SparkSession.Builder = SparkSession.builder()
  .master("local[*]")
  .appName(s"Udf Bug Demo")
  .config("spark.ui.enabled", "false")
  .config("spark.debug.maxToStringFields", 100)

val spark: SparkSession = sparkBuilder
  .config("spark.driver.bindAddress", "127.0.0.1")
  .config("spark.driver.host", "127.0.0.1")
  .getOrCreate()

import spark.implicits._

case class Foo(num: Int, s: String)

val src = Seq(
  (1, 2, Array(1, 2, 3)),
  (2, 2, Array(2, 2, 2)),
  (3, 4, Array(3, 4, 3, 4))
).toDF("A", "B", "C")

val udfStringName = "UdfString"
val udfIntName = "UdfInt"
val udfStructName = "UdfStruct"

val udfString = udf((num: Int) => { (num + 1).toString })
spark.udf.register(udfStringName, udfString)

val udfInt = udf((num: Int) => { num + 1 })
spark.udf.register(udfIntName, udfInt)

val udfStruct = udf((num: Int) => { Foo(num + 1, (num + 1).toString) })
spark.udf.register(udfStructName, udfStruct)

val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)

val cA = callUDF(udfStringName, col("A"))
val cB = callUDF(udfStringName, col("B"))
val cCString: Column = transform(col("C"), lambdaString)
val cCInt: Column = transform(col("C"), lambdaInt)
val cCStruct: Column = transform(col("C"), lambdaStruct)

val dest = src.withColumn("AStr", cA)
  .withColumn("BStr", cB)
  .withColumn("CString (Wrong)", cCString)
  .withColumn("CInt (OK)", cCInt)
  .withColumn("CStruct (Wrong)", cCStruct)

dest.show(false)
dest.printSchema()
Expected:
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct                         |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
Got:
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (OK)   |CStruct (Wrong)                 |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
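A plausible intuition for why the CInt column is correct while CString and CStruct collapse: primitive results are copied by value, whereas reference-typed results can all end up pointing at a single reused buffer that is overwritten in place. The following plain-Scala sketch is a simplified analogy of that failure mode, not Spark's actual internals (Holder, transformShared, and transformCopied are made-up names):

```scala
// Hypothetical analogy of the bug: one mutable holder reused for every element.
object ReuseSketch {
  final class Holder { var s: String = _ } // stands in for a reused row buffer

  // Buggy pattern: each output slot stores a reference to the same holder,
  // so the last in-place write shows up in every slot.
  def transformShared(nums: Array[Int]): Array[String] = {
    val holder = new Holder
    val out = new Array[Holder](nums.length)
    for (i <- nums.indices) {
      holder.s = (nums(i) + 1).toString // overwrite the shared buffer
      out(i) = holder                   // store the reference, not a copy
    }
    out.map(_.s) // every element now equals the last result
  }

  // Correct pattern: a fresh value is produced per element.
  def transformCopied(nums: Array[Int]): Array[String] =
    nums.map(n => (n + 1).toString)
}
```

With input Array(1, 2, 3), transformShared yields Array("4", "4", "4") (matching the wrong CString output above), while transformCopied yields Array("2", "3", "4").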
Observations
- Works correctly on Spark 3.0.2.
- When the UDF is registered as a Java UDF, it works as expected.
- The UDF is called the appropriate number of times (regardless of whether it is marked deterministic or non-deterministic).
- When debugging, the correct value is initially written into the result array, but processing each subsequent item overwrites the previously written values as well, so the final result is the array filled with the last item's values.
- When the UDF returns NULL/None, it neither "overwrites" the prior array values nor is "overwritten" by subsequent non-NULL values. See the following UDF implementation:
val udfString = udf((num: Int) => {
  if (num == 3) {
    None
  } else {
    Some((num + 1).toString)
  }
})
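The NULL behaviour is what one would expect if all non-NULL slots alias a single reused mutable buffer: a None result stores no reference at all, so later in-place writes cannot reach it, and it cannot disturb earlier slots either. A hypothetical plain-Scala sketch of that pattern (made-up names, not Spark's actual internals):

```scala
// Hypothetical sketch: null slots are immune to shared-buffer overwrites.
object NullSketch {
  final class Holder { var s: String = _ } // stands in for a reused row buffer

  def transform(nums: Array[Int]): Array[String] = {
    val holder = new Holder
    val out = new Array[Holder](nums.length)
    for (i <- nums.indices) {
      if (nums(i) == 3) {
        out(i) = null // None: nothing references the shared buffer
      } else {
        holder.s = (nums(i) + 1).toString // overwrite the shared buffer
        out(i) = holder                   // non-null slots alias the buffer
      }
    }
    out.map(h => if (h == null) null else h.s)
  }
}
```

For Array(1, 2, 3) this yields Array("3", "3", null): the null slot stays untouched while every aliased slot shows the last non-null write.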
Issue Links
- duplicates SPARK-34829: "transform_values return identical values when it's used with udf that returns reference type" (Resolved)