  1. Spark
  2. SPARK-32635

pyspark.sql.functions.lit() returns a wrong result when used with a cached DataFrame


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.1.3, 2.2.3, 2.3.4, 2.4.7, 3.0.0
    • Fix Version/s: 2.4.8, 3.0.2, 3.1.0
    • Component/s: SQL

    Description

      When a column created with pyspark.sql.functions.lit() is part of a cached DataFrame, selecting that column from a later join returns a wrong value.

      Example: lit() with cache()
      -----------------------------------

      from pyspark.sql import Row
      from pyspark.sql import functions as F

      # `spark` is the SparkSession provided by the pyspark shell.
      # Every DataFrame gets a string literal column col2 via lit().
      df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2)))
      df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1)))
      df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2)))
      df_23 = df_2.union(df_3)
      df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2)))

      sel_col3 = df_23.select('col3', 'col2')
      df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how="inner")
      # The only difference from the second example: this join result is cached.
      df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner").cache()
      finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
      finaldf.show()
      finaldf.select('col2').show()  # Wrong result
      

       

      Output
      -----------

      >>> finaldf.show()
      +----+----+----+
      |col2|col3|col1|
      +----+----+----+
      |   2|   9|   b|
      +----+----+----+
      >>> finaldf.select('col2').show() #Wrong result, instead of 2, got 1
      +----+
      |col2|
      +----+
      |   1|
      +----+

      Example: lit() without cache()
      -----------------------------------

      from pyspark.sql import Row
      from pyspark.sql import functions as F

      # `spark` is the SparkSession provided by the pyspark shell.
      # Same DataFrames as in the first example.
      df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2)))
      df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1)))
      df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2)))
      df_23 = df_2.union(df_3)
      df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2)))

      sel_col3 = df_23.select('col3', 'col2')
      df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how="inner")
      # No cache() on this join result.
      df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner")
      finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
      finaldf.show()
      finaldf.select('col2').show()  # Correct result
      

       

      Output
      ----------

      >>> finaldf.show()
      +----+----+----+
      |col2|col3|col1|
      +----+----+----+
      |   2|   9|   b|
      +----+----+----+
      >>> finaldf.select('col2').show() #Correct result, when df_23_a is not cached
      +----+
      |col2|
      +----+
      |   2|
      +----+
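
The expected value of col2 can be checked without Spark. The following is a minimal sketch that traces the same joins over plain Python dictionaries, using the rows from the reproduction above. The helpers `inner_join` and `left_join` are hypothetical names introduced here for illustration; they are not Spark APIs and only approximate join semantics well enough for this data.

```python
# Plain-Python trace of the reproduction; no Spark involved.
# inner_join / left_join are illustrative helpers, not part of any library.

def inner_join(left, right, keys):
    """Merge every left/right pair of rows that agrees on all key columns."""
    return [{**l, **r} for l in left for r in right
            if all(l[k] == r[k] for k in keys)]

def left_join(left, right, keys):
    """Like inner_join, but keep left rows that have no match on the right."""
    out = []
    for l in left:
        matches = [{**l, **r} for r in right
                   if all(l[k] == r[k] for k in keys)]
        out.extend(matches or [l])
    return out

# Rows as built in the report, with the lit() column already filled in.
df_1 = [{"col1": "b", "col2": "2"}]
df_2 = [{"col1": "a", "col3": 8, "col2": "1"}]
df_3 = [{"col1": "b", "col3": 9, "col2": "2"}]
df_23 = df_2 + df_3  # union
df_4 = [{"col3": 9, "col2": "2"}]

sel_col3 = [{"col3": r["col3"], "col2": r["col2"]} for r in df_23]
df_4 = inner_join(df_4, sel_col3, ["col3", "col2"])
df_23_a = inner_join(df_23, df_1, ["col1", "col2"])
finaldf = [r for r in left_join(df_23_a, df_4, ["col2", "col3"])
           if r["col3"] == 9]

expected_col2 = [r["col2"] for r in finaldf]
print(expected_col2)  # ['2']
```

The only row that survives both joins and the filter carries col2 = '2', which matches the uncached output and confirms that the '1' returned in the cached case is wrong.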
      

       


          People

            Assignee: Peter Toth (petertoth)
            Reporter: Vinod KC (vinodkc)
            Votes: 0
            Watchers: 7
