Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.2.0
Environment: Linux Mint 18, Python 3.5
Description
I'm experiencing a bug with the head version of Spark as of 4/17/2017. After joining two dataframes, renaming a column, and invoking distinct, the result of the aggregation is incorrect after caching the dataframe. The following code snippet consistently reproduces the error.
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import pandas as pd
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
mapping_sdf = spark.createDataFrame(pd.DataFrame([
    {"ITEM": "a", "GROUP": 1},
    {"ITEM": "b", "GROUP": 1},
    {"ITEM": "c", "GROUP": 2}]))  # three items mapped to two distinct groups
items_sdf = spark.createDataFrame(pd.DataFrame([
    {"ITEM": "a", "ID": 1},
    {"ITEM": "b", "ID": 2},
    {"ITEM": "c", "ID": 3}]))  # one row per item
mapped_sdf = \
items_sdf.join(mapping_sdf, on='ITEM').select("ID", sf.col("GROUP").alias('ITEM')).distinct()
print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct
mapped_sdf.cache()
print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 3, incorrect
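To narrow down whether the duplicates are already present in the cached distinct output or only appear during the aggregation on top of it, the cached rows can be collected directly. This is just an extra check I would run (a sketch, not verified output); "ITEM" here is the aliased GROUP column:
# Extra check (sketch): inspect the rows the cached distinct plan returns,
# to tell whether the duplicate groups sit in the cached data itself or are
# introduced by the groupBy that follows.
print(mapped_sdf.collect())
print(mapped_sdf.groupBy("ITEM").count().collect())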
The second code snippet below is almost the same as the first, except I don't call distinct on the dataframe. This snippet performs as expected:
mapped_sdf = \
items_sdf.join(mapping_sdf, on='ITEM').select("ID", sf.col("GROUP").alias('ITEM'))
print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct
mapped_sdf.cache()
print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct
I don't experience this bug with Spark 2.1, or even with earlier builds of 2.2.
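For what it's worth, going back to the first snippet (the one with distinct), comparing the physical plans of the aggregation before and after caching might help show where the cached relation gets substituted. A rough sketch of what I mean, reusing the same dataframes:
# Rough debugging sketch: print the physical plan of the aggregation while
# mapped_sdf is uncached, then again after cache(), to see whether an
# InMemoryTableScan replaces the original join + distinct.
spark.catalog.clearCache()  # drop any previously registered caches first
mapped_sdf = \
    items_sdf.join(mapping_sdf, on='ITEM').select("ID", sf.col("GROUP").alias('ITEM')).distinct()
mapped_sdf.groupBy("ITEM").count().explain()  # plan with no cache involved
mapped_sdf.cache()
mapped_sdf.groupBy("ITEM").count().explain()  # plan once mapped_sdf is cached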