Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- 1.0.0
Description
According to its docs, Partitioner.defaultPartitioner will use the maximum number of RDD partitions as its partition count when spark.default.parallelism is not set. If that number of upstream partitions is very large then this can result in shuffles where numMappers * numReducers = numMappers^2, which can cause various problems that prevent the job from successfully running.
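The selection rule described above can be sketched in plain Scala. This is a simplified model of the documented behavior, not Spark's actual implementation: the object name `DefaultPartitionCount`, the method `choose`, and the use of an `Option[Int]` to stand in for `spark.default.parallelism` are all assumptions made for illustration.

```scala
// Simplified model of the documented rule; names here are hypothetical.
object DefaultPartitionCount {
  /**
   * @param upstreamPartitionCounts partition counts of the parent RDDs
   * @param defaultParallelism      stands in for spark.default.parallelism
   *                                (None when the setting is absent)
   */
  def choose(upstreamPartitionCounts: Seq[Int], defaultParallelism: Option[Int]): Int = {
    require(upstreamPartitionCounts.nonEmpty, "need at least one upstream RDD")
    // Fall back to the largest upstream partition count when no default is set.
    defaultParallelism.getOrElse(upstreamPartitionCounts.max)
  }
}
```

With no default configured, a parent with 100,000 partitions yields 100,000 reducers, so the shuffle has numMappers * numReducers = numMappers^2 blocks.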
To help users identify when they have run into this problem, I think we should add warning logs to Spark.
As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a reduceByKey on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions:
scala> sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + _).toDebugString
res7: String =
(100000) ShuffledRDD[21] at reduceByKey at <console>:25 []
 +-(100000) MapPartitionsRDD[20] at map at <console>:25 []
    |       ParallelCollectionRDD[19] at parallelize at <console>:25 []
This results in the creation of 10 billion shuffle blocks (100,000 mappers * 100,000 reducers), so if this job does run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: even if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility), the map statuses would be ~1.25 gigabytes (and the actual size is probably much larger)!
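The arithmetic behind those figures can be checked with a short calculation. The object and method names below are hypothetical, and the one-bit-per-pair figure is the optimistic lower bound from the text rather than a measurement of Spark's real map status encoding.

```scala
// Back-of-the-envelope estimate; names are hypothetical.
object ShuffleBlockEstimate {
  /** Number of shuffle blocks: one per mapper / reducer pair. */
  def shuffleBlocks(numMappers: Long, numReducers: Long): Long =
    numMappers * numReducers

  /** Lower bound on map status size, assuming one bit per pair, rounded up to whole bytes. */
  def minStatusBytes(numMappers: Long, numReducers: Long): Long =
    (shuffleBlocks(numMappers, numReducers) + 7) / 8
}
```

For 100,000 mappers and 100,000 reducers this gives 10,000,000,000 blocks and a lower bound of 1,250,000,000 bytes, which matches the ~1.25 gigabytes quoted above.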
I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on spark.default.parallelism being set but it was run in an environment lacking a value for that config, or (b) their input data grew significantly in size. These scenarios may be rare, but they can be frustrating to debug (especially if a failure occurs midway through a long-running job).
I think we should do something to handle this scenario.
A good starting point might be for Partitioner.defaultPartitioner to log a warning when the default partition count exceeds some threshold.
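As a rough illustration of that check, here is a minimal sketch in plain Scala. The threshold of 10,000, the object name `PartitionCountWarning`, and the message wording are all assumptions for illustration; the issue does not propose specific values, and a real change would go through Spark's own logging rather than returning a string.

```scala
// Hypothetical sketch of a threshold check; not taken from Spark's code base.
object PartitionCountWarning {
  // Assumed threshold; the issue leaves the actual value open.
  val WarnThreshold: Int = 10000

  /** Returns a warning message when numPartitions exceeds the threshold, else None. */
  def warningFor(numPartitions: Int, threshold: Int = WarnThreshold): Option[String] =
    if (numPartitions > threshold) {
      Some(
        s"Default partitioner will use $numPartitions partitions (threshold: $threshold). " +
          "Consider setting spark.default.parallelism or passing an explicit partition count."
      )
    } else {
      None
    }
}
```

A caller could then emit the message at warning level, for example `PartitionCountWarning.warningFor(100000).foreach(msg => System.err.println(msg))`.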
In addition, I think it might be a good idea to log a similar warning in MapOutputTrackerMaster right before we start trying to serialize map statuses: in a real-world situation where this problem cropped up, the map stage ran successfully but the driver crashed when serializing map statuses. Putting a warning about partition counts here makes it more likely that users will spot that error in the logs and be able to identify the source of the problem (compared to a warning that appears much earlier in the job and therefore much farther from the likely site of a crash).