Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 3.1.2
Description
Expectation
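For a given row, a Nondeterministic expression is expected to have a stable value: every reference to the same expression instance within a select() should see the same value for that row.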
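import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._ // for toDF; `spark` is the session predefined in spark-shell
val df = spark.sparkContext.parallelize(1 to 5).toDF("x")
val v1 = (rand() * lit(10000)).cast(IntegerType)
df.select(v1, v1).collect()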
This returns rows like the following:
8777 | 8777 |
1357 | 1357 |
3435 | 3435 |
9204 | 9204 |
3870 | 3870 |
Within each row both columns hold the same value, while the value itself changes from row to row. This is different from the following:
df.select(rand(), rand()).collect()
Here the two rand() calls create two distinct expressions, so the two columns are expected to hold different values.
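The expectation can be written down as a pair of checks. This is a minimal sketch, assuming a local SparkSession; the object name `NondeterministicExpectation` and its method names are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, rand}
import org.apache.spark.sql.types.IntegerType

object NondeterministicExpectation {
  // True if every row of select(v1, v1) has two equal columns, i.e. the single
  // rand()-based expression instance yields one value per row.
  def sameInstanceIsStable(spark: SparkSession): Boolean = {
    import spark.implicits._
    val df = spark.sparkContext.parallelize(1 to 5).toDF("x")
    val v1 = (rand() * lit(10000)).cast(IntegerType)
    df.select(v1, v1).collect().forall(r => r.getInt(0) == r.getInt(1))
  }

  // True if two separately constructed rand() calls differ in at least one row
  // (each unseeded rand() call gets its own random seed).
  def distinctInstancesDiffer(spark: SparkSession): Boolean = {
    import spark.implicits._
    val df = spark.sparkContext.parallelize(1 to 5).toDF("x")
    df.select(rand(), rand()).collect().exists(r => r.getDouble(0) != r.getDouble(1))
  }

  def main(args: Array[String]): Unit = {
    // local[1] master is an assumption so the sketch runs standalone.
    val spark = SparkSession.builder().master("local[1]").appName("nondeterministic-expectation").getOrCreate()
    try {
      println(s"same instance stable: ${sameInstanceIsStable(spark)}")
      println(s"distinct instances differ: ${distinctInstancesDiffer(spark)}")
    } finally spark.stop()
  }
}
```

The second check uses `exists` rather than `forall` because two independent draws could, in principle, coincide in a given row.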
Problem
This expectation does not appear to hold when any subsequent expression is a CodegenFallback, i.e. an expression with no generated code that is evaluated through its interpreted eval() path. This program:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._ // needed for toDF
val df = sparkSession.sparkContext.parallelize(1 to 5).toDF("x")
val v1 = (rand() * lit(10000)).cast(IntegerType)
val v2 = to_csv(struct(v1.as("a"))) // to_csv is CodegenFallback
df.select(v1, v1, v2, v2).collect()
produces output like this:
8159 | 8159 | 8159 | 2028 |
8320 | 8320 | 8320 | 1640 |
7937 | 7937 | 7937 | 769 |
436 | 436 | 436 | 8924 |
8924 | 8924 | 2827 | 2731 |
It is not clear why the first evaluation through the CodegenFallback path (the third column) usually matches v1 while the second one (the fourth column) does not.
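To see how often the fallback columns drift, the reproducer can be turned into a count of mismatching rows. This is a sketch under the same assumptions (local session; the object and method names are hypothetical). It relies on `to_csv` of a struct with a single integer field rendering just that number, so the CSV string can be compared with `v1` directly.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, rand, struct, to_csv}
import org.apache.spark.sql.types.IntegerType

object CodegenFallbackRepro {
  // Runs the reproducer and returns how many rows contain a column that
  // disagrees with v1 (0 means every reference saw the same value).
  def mismatchedRows(spark: SparkSession): Int = {
    import spark.implicits._
    val df = spark.sparkContext.parallelize(1 to 5).toDF("x")
    val v1 = (rand() * lit(10000)).cast(IntegerType)
    val v2 = to_csv(struct(v1.as("a"))) // to_csv is CodegenFallback
    val rows = df.select(v1, v1, v2, v2).collect()
    rows.count { r =>
      val expected = r.getInt(0).toString // CSV of a one-int struct is the bare number
      r.getInt(1) != r.getInt(0) || r.getString(2) != expected || r.getString(3) != expected
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("codegen-fallback-repro").getOrCreate()
    try println(s"rows where v2 disagrees with v1: ${mismatchedRows(spark)} of 5")
    finally spark.stop()
  }
}
```

On an affected version a non-zero count would be expected, matching the output above; a regression test for the fix would instead assert that the count is 0.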
Workaround
If the Nondeterministic expression is moved into a separate, earlier select() call, so that the CodegenFallback expression refers only to a column reference, the problem appears to go away. However, this workaround may not be reliable if the optimizer is ever able to merge adjacent select()s.
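The workaround described above looks roughly like this. It is a minimal sketch, not a guaranteed fix: the column name `v1`, the object `CodegenFallbackWorkaround` and its helpers are hypothetical, and whether the two projections stay separate depends on the optimizer, as noted.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit, rand, struct, to_csv}
import org.apache.spark.sql.types.IntegerType

object CodegenFallbackWorkaround {
  // Computes the nondeterministic value in its own select(), then applies the
  // CodegenFallback expression (to_csv) to a plain column reference.
  def workaroundRows(spark: SparkSession): Array[Row] = {
    import spark.implicits._
    val df = spark.sparkContext.parallelize(1 to 5).toDF("x")
    val v1 = (rand() * lit(10000)).cast(IntegerType)
    val withV1 = df.select(v1.as("v1")) // step 1: materialize v1 as a column
    val v2 = to_csv(struct(col("v1").as("a"))) // step 2: the fallback only sees col("v1")
    withV1.select(col("v1"), col("v1"), v2, v2).collect()
  }

  // True if all four columns of a row agree (CSV of a one-int struct is the bare number).
  def consistent(r: Row): Boolean = {
    val s = r.getInt(0).toString
    r.getInt(1) == r.getInt(0) && r.getString(2) == s && r.getString(3) == s
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("codegen-fallback-workaround").getOrCreate()
    try {
      workaroundRows(spark).foreach(r => println(s"${r.mkString(" | ")} consistent=${consistent(r)}"))
    } finally spark.stop()
  }
}
```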