Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28067

Incorrect results in decimal aggregation with whole-stage code gen enabled

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.4, 3.0.0
    • 3.1.0
    • SQL

    Description

      The following test case involving a join followed by a sum aggregation returns the wrong answer for the sum:

       

      val df = Seq(
       (BigDecimal("10000000000000000000"), 1),
       (BigDecimal("10000000000000000000"), 1),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2),
       (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
      val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
      scala> df2.show(40,false)
       ---------------------------------------
      
      sum(decNum)
      ---------------------------------------
      
      40000000000000000000.000000000000000000
      ---------------------------------------
       
      

       

      The result should be 1040000000000000000000.0000000000000000.

      It appears a partial sum is computed for each join key, as the result returned would be the answer for all rows matching intNum === 1.

      If only the rows with intNum === 2 are included, the answer given is null:

       

      scala> val df3 = df.filter($"intNum" === lit(2))
       df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: decimal(38,18), intNum: int]
      scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, "intNum").agg(sum("decNum"))
       df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
      scala> df4.show(40,false)
       -----------
      
      sum(decNum)
      -----------
      
      null
      -----------
       
      

       

      The correct answer, 1000000000000000000000.0000000000000000, doesn't fit in the DataType picked for the result, decimal(38,18), so an overflow occurs, which Spark then converts to null.

      The first example, which doesn't filter out the intNum === 1 values should also return null, indicating overflow, but it doesn't.  This may mislead the user to think a valid sum was computed.

      If whole-stage code gen is turned off:

      spark.conf.set("spark.sql.codegen.wholeStage", false)

      ... incorrect results are not returned because the overflow is caught as an exception:

      java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 exceeds max precision 38

       

       

       

       

       

       

       

      Attachments

        Activity

          People

            ksunitha Sunitha Kambhampati
            msirek Mark Sirek
            Votes:
            0 Vote for this issue
            Watchers:
            9 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: