SPARK-22641: PySpark UDF relying on column added with withColumn after distinct

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: PySpark

    Description

      We seem to have found an issue where a PySpark UDF that depends on a column added via withColumn fails, but only when the withColumn calls come after a distinct().

      Simplest repro in a local PySpark shell:

      import pyspark.sql.functions as F

      # Identity UDF; used as a bare decorator, @F.udf defaults to StringType.
      @F.udf
      def ident(x):
          return x

      # The distinct() before the two withColumn calls is what triggers the
      # failure; the UDF in the second withColumn references the column 'b'
      # added by the first.
      spark.createDataFrame([{'a': '1'}]) \
          .distinct() \
          .withColumn('b', F.lit('qq')) \
          .withColumn('fails_here', ident('b')) \
          .collect()
      

      This fails with the following exception:

      : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#13
              at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
              at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
              at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
              at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.immutable.List.foreach(List.scala:381)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.immutable.List.map(List.scala:285)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:513)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:659)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:164)
              at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
              at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
              at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
              at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
              at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
              at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
              at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:374)
              at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:422)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
              at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
              at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
              at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
              at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:233)
              at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:280)
              at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3088)
              at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3085)
              at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3085)
              at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
              at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3118)
              at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3085)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
              at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
              at py4j.Gateway.invoke(Gateway.java:282)
              at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
              at py4j.commands.CallCommand.execute(CallCommand.java:79)
              at py4j.GatewayConnection.run(GatewayConnection.java:214)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#13 in [a#0]
              at scala.sys.package$.error(package.scala:27)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:97)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:91)
              at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
              ... 58 more
      

      Oddly, the error disappears if you remove the .distinct() or move it after either of the .withColumn calls; see the working variants below.
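
      For comparison, here is a sketch of the two non-failing variants described above (assuming the same shell session and ident UDF as in the repro; the column is renamed to works_here purely for illustration):

      # Variant 1: no distinct() at all.
      spark.createDataFrame([{'a': '1'}]) \
          .withColumn('b', F.lit('qq')) \
          .withColumn('works_here', ident('b')) \
          .collect()

      # Variant 2: distinct() moved after the withColumn calls; per the report,
      # placing it after just the first withColumn also avoids the error.
      spark.createDataFrame([{'a': '1'}]) \
          .withColumn('b', F.lit('qq')) \
          .withColumn('works_here', ident('b')) \
          .distinct() \
          .collect()

      # Both should return [Row(a='1', b='qq', works_here='qq')].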

People

    • Assignee: Unassigned
    • Reporter: Andrew Duffy (andreweduffy)
    • Votes: 0
    • Watchers: 5
