Spark / SPARK-24495

SortMergeJoin with duplicate keys wrong results

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.2, 2.4.0
    • Component/s: SQL
    • Labels:

      Description

      To reproduce:

      // The bug is in SortMergeJoin; the shuffles are correct. With the default of 200 shuffle partitions, the data may be split into partitions so small that SortMergeJoin can no longer return wrong results.
      spark.conf.set("spark.sql.shuffle.partitions", "1")
      // Disable constraint propagation; otherwise rows would be filtered before the join, hiding the bug.
      spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
      sql("select id as a1 from range(1000)").createOrReplaceTempView("t1")
      sql("select id * 2 as b1, -id as b2 from range(1000)").createOrReplaceTempView("t2")
      
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      sql("""select b1, a1, b2 FROM t1 INNER JOIN t2 ON b1 = a1 AND b2 = a1""").show
      

      Given the join condition, all three columns are expected to be equal in every result row.

      But the result is:

      +---+---+---+
      | b1| a1| b2|
      +---+---+---+
      |  0|  0|  0|
      |  2|  2| -1|
      |  4|  4| -2|
      |  6|  6| -3|
      |  8|  8| -4|
      ....
      
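      As a cross-check, the expected result can be computed without Spark by a brute-force nested loop over the two generated tables in plain Scala. This is a sketch for illustration only (the object and value names are made up). It shows that the full condition matches a single row, because b1 = a1 and b2 = a1 together require id * 2 = -id, i.e. id = 0, while checking only b1 = a1 yields 500 rows of the shape shown above.

```scala
object ExpectedJoin {
  // t1: a1 = id for id in [0, 1000), as in "select id as a1 from range(1000)"
  val t1: Seq[Long] = 0L until 1000L
  // t2: (b1, b2) = (id * 2, -id) for id in [0, 1000)
  val t2: Seq[(Long, Long)] = (0L until 1000L).map(id => (id * 2, -id))

  // Full join condition b1 = a1 AND b2 = a1, output columns (b1, a1, b2).
  val correct: Seq[(Long, Long, Long)] =
    for { a1 <- t1; (b1, b2) <- t2 if b1 == a1 && b2 == a1 } yield (b1, a1, b2)

  // Same join with the b2 predicate dropped: only b1 = a1 is checked.
  val onlyB1: Seq[(Long, Long, Long)] =
    for { a1 <- t1; (b1, b2) <- t2 if b1 == a1 } yield (b1, a1, b2)

  def main(args: Array[String]): Unit = {
    // Only (0, 0, 0) satisfies both predicates.
    println(s"correct: ${correct.size} row(s), first rows ${correct.take(5)}")
    // Every even a1 from 0 to 998 matches some b1, giving 500 rows.
    println(s"onlyB1:  ${onlyB1.size} row(s), first rows ${onlyB1.take(5)}")
  }
}
```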

      I traced it to EnsureRequirements.reorder, which was introduced by https://github.com/apache/spark/pull/16985 and https://github.com/apache/spark/pull/20041.

      It leads to an incorrect plan:

      == Physical Plan ==
      *(5) Project [b1#735672L, a1#735669L, b2#735673L]
      +- *(5) SortMergeJoin [a1#735669L, a1#735669L], [b1#735672L, b1#735672L], Inner
         :- *(2) Sort [a1#735669L ASC NULLS FIRST, a1#735669L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(a1#735669L, a1#735669L, 1)
         :     +- *(1) Project [id#735670L AS a1#735669L]
         :        +- *(1) Range (0, 1000, step=1, splits=8)
         +- *(4) Sort [b1#735672L ASC NULLS FIRST, b2#735673L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(b1#735672L, b2#735673L, 1)
               +- *(3) Project [(id#735674L * 2) AS b1#735672L, -id#735674L AS b2#735673L]
                  +- *(3) Range (0, 1000, step=1, splits=8)
      

      The SortMergeJoin keys are wrong: the right-side keys should be [b1, b2], but the plan shows [b1, b1], so key b2 is missing completely.
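      To make the failure mode concrete, here is a simplified model of the key-reordering step in plain Scala. It is an illustration under stated assumptions, not Spark's EnsureRequirements code: join keys are plain strings instead of Catalyst expressions, string equality stands in for semantic equality, and the object and function names are hypothetical. It assumes, consistently with the plan above, that the left child's partitioning (a1, a1) supplies the expected key order and that each expected key is resolved to its first matching position in the left key list. With the duplicate a1, both lookups hit position 0, so the right keys come out as [b1, b1]; a variant that never reuses a position keeps [b1, b2]. That second variant is one way to handle duplicates and is not necessarily how the actual fix was written.

```scala
import scala.collection.mutable

object ReorderSketch {
  // Hypothetical "first match" variant: each expected key is resolved to its FIRST
  // position in leftKeys, so duplicate keys all resolve to the same index.
  def reorderFirstMatch(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      expectedOrder: Seq[String]): (Seq[String], Seq[String]) = {
    val idx = expectedOrder.map(e => leftKeys.indexOf(e))
    (idx.map(i => leftKeys(i)), idx.map(i => rightKeys(i)))
  }

  // Hypothetical duplicate-aware variant: a position that was already picked is
  // skipped, so the second "a1" resolves to the second position.
  def reorderDistinct(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      expectedOrder: Seq[String]): (Seq[String], Seq[String]) = {
    val used = mutable.Set.empty[Int]
    val idx = mutable.ArrayBuffer.empty[Int]
    for (e <- expectedOrder) {
      // Assumes every expected key occurs in leftKeys (it comes from the child's partitioning).
      val i = leftKeys.indices.find(j => !used.contains(j) && leftKeys(j) == e).get
      used += i
      idx += i
    }
    (idx.map(i => leftKeys(i)).toSeq, idx.map(i => rightKeys(i)).toSeq)
  }

  def main(args: Array[String]): Unit = {
    val left = Seq("a1", "a1")         // left join keys from "b1 = a1 AND b2 = a1"
    val right = Seq("b1", "b2")        // matching right join keys
    val partitioning = Seq("a1", "a1") // hashpartitioning(a1, a1) on the left child
    println(reorderFirstMatch(left, right, partitioning)) // right keys become b1, b1
    println(reorderDistinct(left, right, partitioning))   // right keys stay b1, b2
  }
}
```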


            People

            • Assignee: Marco Gaido (mgaido)
            • Reporter: Bogdan Raducanu (bograd)
