[SPARK-23177] PySpark parameter-less UDFs raise exception if applied after distinct


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.2, 2.2.0, 2.2.1
    • Fix Version/s: 2.3.0
    • Component/s: PySpark
    • Labels:
      None

      Description

      It seems there is an issue with UDFs that take no arguments, but only if the UDF is applied after a distinct() operation.

      Here is a short example that reproduces the issue in the PySpark shell:

      import pyspark.sql.functions as f
      import uuid
      
      df = spark.createDataFrame([(1,2), (3,4)])
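      # Parameter-less UDF that generates a random UUID string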
      f_udf = f.udf(lambda: str(uuid.uuid4()))
      df.distinct().withColumn("a", f_udf()).show()
      

      Running it raises the following exception:

      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/opt/spark/python/pyspark/sql/dataframe.py", line 336, in show
          print(self._jdf.showString(n, 20))
        File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
        File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
          return f(*a, **kw)
        File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o54.showString.
      : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#16
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
      	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
      	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
      	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
      	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:280)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:214)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#16 in [_1#0L,_2#1L]
      	at scala.sys.package$.error(package.scala:27)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
      	... 63 more
      

      It is also worth mentioning that the same code without distinct() does not cause an error. Furthermore, if the UDF takes at least one argument, no exception is raised either.
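
      For comparison, here is a minimal sketch of the two variants that, as noted above, do not raise (the helper name f_udf1 and the choice of column _1 as the dummy argument are illustrative):

      import pyspark.sql.functions as f
      import uuid
      
      df = spark.createDataFrame([(1, 2), (3, 4)])
      
      # Variant 1: the same parameter-less UDF, but without distinct() -- runs fine.
      f_udf = f.udf(lambda: str(uuid.uuid4()))
      df.withColumn("a", f_udf()).show()
      
      # Variant 2: a UDF that takes (and ignores) one argument -- runs fine
      # even after distinct().
      f_udf1 = f.udf(lambda x: str(uuid.uuid4()))
      df.distinct().withColumn("a", f_udf1(f.col("_1"))).show()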


    People

    • Assignee: viirya (Liang-Chi Hsieh)
    • Reporter: Wasikowski (Jakub Wasikowski)
    • Votes: 0
    • Watchers: 3
