Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Version/s: 2.1.3, 2.2.3, 2.3.4, 2.4.7, 3.0.0
Description
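When pyspark.sql.functions.lit() is used together with DataFrame caching (cache()), a query can return a wrong result. In the example below, finaldf.show() prints col2 = 2, but finaldf.select('col2').show() prints col2 = 1 when the intermediate join df_23_a is cached.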
Example: lit() with cache()
-----------------------------------
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

# In the pyspark shell `spark` already exists; getOrCreate() returns it.
spark = SparkSession.builder.getOrCreate()

df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn('col2', F.lit(str(2)))
df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn('col2', F.lit(str(1)))
df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn('col2', F.lit(str(2)))
df_23 = df_2.union(df_3)
df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn('col2', F.lit(str(2)))
sel_col3 = df_23.select('col3', 'col2')
df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how='inner')
df_23_a = df_23.join(df_1, on=['col1', 'col2'], how='inner').cache()  # caching this join triggers the bug
finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
finaldf.show()
finaldf.select('col2').show()  # Wrong result
Output
-----------
>>> finaldf.show()
+----+----+----+
|col2|col3|col1|
+----+----+----+
|   2|   9|   b|
+----+----+----+
>>> finaldf.select('col2').show()  # Wrong result: expected 2, got 1
+----+
|col2|
+----+
|   1|
+----+
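To see why 2 is the expected value, the pipeline can be traced without Spark. The sketch below is a hypothetical pure-Python model: each DataFrame is a list of dicts, and the helpers inner_join, left_join and select are illustrative only, not PySpark APIs. It assumes union() matches columns by position (safe here, since df_2 and df_3 have the same column order) and it ignores Spark's NULL join semantics, which do not arise in this data.

```python
# Hypothetical pure-Python model of the repro: each DataFrame is a list of
# dicts, and the helpers below are illustrative, not PySpark APIs.


def inner_join(left, right, on):
    """Inner equi-join on the columns in `on`; matching key columns are merged.

    Unlike Spark, None keys would match each other here; no NULLs occur in this data.
    """
    return [
        {**lrow, **rrow}
        for lrow in left
        for rrow in right
        if all(lrow[c] == rrow[c] for c in on)
    ]


def left_join(left, right, on):
    """Left outer equi-join: unmatched left rows get None for right-only columns."""
    right_only = {c for rrow in right for c in rrow if c not in on}
    out = []
    for lrow in left:
        matches = [rrow for rrow in right if all(lrow[c] == rrow[c] for c in on)]
        if matches:
            out.extend({**lrow, **rrow} for rrow in matches)
        else:
            out.append({**lrow, **dict.fromkeys(right_only)})
    return out


def select(rows, *cols):
    """Keep only the named columns of each row."""
    return [{c: row[c] for c in cols} for row in rows]


df_1 = [{"col1": "b", "col2": "2"}]
df_2 = [{"col1": "a", "col3": 8, "col2": "1"}]
df_3 = [{"col1": "b", "col3": 9, "col2": "2"}]
df_23 = df_2 + df_3  # union(): positional, same column order in both inputs
df_4 = [{"col3": 9, "col2": "2"}]

sel_col3 = select(df_23, "col3", "col2")
df_4 = inner_join(df_4, sel_col3, on=["col3", "col2"])
df_23_a = inner_join(df_23, df_1, on=["col1", "col2"])
finaldf = [r for r in left_join(df_23_a, df_4, on=["col2", "col3"]) if r["col3"] == 9]

# Key order follows the dicts here; Spark lists the join keys first instead.
print(finaldf)                  # [{'col1': 'b', 'col3': 9, 'col2': '2'}]
print(select(finaldf, "col2"))  # [{'col2': '2'}]
```

The only row that survives filter(F.col('col3') == 9) comes from df_3, whose col2 literal is '2', and it joins with df_1 and df_4 rows that also carry '2'. The value 1 exists only on the df_2 row, which the inner join with df_1 already drops, so a result of 1 cannot come from the query's logic.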
Example: lit() without cache()
-----------------------------------
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

# In the pyspark shell `spark` already exists; getOrCreate() returns it.
spark = SparkSession.builder.getOrCreate()

df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn('col2', F.lit(str(2)))
df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn('col2', F.lit(str(1)))
df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn('col2', F.lit(str(2)))
df_23 = df_2.union(df_3)
df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn('col2', F.lit(str(2)))
sel_col3 = df_23.select('col3', 'col2')
df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how='inner')
df_23_a = df_23.join(df_1, on=['col1', 'col2'], how='inner')  # same join, not cached
finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
finaldf.show()
finaldf.select('col2').show()  # Correct result
Output
----------
>>> finaldf.show()
+----+----+----+
|col2|col3|col1|
+----+----+----+
|   2|   9|   b|
+----+----+----+
>>> finaldf.select('col2').show()  # Correct result when df_23_a is not cached
+----+
|col2|
+----+
|   2|
+----+
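The two runs above differ only in the .cache() call on df_23_a, so the regression can be phrased as a check that caching does not change a query's result. The following is a minimal sketch under stated assumptions: cache_changes_result is a hypothetical helper, and build is a hypothetical caller-supplied function that rebuilds the repro pipeline and applies .cache() to df_23_a only when its argument is True. The helper relies only on DataFrame.select(), DataFrame.collect() and name-based Row indexing (row['col2']), which PySpark provides.

```python
from collections import Counter
from typing import Any, Callable


def cache_changes_result(build: Callable[[bool], Any], column: str) -> bool:
    """Return True if caching changes the values returned for `column`.

    `build` is a hypothetical, caller-supplied function: build(True) should
    return the final DataFrame with the intermediate join cached, and
    build(False) the same DataFrame without caching.
    """

    def values(df: Any) -> Counter:
        # Count each value so the comparison ignores row order; this assumes
        # the column holds hashable values (strings in this report).
        return Counter(row[column] for row in df.select(column).collect())

    # Run the uncached variant first. Spark reuses cached data for any later
    # query whose plan matches a cached one, so running the cached variant
    # first could make the "uncached" run read from the cache as well.
    uncached = values(build(False))
    cached = values(build(True))
    return cached != uncached
```

With the pipeline from this report, cache_changes_result(build, 'col2') would be expected to return True on affected versions, since the cached run yields 1 and the uncached run yields 2. Comparing value counts rather than ordered lists avoids depending on row order, which Spark does not guarantee without orderBy(). Calling spark.catalog.clearCache() afterwards keeps the cached plan from leaking into later queries in the same session.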