  1. Spark
  2. SPARK-24208

Cannot resolve column in self join after applying Pandas UDF


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels:
    • Environment:

      AWS EMR 5.13.0
      Amazon Hadoop distribution 2.8.3
      Spark 2.3.0
      Pandas 0.22.0

      Description

      After applying a grouped-map Pandas UDF to a DataFrame, a self join of the resulting DataFrame fails to resolve columns. The workaround I found is to recreate the DataFrame from its RDD and schema.

      The following Python code reproduces the issue.

      from pyspark import Row
      import pyspark.sql.functions as F
      
      @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
      def dummy_pandas_udf(df):
          return df[['key','col']]
      
      df = spark.createDataFrame([Row(key=1,col='A'), Row(key=1,col='B'), Row(key=2,col='C')])
      
      # transformation that causes the issue
      df = df.groupBy('key').apply(dummy_pandas_udf)
      
      # WORKAROUND that fixes the issue
      # df = spark.createDataFrame(df.rdd, df.schema)
      
      df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
      

      If the workaround line is left commented out, the code above fails with the following error:

      AnalysisException                         Traceback (most recent call last)
      <ipython-input-88-8de763656d6d> in <module>()
           12 # df = spark.createDataFrame(df.rdd, df.schema)
           13 
      ---> 14 df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
      
      /usr/lib/spark/python/pyspark/sql/dataframe.py in join(self, other, on, how)
          929                 on = self._jseq([])
          930             assert isinstance(how, basestring), "how should be basestring"
      --> 931             jdf = self._jdf.join(other._jdf, on, how)
          932         return DataFrame(jdf, self.sql_ctx)
          933 
      
      /usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
         1158         answer = self.gateway_client.send_command(command)
         1159         return_value = get_return_value(
      -> 1160             answer, self.gateway_client, self.target_id, self.name)
         1161 
         1162         for temp_arg in temp_args:
      
      /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
           67                                              e.java_exception.getStackTrace()))
           68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
      ---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
           70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
           71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
      
      AnalysisException: u"cannot resolve '`temp0.key`' given input columns: [temp0.key, temp0.col];;\n'Join Inner, ('temp0.key = 'temp1.key)\n:- AnalysisBarrier\n:     +- SubqueryAlias temp0\n:        +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n:           +- Project [key#4099L, col#4098, key#4099L]\n:              +- LogicalRDD [col#4098, key#4099L], false\n+- AnalysisBarrier\n      +- SubqueryAlias temp1\n         +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n            +- Project [key#4099L, col#4098, key#4099L]\n               +- LogicalRDD [col#4098, key#4099L], false\n"
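One detail visible in the plan above is that both join branches list the same output attributes for FlatMapGroupsInPandas (key#4104L, col#4105). The sketch below only checks that observation against the plan text from the error message. It is not a confirmed diagnosis of the bug, and the helper name output_attributes is hypothetical.

```python
import re

# Plan text copied from the AnalysisException above, reduced to the
# join node and the FlatMapGroupsInPandas line of each branch.
plan = (
    "'Join Inner, ('temp0.key = 'temp1.key)\n"
    ":- AnalysisBarrier\n"
    ":     +- SubqueryAlias temp0\n"
    ":        +- FlatMapGroupsInPandas [key#4099L], "
    "dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n"
    "+- AnalysisBarrier\n"
    "      +- SubqueryAlias temp1\n"
    "         +- FlatMapGroupsInPandas [key#4099L], "
    "dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n"
)


def output_attributes(branch):
    """Return the attributes in the last bracketed list of the
    FlatMapGroupsInPandas line found in one branch of the plan."""
    line = next(l for l in branch.splitlines() if "FlatMapGroupsInPandas" in l)
    last_list = re.findall(r"\[([^\]]*)\]", line)[-1]
    return [attr.strip() for attr in last_list.split(",")]


# The right-hand branch starts at the only "+- AnalysisBarrier" in column 0.
left, right = plan.split("\n+- AnalysisBarrier", 1)

left_attrs = output_attributes(left)
right_attrs = output_attributes(right)

# Both sides of the self join expose identical attribute IDs.
print(left_attrs, right_attrs, left_attrs == right_attrs)
```

Recreating the DataFrame from its RDD and schema, as in the workaround, builds a new plan for the DataFrame. Whether the shared attribute IDs are what breaks resolution here is an assumption.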
      

      The same happens if I use Spark SQL instead of the DataFrame API to do the self join:

      # df is a DataFrame after applying dummy_pandas_udf
      df.createOrReplaceTempView('df')
      spark.sql('''
          SELECT 
              *
          FROM df temp0
          LEFT JOIN df temp1 ON
              temp0.key == temp1.key
      ''').show()
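A minimal sketch of how the RDD-and-schema workaround could be combined with the temp view used in the SQL variant. The report only confirms the workaround for the DataFrame API, so its effect on the SQL path is an untested assumption here. The helper name register_rebuilt_view is hypothetical; createDataFrame and createOrReplaceTempView are the same calls used in the snippets above.

```python
def register_rebuilt_view(spark, df, view_name):
    """Recreate `df` from its RDD and schema, then register the copy as a
    temp view. `spark` is an existing SparkSession and `df` is the
    DataFrame returned by groupBy(...).apply(...)."""
    # Same workaround as in the reproduction script.
    rebuilt = spark.createDataFrame(df.rdd, df.schema)
    # Register the rebuilt copy, not the original, under the view name.
    rebuilt.createOrReplaceTempView(view_name)
    return rebuilt


# Usage with a live session (assumption: `spark` and `df` exist as above):
# register_rebuilt_view(spark, df, 'df')
# spark.sql('SELECT * FROM df temp0 LEFT JOIN df temp1 '
#           'ON temp0.key == temp1.key').show()
```

Returning the rebuilt DataFrame lets the caller reuse it for DataFrame API joins as well, so the original is not rebuilt twice.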
      

            People

            • Assignee:
              Unassigned
              Reporter:
              rgan Rafal Ganczarek
