In MAPREDUCE-2765, which provided the design spec for DistCpV2, the author describes the implementation of DynamicInputFormat. One of the main motivations cited is to reduce the chance of long tails, where a few leftover mappers run much longer than the rest.
However, today I ran into exactly such a long tail while using DistCpV2 and DynamicInputFormat. When I tried to alleviate the problem by overriding the number of mappers and the split ratio used by DynamicInputFormat, I was prevented from doing so by the hard-coded limit set by the MAX_CHUNKS_TOLERABLE constant (currently 400).
This constant is set quite low for production use (see the description of my use case below). And although MAPREDUCE-2765 states that this is an "overridable maximum", the code does not appear to provide any mechanism to override it.
This should be changed. It should be possible to raise the maximum number of chunks beyond this arbitrary limit.
For example, here is the situation I ran into today:
I ran a DistCpV2 job on a cluster of 8 machines with 128 map slots in total. The job consisted of copying ~2800 files from HDFS to Amazon S3. I overrode the number of mappers for the job from the default of 20 to 128, so as to better parallelize the copy across the cluster. The number of chunk files created was calculated as 241, and mapred.num.entries.per.chunk was calculated as 12.
As the job ran on, it reached a point where only 4 map tasks remained, each of which had been running for over 2 hours. The reason was that each of the 12 files those mappers were copying was quite large (several hundred megabytes in size) and took ~20 minutes to copy. During this time, all the other 124 mappers sat idle.
In theory I should be able to alleviate this problem with DynamicInputFormat. If I were able to, say, quadruple the number of chunk files created, that would have made each chunk contain only 3 files, and these large files would have gotten distributed better around the cluster and copied in parallel.
However, when I tried to do that - by overriding mapred.listing.split.ratio to, say, 10 - DynamicInputFormat responded with an exception ("Too many chunks created with splitRatio:10, numMaps:128. Reduce numMaps or decrease split-ratio to proceed.") - presumably because I exceeded the MAX_CHUNKS_TOLERABLE value of 400.
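To make the arithmetic concrete, here is a minimal Java sketch of the kind of check that would produce this exception. It is an assumption inferred from the error message and the value of the constant, not a copy of the DistCp source. The class name ChunkLimitSketch and its methods are hypothetical.

```java
// Hypothetical sketch; not the actual DynamicInputFormat code.
final class ChunkLimitSketch {
    // Value of the hard-coded limit as reported in this issue.
    static final int MAX_CHUNKS_TOLERABLE = 400;

    // Assumed rule: the job is rejected when numMaps * splitRatio exceeds the limit.
    static boolean exceedsLimit(int numMaps, int splitRatio) {
        // Widen to long so the product cannot overflow int.
        return (long) numMaps * splitRatio > MAX_CHUNKS_TOLERABLE;
    }

    // Largest split ratio that would pass the assumed check for a given number of maps.
    static int maxSplitRatio(int numMaps) {
        return MAX_CHUNKS_TOLERABLE / numMaps; // integer division
    }

    public static void main(String[] args) {
        System.out.println(exceedsLimit(128, 10)); // 128 * 10 = 1280 > 400, prints true
        System.out.println(maxSplitRatio(128));    // 400 / 128, prints 3
    }
}
```

If the check does work this way, a job with 128 mappers cannot use a split ratio above 3, which allows at most 384 chunks.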
Is there any particular logic behind this MAX_CHUNKS_TOLERABLE limit? I can't personally see any.
If this limit has no particular logic behind it, then it should be overridable - or even better: removed altogether. After all, I'm not sure I see any need for it. Even if numMaps * splitRatio resulted in an extraordinarily large number, if the code were modified so that the number of chunks got calculated as Math.min( numMaps * splitRatio, numFiles), then there would be no need for MAX_CHUNKS_TOLERABLE. In this worst-case scenario where the product of numMaps and splitRatio is large, capping the number of chunks at the number of files (numberOfChunks = numberOfFiles) would result in 1 file per chunk - the maximum parallelization possible. That may not be the best-tuned solution for some users, but I would think that it should be left up to the user to deal with the potential consequence of not having tuned their job properly. Certainly that would be better than having an arbitrary hard-coded limit that prevents proper parallelization when dealing with large files and/or large numbers of mappers.
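The Math.min proposal above could look roughly like the following sketch. The class name ChunkCapSketch, the method names, and the rounded-up files-per-chunk calculation are my own illustration, not existing DistCp code, and 2800 stands in for the approximate file count from my job.

```java
// Hypothetical sketch of the proposed cap; not existing DistCp code.
final class ChunkCapSketch {
    // Proposed rule: never create more chunks than there are files to copy.
    static long numberOfChunks(int numMaps, int splitRatio, long numFiles) {
        // Widen to long so a very large numMaps * splitRatio cannot overflow int.
        return Math.min((long) numMaps * splitRatio, numFiles);
    }

    // Files per chunk, rounded up. Assumes numChunks > 0 (i.e. at least one file).
    static long filesPerChunk(long numFiles, long numChunks) {
        return (numFiles + numChunks - 1) / numChunks;
    }

    public static void main(String[] args) {
        long numFiles = 2800; // illustrative stand-in for "~2800 files"

        // The configuration that was rejected: 128 maps, split ratio 10.
        long chunks = numberOfChunks(128, 10, numFiles);
        System.out.println(chunks + " chunks, " + filesPerChunk(numFiles, chunks) + " files per chunk");

        // An extreme configuration: the cap kicks in at one file per chunk.
        long capped = numberOfChunks(1000, 100, numFiles);
        System.out.println(capped + " chunks, " + filesPerChunk(numFiles, capped) + " files per chunk");
    }
}
```

With these inputs, the first case yields 1280 chunks of at most 3 files each, and the second is capped at 2800 chunks of 1 file each, so no separate hard-coded maximum is needed to bound the chunk count.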