Spark / SPARK-18589

persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.0
    • Fix Version/s: 2.1.1, 2.2.0
    • Component/s: PySpark, SQL
    • Labels:
      None
    • Environment:

      Python 3.5, Java 8

      Description

      Smells like another optimizer bug that's similar to SPARK-17100 and SPARK-18254. I'm seeing this on 2.0.2 and on master at commit fb07bbe575aabe68422fd3a31865101fb7fa1722.

      I don't have a minimal repro for this yet, but the error I'm seeing is:

      py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
      : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
          at scala.sys.package$.error(package.scala:27)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
          at scala.collection.immutable.Stream.foreach(Stream.scala:594)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
          at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
          at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
          at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
          at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
          at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
          at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
          at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
          at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
          at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
          at scala.collection.immutable.List.foldLeft(List.scala:84)
          at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
          at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
          at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
          at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
          at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
          at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
          at py4j.Gateway.invoke(Gateway.java:280)
          at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
          at py4j.commands.CallCommand.execute(CallCommand.java:79)
          at py4j.GatewayConnection.run(GatewayConnection.java:214)
          at java.lang.Thread.run(Thread.java:745)
      

      The extended plan (cleaned of field names) is as follows:

      == Parsed Logical Plan ==
      'Filter NOT ('expected_prediction = 'prediction)
      +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
         +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
            +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
               +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
                  +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
                     +- Project [struct(...) AS p1, struct(...) AS p2]
                        +- Project [_blocking_key, ..., ...]
                           +- Join Inner, (_blocking_key = _blocking_key)
                              :- SubqueryAlias p1
                              :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              :     +- Project [...]
                              :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                              :           +- Project [...]
                              :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                              :                 +- LogicalRDD [...]
                              +- SubqueryAlias p2
                                 +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                                    +- Project [...]
                                       +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                          +- Project [...]
                                             +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                                +- LogicalRDD [...]
      
      == Analyzed Logical Plan ==
      p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
      Filter NOT (cast(expected_prediction as double) = prediction)
      +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
         +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
            +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
               +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
                  +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
                     +- Project [struct(...) AS p1, struct(...) AS p2]
                        +- Project [_blocking_key, ..., ...]
                           +- Join Inner, (_blocking_key = _blocking_key)
                              :- SubqueryAlias p1
                              :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              :     +- Project [...]
                              :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                              :           +- Project [...]
                              :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                              :                 +- LogicalRDD [...]
                              +- SubqueryAlias p2
                                 +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                                    +- Project [...]
                                       +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                          +- Project [...]
                                             +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                                +- LogicalRDD [...]
      
      == Optimized Logical Plan ==
      Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
      +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
         :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
         :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
         :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
         :        :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
         :        :     +- Scan ExistingRDD[...]
         +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
            +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
               +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
                  :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
                  :     +- Scan ExistingRDD[...]
      
      == Physical Plan ==
      java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
      

      Note the error at the end, where Spark tries to print the physical plan. I've scrubbed some Project fields from the plan to simplify the display, but if I've scrubbed anything you think is important, let me know.
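
      As a rough illustration of what the message seems to be saying (a toy model in plain Python, not Spark's actual rule): in the optimized plan the two-argument <lambda> ends up inside the Join condition, where its inputs come from both join sides, while in the analyzed plan it sat in a Project above the join. The sketch below assumes the check is "some single child must output every attribute the UDF reads". The function name find_single_child and the attribute names are hypothetical.

```python
from typing import FrozenSet, List, Optional


def find_single_child(udf_refs: FrozenSet[str],
                      children_outputs: List[FrozenSet[str]]) -> Optional[int]:
    """Return the index of the first child whose output covers every
    attribute the UDF reads, or None if no single child does."""
    for index, output in enumerate(children_outputs):
        if udf_refs <= output:
            return index
    return None


# Hypothetical attribute names standing in for the two join sides.
left = frozenset({"p1.person", "p1._blocking_key"})
right = frozenset({"p2.person", "p2._blocking_key"})
udf_inputs = frozenset({"p1.person", "p2.person"})

# UDF evaluated in a Project above the join: the single child (the join)
# exposes attributes from both sides, so one child covers the inputs.
above_join = find_single_child(udf_inputs, [left | right])

# UDF evaluated inside the join condition: the operator has two children
# and neither covers both inputs, which is the case the error describes.
in_join_condition = find_single_child(udf_inputs, [left, right])
```

      In this toy model the first lookup finds a child and the second finds none, which matches the shape of the failure but says nothing about why the optimizer moved the predicate.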

      I can get around this problem by adding a persist() right before the operation that fails. The failing operation is a filter.
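
      A minimal sketch of that workaround, assuming a PySpark DataFrame that already carries the prediction and expected_prediction columns shown in the plans. The helper name filter_after_persist and the variable predictions are hypothetical; persist() and filter() are the standard DataFrame methods.

```python
def filter_after_persist(df, condition):
    """Persist df, then apply the filter to the persisted DataFrame.

    df is expected to be a pyspark.sql.DataFrame; condition is a Column
    expression or a SQL string, as accepted by DataFrame.filter().
    """
    # persist() marks the DataFrame for caching and returns the DataFrame.
    persisted = df.persist()
    return persisted.filter(condition)


# Usage (requires a running SparkSession; `predictions` is hypothetical):
#
#   mismatches = filter_after_persist(
#       predictions, "expected_prediction != prediction")
#   mismatches.count()
```

      This only restates the workaround in code; it does not explain why the persist() changes the plan.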

      Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?

              People

              • Assignee: Davies Liu (davies)
              • Reporter: Nicholas Chammas (nchammas)
              • Votes: 1
              • Watchers: 5