Description
Consider this code in Scala:
case class DemoSubRow(a: Int, b: Array[Int])
case class DemoRow(elems: Array[DemoSubRow])

dataFrame.withColumn(
  "goodElems",
  filter(col("elems"), x => exists(x.getField("b"), y => x.getField("a") === y)))
One could expect this to be equivalent to
SELECT *, filter(elems, x -> exists(x.b, y -> x.a == y)) AS goodElems FROM dataFrame
However, it's not. If you look into the org.apache.spark.sql.functions object, you'll find this private method:
private def createLambda(f: Column => Column) = {
  val x = UnresolvedNamedLambdaVariable(Seq("x"))
  val function = f(Column(x)).expr
  LambdaFunction(function, Seq(x))
}
If you look closely, you'll see that the column passed into the lambda is always the unresolved variable x. Because of that, the column from the Scala sample above is seen as:
… filter(elems, x -> exists(x.b, x -> x.a == x)) AS goodElems …
                                 ^^^^^^^^^^^^^
which is obviously wrong: the inner lambda variable shadows the outer one. In the given example this produces an AnalysisException, but it could also silently execute wrong code (e.g., imagine the DataFrame actually has a column named x, or something similar).
My current workaround is a reflection hack to call functions.withExpr, but it's a really bad one.
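As an alternative workaround (not part of the original report), the whole expression can be passed as a SQL string via expr(). The SQL parser binds each lambda variable by its declared name, so distinct names like x and y never collide. A minimal sketch, assuming a local Spark session and the DemoRow/DemoSubRow case classes from above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

case class DemoSubRow(a: Int, b: Array[Int])
case class DemoRow(elems: Array[DemoSubRow])

object ExprWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val dataFrame = Seq(
      DemoRow(Array(DemoSubRow(1, Array(1, 2)), DemoSubRow(3, Array(4, 5))))
    ).toDF()

    // Passing the lambda as SQL text sidesteps createLambda entirely,
    // so the nested variables keep their distinct names.
    val result = dataFrame.withColumn(
      "goodElems",
      expr("filter(elems, x -> exists(x.b, y -> x.a = y))"))

    result.show(truncate = false)
  }
}
```

This avoids the reflection hack at the cost of losing compile-time checking of the expression.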
What should probably be done: instead of the hard-coded name x, there should be a generated unique variable name, or even properly locally bound, resolved variables (at the moment of lambda creation the variable can be considered already resolved). However, there is a concern about how a generated name would be displayed to an end user if there is an analysis error. Sorry, but at the time of reporting this issue I have no idea how to solve that.
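One possible direction for the unique-name idea, sketched against the createLambda method shown earlier. The counter and naming scheme here are my own assumption, not actual Spark code:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical fix sketch: generate a fresh variable name per lambda
// instead of the hard-coded "x", so nested lambdas cannot shadow
// each other.
private val nextLambdaId = new AtomicLong(0)

private def createLambda(f: Column => Column) = {
  val name = s"x_${nextLambdaId.incrementAndGet()}" // unique per lambda
  val x = UnresolvedNamedLambdaVariable(Seq(name))
  val function = f(Column(x)).expr
  LambdaFunction(function, Seq(x))
}
```

The generated suffix would show up in analysis error messages (e.g. x_3 instead of x), which is exactly the user-facing concern raised above.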