Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-25314

Invalid PythonUDF - requires attributes from more than one child - in "on" join condition

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1
    • Fix Version/s: 2.4.0
    • Component/s: PySpark, SQL
    • Labels:
      None

      Description

      This is another variation of the SPARK-19728 which was tagged as resolved. So I base the example on it:

       

      from pyspark.sql.functions import udf
      from pyspark.sql.types import BooleanType
      df1 = sc.parallelize([(1, ), (2, )]).toDF(["col_a"])
      df2 = sc.parallelize([(2, ), (3, )]).toDF(["col_b"])
      pred = udf(lambda x, y: x == y, BooleanType())
      df1.join(df2, pred(df1.col_a, df2.col_b)).show()
      

       

      This throws:

      java.lang.RuntimeException: Invalid PythonUDF <lambda>(col_a#132L, col_b#135L), requires attributes from more than one child.
      at scala.sys.package$.error(package.scala:27)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
      at scala.collection.immutable.Stream.foreach(Stream.scala:594)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
      at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
      at scala.collection.immutable.List.foldLeft(List.scala:84)
      at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
      at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:100)
      at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
      at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
      at org.apache.spark.sql.Dataset.persist(Dataset.scala:2902)
      at org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:282)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.lang.Thread.run(Thread.java:748)

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                XuanYuan Yuanjian Li
                Reporter:
                bahchis Sergey Bahchissaraitsev
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: