Description
Smells like another optimizer bug that's similar to SPARK-17100 and SPARK-18254. I'm seeing this on 2.0.2 and on master at commit fb07bbe575aabe68422fd3a31865101fb7fa1722.
I don't have a minimal repro for this yet, but the error I'm seeing is:
py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
: java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
The extended plan (cleaned of field names) is as follows:
== Parsed Logical Plan ==
'Filter NOT ('expected_prediction = 'prediction)
+- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
   +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
      +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
         +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
            +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
               +- Project [struct(...) AS p1, struct(...) AS p2]
                  +- Project [_blocking_key, ..., ...]
                     +- Join Inner, (_blocking_key = _blocking_key)
                        :- SubqueryAlias p1
                        :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                        :     +- Project [...]
                        :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                        :           +- Project [...]
                        :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                        :                 +- LogicalRDD [...]
                        +- SubqueryAlias p2
                           +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              +- Project [...]
                                 +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                    +- Project [...]
                                       +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                          +- LogicalRDD [...]

== Analyzed Logical Plan ==
p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
Filter NOT (cast(expected_prediction as double) = prediction)
+- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
   +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
      +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
         +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
            +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
               +- Project [struct(...) AS p1, struct(...) AS p2]
                  +- Project [_blocking_key, ..., ...]
                     +- Join Inner, (_blocking_key = _blocking_key)
                        :- SubqueryAlias p1
                        :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                        :     +- Project [...]
                        :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                        :           +- Project [...]
                        :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                        :                 +- LogicalRDD [...]
                        +- SubqueryAlias p2
                           +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              +- Project [...]
                                 +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                    +- Project [...]
                                       +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                          +- LogicalRDD [...]

== Optimized Logical Plan ==
Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
+- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
   :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
   :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
   :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
   :        :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
   :        :     +- Scan ExistingRDD[...]
   +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
      +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
         +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
            :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
            :     +- Scan ExistingRDD[...]

== Physical Plan ==
java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
Note the error at the end when Spark tries to print the physical plan. I've scrubbed some Project fields from the plan to simplify the display, but if I've scrubbed anything you think is important let me know.
I can get around this problem by adding a persist() right before the operation that fails. The failing operation is a filter.
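For reference, the workaround looks roughly like the sketch below. The function name and the `predictions` argument are placeholders I made up for illustration; the column names come from the plan above, and `predictions` is assumed to be the DataFrame produced by the last Project (the one that adds `expected_prediction`). My guess, not verified, is that persisting helps because the optimizer treats the cached relation as a leaf and so no longer pushes the filter into the join condition.

```python
def count_mismatches(predictions):
    """Count rows whose prediction differs from the expected value.

    `predictions` is assumed to be a pyspark.sql.DataFrame with
    `expected_prediction` and `prediction` columns (hypothetical name,
    columns as shown in the plan above).
    """
    # Workaround: persist() right before the failing filter.
    # DataFrame.persist() returns the DataFrame itself, so it can be chained.
    cached = predictions.persist()

    # The failing operation: a filter comparing the two columns. The SQL
    # string form is used here so the sketch needs no pyspark imports.
    mismatches = cached.filter("expected_prediction != prediction")

    # count() is the action that triggered the error without persist().
    return mismatches.count()
```

Without the `persist()` call, the same filter followed by `count()` is what raises the error above.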
Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?
Issue Links
- is duplicated by
  - SPARK-19728 PythonUDF with multiple parents shouldn't be pushed down when used as a predicate (Closed)
- relates to
  - SPARK-17100 pyspark filter on a udf column after join gives java.lang.UnsupportedOperationException (Resolved)
  - SPARK-18254 UDFs don't see aliased column names (Resolved)