Spark / SPARK-26974

Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset



    Description

      The initial datasets are derived from Hive tables using the spark.table() function.

      Dataset descriptions:

      Sales dataset (close to 10 billion rows) with the following columns (and sample rows):

      ItemId (bigint) | CustomerId (bigint) | qty_sold (bigint)
      1               | 1                   | 20
      1               | 2                   | 30
      2               | 1                   | 40

      Customer dataset (close to 50,000 rows) with the following columns (and sample rows):

      CustomerId (bigint) | CustomerGrpNbr (smallint)
      1                   | 1
      2                   | 2
      3                   | 1
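
      For anyone trying to reproduce this without access to the original Hive tables, the sample rows above can be registered as temporary views (a hypothetical stand-in; the names and values are illustrative only, and the real issue may only surface at the original data volume):

      import spark.implicits._

      // Hypothetical stand-ins for the Hive tables, built from the sample rows above.
      val salesSample = Seq(
        (1L, 1L, 20L),
        (1L, 2L, 30L),
        (2L, 1L, 40L)
      ).toDF("ItemId", "CustomerId", "qty_sold")

      val customerSample = Seq(
        (1L, 1.toShort),
        (2L, 2.toShort),
        (3L, 1.toShort)
      ).toDF("CustomerId", "CustomerGrpNbr")

      // Register as temp views so spark.table("sales_table") and spark.table("customer_table") resolve.
      salesSample.createOrReplaceTempView("sales_table")
      customerSample.createOrReplaceTempView("customer_table")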

      I am doing the following steps:

      1. Caching the sales dataset with close to 10 billion rows.
      2. Doing an inner join of the 'sales' dataset with the 'customer' dataset.
      3. Doing a group by on the resultant dataset, based on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values within the customer groups.
      4. Caching the resultant grouped dataset.
      5. Doing a .count() on the grouped dataset.

      The count in step 5 is supposed to return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20 values. However, step 5 returns a value of around 65,000.

      Following are the commands I am running in spark-shell:

      var sales = spark.table("sales_table")
      var customer = spark.table("customer_table")
      // sum() and stddev() are from org.apache.spark.sql.functions (imported by default in spark-shell)
      var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
      sales.cache()
      finalDf.cache()
      finalDf.count() // returns around 65,000 and the count keeps varying on each run
      customer.select("CustomerGrpNbr").distinct().count() // returns 20
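
      As a quick way to surface the instability (a diagnostic sketch added here, not part of the original run), the cached count can be taken a few times in a row; the expected value every time is 20, matching the number of distinct customer groups:

      // Diagnostic sketch: with finalDf cached, these counts reportedly differ between runs.
      (1 to 3).foreach { i =>
        println(s"run $i: finalDf.count() = ${finalDf.count()}")
      }
      val distinctGroups = customer.select("CustomerGrpNbr").distinct().count()
      println(s"distinct customer groups = $distinctGroups")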

      I have been able to replicate the same behavior using the Java API as well. This anomalous behavior disappears, however, when I remove the caching statements. That is, if I run the following in spark-shell, it works as expected:

      var sales = spark.table("sales_table")
      var customer = spark.table("customer_table")
      var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold")) 
      finalDf.count() // returns 20
      customer.select("CustomerGrpNbr").distinct().count() // returns 20
      

      The Hive tables from which the datasets are built do not change during this entire process, so why does the caching cause this problem?
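
      One way to narrow this down (a suggestion, not something verified as part of this report) is to look at the physical plan that reads from the cache and then drop the cached data before re-counting, so the in-memory blocks can be ruled out:

      // Diagnostic sketch, assuming the same spark-shell session and names as above.
      finalDf.explain(true)             // inspect the plan that reads from the cached data
      sales.unpersist(blocking = true)  // drop the cached sales blocks
      finalDf.unpersist(blocking = true)
      finalDf.count()                   // re-evaluates without the cache; should return 20, matching the uncached run above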

          People

            Assignee: Unassigned
            Reporter: Utkarsh Sharma (litecoder)
