Description
Currently, shuffle repartition uses RoundRobinPartitioning. The generated result is nondeterministic because the order of the input rows is not deterministic.
The bug can be triggered when a repartition call follows a shuffle (which leads to nondeterministic row ordering), as in the pattern below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks of the repartition stage are retried and produce a different row ordering, and some tasks of the result stage are retried and generate different data.
The following code returns 931532, instead of 1000000:
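import scala.sys.process._

import org.apache.spark.TaskContext

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  // On the first attempt of partitions 0 and 1, kill the Java processes
  // (including the executors) so that upstream tasks have to be retried.
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
// Expected 1000000 distinct values.
res.distinct().count()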
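The mechanism can be illustrated without Spark. The following is a minimal sketch, not Spark's actual implementation: `roundRobin` is a hypothetical helper that sends the i-th row it sees to partition `i % numPartitions`, and the two input sequences are made-up stand-ins for the same rows arriving in a different order on a retried task.

```scala
// Spark-free sketch: round-robin assignment depends only on arrival order.
object RoundRobinSketch {
  // Hypothetical helper: the i-th row seen goes to partition i % numPartitions.
  def roundRobin[T](rows: Seq[T], numPartitions: Int): Vector[Vector[T]] = {
    val buckets = Vector.fill(numPartitions)(Vector.newBuilder[T])
    rows.zipWithIndex.foreach { case (row, i) =>
      buckets(i % numPartitions) += row
    }
    buckets.map(_.result())
  }

  def main(args: Array[String]): Unit = {
    // Same six rows, but the retried task receives them in a different order.
    val firstAttempt = Seq(1, 2, 3, 4, 5, 6)
    val retryAttempt = Seq(3, 1, 2, 6, 4, 5)

    val a = roundRobin(firstAttempt, 2)
    val b = roundRobin(retryAttempt, 2)

    // Downstream partition 0 was already consumed from the first attempt;
    // downstream partition 1 is recomputed from the retry's output.
    val combined = a(0) ++ b(1)

    println(s"first attempt: $a")
    println(s"retry:         $b")
    println(s"combined:      $combined, distinct = ${combined.distinct.size}")
  }
}
```

In this sketch the combined output holds only 4 distinct values out of 6: rows 2 and 4 are lost while rows 1 and 5 appear twice, which mirrors the undercount reported above.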
Issue Links
- is duplicated by
  - SPARK-25156 Same query returns different result (Closed)
- is related to
  - SPARK-23243 Shuffle+Repartition on an RDD could lead to incorrect answers (Resolved)
  - SPARK-28699 Cache an indeterminate RDD could lead to incorrect result while stage rerun (Resolved)
- relates to
  - SPARK-25114 RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE (Resolved)