
Exchange reuse incorrectly reuses scans over different sets of partitions


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: SQL

    Description

      The incorrect reuse happens because the file scan operator does not take partition pruning into account in its implementation of `sameResult()`. Two scans of the same base file relation that read different sets of partitions are therefore treated as equivalent, so self-joins over that relation can return incorrect results. Here's a minimal test case to reproduce:

          spark.conf.set("spark.sql.exchange.reuse", true)  // defaults to true in 2.0
          withTempPath { path =>
            val tempDir = path.getCanonicalPath
            spark.range(10)
              .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
              .write
              .partitionBy("a")
              .parquet(tempDir)
            val df = spark.read.parquet(tempDir)
            // df1 and df2 read different partitions (a = 0 vs. a = 1) of the same relation.
            val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
            val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
            checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
          }

      When exchange reuse is on, the result is wrong; the second sum(c) column duplicates the first:

      +---+------+------+
      |  b|sum(c)|sum(c)|
      +---+------+------+
      |  0|     6|     6|
      |  1|     4|     4|
      |  2|    10|    10|
      +---+------+------+
      

      The correct result is:

      +---+------+------+
      |  b|sum(c)|sum(c)|
      +---+------+------+
      |  0|     6|    12|
      |  1|     4|     8|
      |  2|    10|     5|
      +---+------+------+
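
To illustrate the mechanism described above, here is a minimal toy sketch in plain Scala. It is not Spark's actual code: `ToyFileScan`, `ToySameResult`, and the path `/tmp/t` are hypothetical names introduced only for this example. It shows how an equivalence check that ignores partition filters would treat the `a = 0` and `a = 1` scans as interchangeable, while a check that also compares the filters would not.

```scala
// Toy model only: these types are hypothetical and are not Spark's classes.
final case class ToyFileScan(
    relationPath: String,          // base file relation being read
    outputColumns: Seq[String],    // columns produced by the scan
    partitionFilters: Set[String]  // predicates used to prune partitions
)

object ToySameResult {
  // Comparison that ignores partition pruning: scans over different
  // partitions of the same relation look interchangeable.
  def ignoringPruning(x: ToyFileScan, y: ToyFileScan): Boolean =
    x.relationPath == y.relationPath && x.outputColumns == y.outputColumns

  // Stricter comparison: the pruning predicates must match as well.
  def withPruning(x: ToyFileScan, y: ToyFileScan): Boolean =
    ignoringPruning(x, y) && x.partitionFilters == y.partitionFilters
}

object ToySameResultDemo extends App {
  val scanA0 = ToyFileScan("/tmp/t", Seq("b", "c"), Set("a = 0"))
  val scanA1 = ToyFileScan("/tmp/t", Seq("b", "c"), Set("a = 1"))

  println(ToySameResult.ignoringPruning(scanA0, scanA1)) // true
  println(ToySameResult.withPruning(scanA0, scanA1))     // false
}
```

Under the first comparison, a planner that deduplicates "equal" subplans would reuse the `a = 0` exchange for the `a = 1` side of the join, which matches the duplicated sum(c) column in the incorrect output.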
      

      People

        Assignee: Eric Liang (ekhliang)
        Reporter: Eric Liang (ekhliang)