Details
- Type: Improvement
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
Description
The ordering of elements in shuffled partitions is not deterministic across runs. For instance, consider the following example:

    val largeFiles = sc.textFile(...)
    val airlines = largeFiles.repartition(2000).cache()
    println(airlines.first)
If this code is run twice, each run will output a different result. Non-determinism in the shuffle read code accounts for this.

Spark's shuffle read path processes blocks as soon as they are fetched. Spark uses ShuffleBlockFetcherIterator to fetch shuffle data from mappers. In that code, requests for multiple blocks from the same host are batched together, so non-determinism in where tasks are run means that the set of requests can vary across runs. In addition, there is an explicit call to randomize the order of the batched fetch requests. As a result, shuffle operations cannot be guaranteed to produce the same ordering of the elements in their partitions.
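To illustrate the effect described above, here is a minimal, self-contained sketch of the batching-plus-randomization pattern. The names FetchRequest and FetchOrderSketch are hypothetical simplifications, not Spark's actual internals; the point is only that grouping block requests by host and then shuffling the request order makes the arrival order of blocks vary from run to run:

```scala
import scala.util.Random

// Hypothetical simplification of the batching logic: each request
// bundles all block IDs destined for one host.
case class FetchRequest(host: String, blockIds: Seq[String])

object FetchOrderSketch {
  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      ("host-a", "shuffle_0_0_0"), ("host-a", "shuffle_0_1_0"),
      ("host-b", "shuffle_0_2_0"), ("host-c", "shuffle_0_3_0"))

    // Batch blocks by host, as the real fetcher does.
    val requests = blocks.groupBy(_._1).map { case (host, bs) =>
      FetchRequest(host, bs.map(_._2))
    }.toSeq

    // The explicit randomization: the order in which requests are
    // issued (and thus in which blocks arrive) differs across runs.
    val randomized = Random.shuffle(requests)
    randomized.foreach(println)
  }
}
```

Because downstream processing consumes blocks in arrival order, any variation here surfaces as a different element ordering within the resulting shuffle partition.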
Therefore, Spark should update its docs to clarify that the ordering of elements in shuffled RDDs' partitions is non-deterministic. Note, however, that the set of elements in each partition is deterministic: if we used mapPartitions to sort each partition, then the first() call above would produce a deterministic result.
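The workaround mentioned above can be sketched as follows, assuming `airlines` is the cached, repartitioned RDD from the earlier example. Since each partition contains a deterministic set of lines, sorting within the partition restores a deterministic ordering:

```scala
// Sort each partition's elements in place; preservesPartitioning
// keeps the partitioner metadata since keys are not changed.
val sortedWithinPartitions = airlines.mapPartitions(
  iter => iter.toArray.sorted.iterator,
  preservesPartitioning = true)

// first() now returns the same line on every run.
println(sortedWithinPartitions.first)
```

This trades extra per-partition work (materializing and sorting the partition) for determinism, so it is only worthwhile when a stable ordering actually matters.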