Spark / SPARK-28067

Incorrect results in decimal aggregation with whole-stage code gen enabled

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0, 2.4.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:
    • Environment:

      Ubuntu LTS 16.04

      Oracle Java 1.8.0_201

      spark-2.4.3-bin-without-hadoop

      spark-shell

      Description

      The following test case involving a join followed by a sum aggregation returns the wrong answer for the sum:

       

      val df = Seq(
        (BigDecimal("10000000000000000000"), 1),
        (BigDecimal("10000000000000000000"), 1),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2),
        (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
      val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))

      scala> df2.show(40,false)
      +---------------------------------------+
      |sum(decNum)                            |
      +---------------------------------------+
      |40000000000000000000.000000000000000000|
      +---------------------------------------+

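      The mathematically correct sum is 1040000000000000000000.0000000000000000: the join produces 104 rows (4 for intNum 1 and 100 for intNum 2), each contributing 10000000000000000000.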

      It appears that a partial sum is computed for each join key: the value returned is exactly the sum over the joined rows matching intNum === 1.
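
      The arithmetic behind these figures can be checked without Spark. The following is a minimal sketch in plain Scala, assuming that the self-join on intNum pairs every row with every other row sharing the same key. The names JoinSumArithmetic, rowsPerKey, unit, joinedRows, sumForKey and total are illustrative and do not come from the report or from Spark.

```scala
// Plain Scala, no Spark required: reproduces the expected arithmetic only.
object JoinSumArithmetic {
  // Number of input rows per intNum value in the test DataFrame.
  val rowsPerKey: Map[Int, Int] = Map(1 -> 2, 2 -> 10)

  // Every input row carries the same decimal value, 10^19.
  val unit: BigDecimal = BigDecimal("10000000000000000000")

  // A self-join on intNum yields n * n rows for a key that has n input rows.
  def joinedRows(key: Int): Int = rowsPerKey(key) * rowsPerKey(key)

  // Sum of decNum over the joined rows of a single key.
  def sumForKey(key: Int): BigDecimal = unit * BigDecimal(joinedRows(key))

  // Sum of decNum over all joined rows.
  val total: BigDecimal = rowsPerKey.keys.toSeq.map(sumForKey).sum

  def main(args: Array[String]): Unit = {
    println(s"intNum 1: ${joinedRows(1)} rows, sum = ${sumForKey(1).toBigInt}")
    println(s"intNum 2: ${joinedRows(2)} rows, sum = ${sumForKey(2).toBigInt}")
    println(s"all rows: sum = ${total.toBigInt}")
  }
}
```

      Under that assumption, the sum for intNum 1 alone is 40000000000000000000, which is the value Spark returned, while the sum over all 104 joined rows is 1040000000000000000000.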

      If only the rows with intNum === 2 are included, the answer given is null:

       

      scala> val df3 = df.filter($"intNum" === lit(2))
      df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: decimal(38,18), intNum: int]
      scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, "intNum").agg(sum("decNum"))
      df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
      scala> df4.show(40,false)
      +-----------+
      |sum(decNum)|
      +-----------+
      |null       |
      +-----------+

      The correct answer, 1000000000000000000000.0000000000000000, doesn't fit in the DataType picked for the result, decimal(38,18), so an overflow occurs, which Spark then converts to null.
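
      Why this value overflows can be sketched in plain Scala. A decimal(38,18) reserves 18 of its 38 digits for the fraction, leaving 20 for the integer part. The helpers digitsNeeded and fits below are hypothetical, not Spark APIs; they only mimic a precision check using java.math.BigDecimal.

```scala
import java.math.{BigDecimal => JBigDecimal}
import java.math.RoundingMode

object DecimalFit {
  // Hypothetical helper (not a Spark API): total digits needed to hold `value`
  // once it is rescaled to `scale` fractional digits.
  def digitsNeeded(value: JBigDecimal, scale: Int): Int =
    value.setScale(scale, RoundingMode.HALF_UP).precision()

  // True if `value` can be represented as decimal(precision, scale).
  def fits(value: JBigDecimal, precision: Int, scale: Int): Boolean =
    digitsNeeded(value, scale) <= precision

  def main(args: Array[String]): Unit = {
    val key1Sum = new JBigDecimal("40000000000000000000")    // sum for intNum === 1
    val tenRows = new JBigDecimal("100000000000000000000")   // running total after ten rows of 10^19
    val key2Sum = new JBigDecimal("1000000000000000000000")  // sum for intNum === 2

    for ((label, v) <- Seq("key 1 sum" -> key1Sum, "ten rows" -> tenRows, "key 2 sum" -> key2Sum))
      println(s"$label: ${digitsNeeded(v, 18)} digits, fits decimal(38,18) = ${fits(v, 38, 18)}")
  }
}
```

      In this sketch the sum for intNum 1 needs exactly 38 digits and fits, while the sum for intNum 2 needs 40 and does not. A running total of ten rows needs 39 digits, which is consistent with the precision named in the exception reported with whole-stage code gen turned off, although the sketch does not model Spark's internals.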

      The first example, which doesn't filter out the intNum === 1 values, should also return null, indicating overflow, but it doesn't. This may mislead the user into thinking a valid sum was computed.

      If whole-stage code gen is turned off:

      spark.conf.set("spark.sql.codegen.wholeStage", false)

      ... incorrect results are not returned because the overflow is caught as an exception:

      java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 exceeds max precision 38

    People

    • Assignee: Unassigned
    • Reporter: Mark Sirek (msirek)