Spark / SPARK-44739

Conflicting attributes when joining the same table twice (AQE disabled)


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      Issue

      I came across something that seems to be a bug in PySpark (when I disable adaptive query execution). It occurs when joining the same DataFrame twice (please see the reproduction steps below).


      Reproduction steps

      pyspark --conf spark.sql.adaptive.enabled=false
      Python 3.8.10 (default, Nov 14 2022, 12:59:47) 
      [GCC 9.4.0] on linux
      Type "help", "copyright", "credits" or "license" for more information.
      23/08/09 10:18:54 WARN Utils: Your hostname, kondziolka-dd-laptop resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp0s20f3)
      23/08/09 10:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      23/08/09 10:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
      
      Using Python version 3.8.10 (default, Nov 14 2022 12:59:47)
      Spark context Web UI available at http://192.168.0.18:4040
      Spark context available as 'sc' (master = local[*], app id = local-1691569137130).
      SparkSession available as 'spark'.
      
      >>> sc.setCheckpointDir("file:///tmp")
      >>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
      >>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"]) 
      >>> df2.explain()
      == Physical Plan ==
      *(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
      >>> j1=df1.join(df2, ["id"]).select("fval", "aux").checkpoint()
      >>> j1.explain()
      == Physical Plan ==
      *(1) Scan ExistingRDD[fval#1L,aux#6]
      >>> # we see that both j1 and df2 refer to the same attribute aux#6
      >>> # let's join df2 to j1; both of them have an aux column
      >>> j1.join(df2, "aux")                                                      
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 1539, in join
          jdf = self._jdf.join(other._jdf, on, how)
        File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
          raise converted from None
      pyspark.sql.utils.AnalysisException: 
      Failure when resolving conflicting references in Join:
      'Join Inner
      :- LogicalRDD [fval#1L, aux#6], false
      +- LogicalRDD [id#4L, target#5L, aux#6], false
      
      Conflicting attributes: aux#6
      ;
      'Join Inner
      :- LogicalRDD [fval#1L, aux#6], false
      +- LogicalRDD [id#4L, target#5L, aux#6], false
      

       


      Workaround

      The workaround is to rename the column twice, i.e. an identity rename `X -> X' -> X`. This appears to force a rewrite of the metadata (the attribute gets a new id), which avoids the conflict.

      >>> sc.setCheckpointDir("file:///tmp")
      >>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
      >>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
      >>> df2.explain()
      == Physical Plan ==
      *(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
      >>> j1=df1.join(df2, ["id"]).select("fval", "aux").withColumnRenamed("aux", "_aux").withColumnRenamed("_aux", "aux").checkpoint()
      >>> j1.explain()                                                                
      == Physical Plan ==
      *(1) Scan ExistingRDD[fval#1L,aux#19]
      >>> j1.join(df2, "aux")
      >>>
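      The identity-rename trick above can be factored into a small helper. This is only a sketch: `refresh_attr_ids` is a hypothetical name, not part of the PySpark API; it just chains the same `withColumnRenamed` calls shown in the transcript.

      ```python
      def refresh_attr_ids(df, columns):
          # For each listed column, do an identity rename (X -> X__tmp -> X).
          # The extra Project nodes this inserts appear to make the analyzer
          # mint fresh attribute ids, which avoids the "Conflicting
          # attributes" error when re-joining a checkpointed DataFrame.
          for c in columns:
              tmp = c + "__tmp"
              df = df.withColumnRenamed(c, tmp).withColumnRenamed(tmp, c)
          return df
      ```

      With this helper, the workaround from the transcript reads `j1 = refresh_attr_ids(df1.join(df2, ["id"]).select("fval", "aux"), ["aux"]).checkpoint()`.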
      

      Others

      • Repartitioning before the checkpoint is a workaround as well (even though it does not change the attribute id):
      >>> j1=df1.join(df2, ["id"]).select("fval", "aux").repartition(100).checkpoint() 
      >>> j1.join(df2, "aux") 
      • Without `checkpoint`, the issue does not occur (although the attribute id is the same):
      >>> j1=df1.join(df2, ["id"]).select("fval", "aux")
      >>> j1.join(df2, "aux") 
      • With `AQE` enabled, the issue does not occur.
      • I was not able to reproduce it in Scala Spark - by that I mean I reproduced it only in `pyspark`.

       


      People

        Assignee: Unassigned
        Reporter: kondziolka9ld
        Votes: 0
        Watchers: 1
