Description
It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is wrong (https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061):
```scala
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
```
Here the UDF passed as the first argument is called right inside the `udf` method, on the driver, rather than at DataFrame evaluation time on the executors. One of the major issues is that non-deterministic UDFs (e.g. ones generating a random value) produce unexpected results:
```scala
val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] {
  override def call(): Int = scala.util.Random.nextInt()
}, IntegerType).asNondeterministic()

(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()
```

which prints:

```
+-----------+---------+
|      scala|     java|
+-----------+---------+
|  934190385|478543809|
|-1082102515|478543809|
|  774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
|  260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
|  481017016|478543809|
+-----------+---------+
```
Note that the Scala version, which relies on a different overload of the `functions.udf` method, works correctly.
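The root cause can be illustrated without Spark at all: capturing the *result* of `call()` at construction time freezes one value, whereas wrapping the call in a function defers it to each invocation. A minimal Java sketch of the two patterns (class and method names here are hypothetical, not part of Spark):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsDeferred {
    // Stand-in for a non-deterministic UDF: each call produces a new value.
    static final AtomicInteger counter = new AtomicInteger();

    static int call() {
        return counter.incrementAndGet();
    }

    // Buggy pattern, mirroring `val func = f.call(); () => func`:
    // the UDF body runs once, here, and the single result is captured.
    static Supplier<Integer> eager() {
        final int value = call();
        return () -> value;
    }

    // Correct pattern, mirroring `() => f.call()`:
    // the UDF body runs every time the supplier is evaluated.
    static Supplier<Integer> deferred() {
        return () -> call();
    }

    public static void main(String[] args) {
        Supplier<Integer> e = eager();
        System.out.println(e.get().equals(e.get())); // true: same frozen value
        Supplier<Integer> d = deferred();
        System.out.println(d.get().equals(d.get())); // false: fresh value per call
    }
}
```

This is why every row in the `java` column above shows the same number: the `Random.nextInt()` inside the `UDF0` ran exactly once, on the driver, and only its result was shipped to the executors.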