Description
Issue
I came across something that seems to be a bug in PySpark (with adaptive query execution disabled). It occurs when joining the same DataFrame twice (please see the reproduction steps below).
Reproduction steps
```
pyspark --conf spark.sql.adaptive.enabled=false
Python 3.8.10 (default, Nov 14 2022, 12:59:47)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
23/08/09 10:18:54 WARN Utils: Your hostname, kondziolka-dd-laptop resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp0s20f3)
23/08/09 10:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/09 10:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.8.10 (default, Nov 14 2022 12:59:47)
Spark context Web UI available at http://192.168.0.18:4040
Spark context available as 'sc' (master = local[*], app id = local-1691569137130).
SparkSession available as 'spark'.
>>> sc.setCheckpointDir("file:///tmp")
>>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]

>>> j1=df1.join(df2, ["id"]).select("fval", "aux").checkpoint()
>>> j1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#6]

>>> # we see that both j1 and df2 refer to the same attribute aux#6
>>> # let's join df2 to j1; both of them have an aux column
>>> j1.join(df2, "aux")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 1539, in join
    jdf = self._jdf.join(other._jdf, on, how)
  File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failure when resolving conflicting references in Join:
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false

Conflicting attributes: aux#6
;
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false
```
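For convenience, here is the same reproduction as a standalone script (a minimal sketch, assuming a local Spark 3.3.x installation; it builds its own SparkSession instead of relying on the shell's):

```python
from pyspark.sql import SparkSession

# Build a session with adaptive query execution disabled,
# mirroring the shell invocation above.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)
spark.sparkContext.setCheckpointDir("file:///tmp")

df1 = spark.createDataFrame([(1, 42)], ["id", "fval"])
df2 = spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])

# The checkpointed plan keeps the original attribute id of `aux`,
# so both j1 and df2 reference the same attribute.
j1 = df1.join(df2, ["id"]).select("fval", "aux").checkpoint()

# Raises AnalysisException: "Conflicting attributes: aux#..."
j1.join(df2, "aux")
```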
Workaround
The workaround is to rename the column twice, i.e. an identity rename `X -> X' -> X`. This appears to force a rewrite of the attribute metadata (the attribute id changes), which avoids the conflict.
```
>>> sc.setCheckpointDir("file:///tmp")
>>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]

>>> j1=df1.join(df2, ["id"]).select("fval", "aux").withColumnRenamed("aux", "_aux").withColumnRenamed("_aux", "aux").checkpoint()
>>> j1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#19]

>>> j1.join(df2, "aux")
>>>
```
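The double rename can be wrapped in a small helper (a sketch; `force_fresh_attributes` is a hypothetical name, not part of the PySpark API):

```python
from pyspark.sql import DataFrame

def force_fresh_attributes(df: DataFrame, *cols: str) -> DataFrame:
    # Hypothetical helper: identity-rename each column (X -> X' -> X)
    # so that the analyzer assigns it a fresh attribute id.
    for c in cols:
        tmp = f"__{c}_tmp"
        df = df.withColumnRenamed(c, tmp).withColumnRenamed(tmp, c)
    return df

j1 = force_fresh_attributes(
    df1.join(df2, ["id"]).select("fval", "aux"), "aux"
).checkpoint()
j1.join(df2, "aux")  # no longer raises
```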
Other observations
- Repartitioning before the checkpoint works around the issue as well (note that it does not change the attribute id):
```
>>> j1=df1.join(df2, ["id"]).select("fval", "aux").repartition(100).checkpoint()
>>> j1.join(df2, "aux")
```
- Without `checkpoint` the issue does not occur (even though the attribute id is the same):
```
>>> j1=df1.join(df2, ["id"]).select("fval", "aux")
>>> j1.join(df2, "aux")
```
- With `AQE` left enabled the issue does not occur (see the snippet below).
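For reference, a minimal sketch of the configuration under which the issue disappears (AQE is enabled by default in recent Spark releases, so no flag is needed unless it was turned off elsewhere):

```python
from pyspark.sql import SparkSession

# With AQE at its default (enabled), the same join sequence succeeds.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
```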
- I was not able to reproduce it in the Scala `spark-shell`; I have only reproduced it in `pyspark`.