Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version: 2.2.0
- Fix Version: None
Description
The initial datasets are derived from Hive tables using the spark.table() function.
Dataset descriptions:
Sales dataset (close to 10 billion rows), with the following columns and sample rows:

| ItemId (bigint) | CustomerId (bigint) | qty_sold (bigint) |
|---|---|---|
| 1 | 1 | 20 |
| 1 | 2 | 30 |
| 2 | 1 | 40 |
Customer dataset (close to 50,000 rows), with the following columns and sample rows:

| CustomerId (bigint) | CustomerGrpNbr (smallint) |
|---|---|
| 1 | 1 |
| 2 | 2 |
| 3 | 1 |
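To make the expected join-and-aggregate semantics concrete, here is a plain-Scala sketch (no Spark, just collections) of the inner join and group-by applied to the sample rows above. The variable names are mine, for illustration only:

```scala
// Sample sales rows from the table above: (ItemId, CustomerId, qty_sold)
val sales = Seq((1L, 1L, 20L), (1L, 2L, 30L), (2L, 1L, 40L))

// Sample customer rows: CustomerId -> CustomerGrpNbr
val customer = Map(1L -> 1, 2L -> 2, 3L -> 1)

// Inner join on CustomerId: keep only sales rows whose customer exists,
// carrying along the group number and quantity
val joined = sales.flatMap { case (_, custId, qty) =>
  customer.get(custId).map(grp => (grp, qty))
}

// Group by CustomerGrpNbr and sum qty_sold per group
val grouped = joined.groupBy(_._1).map { case (grp, rows) =>
  grp -> rows.map(_._2).sum
}

// The number of groups can never exceed the number of distinct
// CustomerGrpNbr values present in the customer table.
println(grouped)      // Map(1 -> 60, 2 -> 30)
println(grouped.size) // 2
```

This is why a result of ~65,000 groups is impossible when the customer table only contains 20 distinct group numbers.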
I am doing the following steps:

1. Caching the sales dataset (close to 10 billion rows).
2. Doing an inner join of the sales dataset with the customer dataset.
3. Doing a group-by on the resultant dataset, based on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values within the customer groups.
4. Caching the resultant grouped dataset.
5. Doing a .count() on the grouped dataset.
The count in step 5 is supposed to return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20 values. However, step 5 actually returns around 65,000.
Following are the commands I am running in spark-shell:
```scala
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65k rows, and the count keeps varying on each run
customer.select("CustomerGrpNbr").distinct().count() // returns 20
```
I have been able to replicate the same behavior using the Java API as well. This anomalous behavior disappears, however, when I remove the caching statements; i.e., if I run the following in spark-shell, it works as expected:
```scala
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.count() // returns 20
customer.select("CustomerGrpNbr").distinct().count() // returns 20
```
The Hive tables from which the datasets are built do not change during this entire process, so why does the caching cause this problem?
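A possible workaround sketch, under the assumption (not confirmed) that the problem stems from calling cache() on sales after finalDf's plan has already been constructed: cache and fully materialize sales first, then build the dependent plan. This is speculative and would need to be verified against the actual tables:

```scala
// Hypothetical reordering: materialize the cache before any dependent
// plan is constructed, so the join reads the cached data consistently
var sales = spark.table("sales_table")
sales.cache()
sales.count() // forces materialization of the cache

var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.cache()
finalDf.count()
```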