Status: Resolved
Resolution: Incomplete
AWS EMR 5.13.0
Amazon Hadoop distribution 2.8.3
Spark 2.3.0
Pandas 0.22.0
I noticed that after applying a Pandas UDF, a self join of the resulting DataFrame fails to resolve columns. The workaround I found is to recreate the DataFrame from its RDD and schema.
Below is Python code that reproduces the issue.
from pyspark import Row
import pyspark.sql.functions as F

@F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
def dummy_pandas_udf(df):
    return df[['key','col']]

df = spark.createDataFrame([Row(key=1,col='A'), Row(key=1,col='B'), Row(key=2,col='C')])
# transformation that causes the issue
df = df.groupBy('key').apply(dummy_pandas_udf)
# WORKAROUND that fixes the issue
# df = spark.createDataFrame(df.rdd, df.schema)

df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
If the workaround line is left commented out, the code above fails with the following error:
AnalysisException Traceback (most recent call last)
<ipython-input-88-8de763656d6d> in <module>()
     12 # df = spark.createDataFrame(df.rdd, df.schema)
     13
---> 14 df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()

/usr/lib/spark/python/pyspark/sql/ in join(self, other, on, how)
    929             on = self._jseq([])
    930         assert isinstance(how, basestring), "how should be basestring"
--> 931         jdf = self._jdf.join(other._jdf, on, how)
    932         return DataFrame(jdf, self.sql_ctx)
    933

/usr/lib/spark/python/lib/ in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id,
   1161
   1162         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/ in deco(*a, **kw)
     67                 e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u"cannot resolve '`temp0.key`' given input columns: [temp0.key, temp0.col];;\n'Join Inner, ('temp0.key = 'temp1.key)\n:- AnalysisBarrier\n: +- SubqueryAlias temp0\n: +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n: +- Project [key#4099L, col#4098, key#4099L]\n: +- LogicalRDD [col#4098, key#4099L], false\n+- AnalysisBarrier\n +- SubqueryAlias temp1\n +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n +- Project [key#4099L, col#4098, key#4099L]\n +- LogicalRDD [col#4098, key#4099L], false\n"
The same happens if I use Spark SQL instead of the DataFrame API to do the self join:
# df is a DataFrame after applying dummy_pandas_udf
FROM df temp0
LEFT JOIN df temp1 ON
temp0.key == temp1.key
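For anyone who wants to try the workaround on the Spark SQL path, here is a minimal sketch. The helper names `rebuild_dataframe` and `self_join_via_sql`, the view name, and the SELECT list are illustrative assumptions, not part of the original report. The report only confirms the workaround for the DataFrame API join, so whether it also clears the error for the SQL query is untested here.

```python
def rebuild_dataframe(spark, df):
    """Recreate df from its RDD and schema (the workaround described above)."""
    return spark.createDataFrame(df.rdd, df.schema)


def self_join_via_sql(spark, df, view_name="df"):
    """Hypothetical helper: apply the workaround, then self join through SQL."""
    rebuilt = rebuild_dataframe(spark, df)
    # Register the rebuilt DataFrame so the SQL query can refer to it by name.
    rebuilt.createOrReplaceTempView(view_name)
    # The SELECT list is an assumption; the original query does not show one.
    query = (
        "SELECT temp0.key, temp0.col, temp1.col AS other_col "
        "FROM {v} temp0 LEFT JOIN {v} temp1 ON temp0.key = temp1.key"
    ).format(v=view_name)
    return spark.sql(query)
```

Usage would be `self_join_via_sql(spark, df).show()`, where `spark` is the active SparkSession and `df` is the DataFrame returned by `groupBy('key').apply(dummy_pandas_udf)`.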