Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
2.0.0
Description
This happens because the file scan operator does not take into account partition pruning in its implementation of `sameResult()`. As a result, executions may be incorrect on self-joins over the same base file relation. Here's a minimal test case to reproduce:
spark.conf.set("spark.sql.exchange.reuse", true) // defaults to true in 2.0 withTempPath { path => val tempDir = path.getCanonicalPath spark.range(10) .selectExpr("id % 2 as a", "id % 3 as b", "id as c") .write .partitionBy("a") .parquet(tempDir) val df = spark.read.parquet(tempDir) val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum") val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum") checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
When exchange reuse is on, the result is
+---+------+------+ | b|sum(c)|sum(c)| +---+------+------+ | 0| 6| 6| | 1| 4| 4| | 2| 10| 10| +---+------+------+
The correct result is
+---+------+------+ | b|sum(c)|sum(c)| +---+------+------+ | 0| 6| 12| | 1| 4| 8| | 2| 10| 5| +---+------+------+