The initial datasets are derived from Hive tables using the spark.table() function.
'sales' dataset (close to 10 billion rows) with the following columns:

|ItemId (bigint)|CustomerId (bigint)|qty_sold (bigint)|
'customer' dataset (close to 50,000 rows) with the following columns:

|CustomerId (bigint)|CustomerGrpNbr (smallint)|
I am doing the following steps:

1. Caching the 'sales' dataset (close to 10 billion rows).
2. Doing an inner join of 'sales' with the 'customer' dataset.
3. Grouping the resulting dataset by the CustomerGrpNbr column to get sum(qty_sold) and stddev(qty_sold) values for the customer groups.
4. Caching the resulting grouped dataset.
5. Calling .count() on the grouped dataset.
The count in step 5 is supposed to return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20 values. However, step 5 returns a value of around 65,000.
Following are the commands I am running in spark-shell:
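The original commands were not preserved here; below is a sketch of the pipeline described in the steps above, assuming hypothetical table names db.sales_table and db.customer_table:

```scala
// Sketch of the spark-shell session; table names are assumptions.
import org.apache.spark.sql.functions.{sum, stddev}

val sales = spark.table("db.sales_table")        // ItemId, CustomerId, qty_sold
val customer = spark.table("db.customer_table")  // CustomerId, CustomerGrpNbr

sales.cache()                                    // step 1: cache ~10 billion rows

val joined = sales.join(customer, "CustomerId")  // step 2: inner join

val grouped = joined
  .groupBy("CustomerGrpNbr")                     // step 3: aggregate per group
  .agg(sum("qty_sold"), stddev("qty_sold"))

grouped.cache()                                  // step 4: cache the grouped dataset
grouped.count()                                  // step 5: expected 20, observed ~65,000
```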
I have been able to reproduce the same behavior using the Java API as well. This anomalous behavior disappears, however, when I remove the caching statements. That is, if I run the following in spark-shell, it works as expected:
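A sketch of the uncached variant, under the same assumed table names, which returns the expected count of 20:

```scala
// Same pipeline with the two cache() calls removed.
import org.apache.spark.sql.functions.{sum, stddev}

val grouped = spark.table("db.sales_table")
  .join(spark.table("db.customer_table"), "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))

grouped.count()  // returns 20 as expected
```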
The Hive tables from which the datasets are built do not change during this entire process. So why does caching cause this problem?