Going through SequenceFile.Sorter.merge with a fine-toothed comb and running the profiler on it yielded interesting results.
Tellingly, a reasonably large job we profiled showed this characteristic for a reduce that started after all the maps had completed:
Note: the merge was also active while the shuffle was still happening...
So folks get the picture...
1. HADOOP-3365, HADOOP-2095 etc.
2. We really need to tighten the merge code, eliminate copies, etc. HADOOP-2919 did it for the sort; we need something similar for the merge.
1. Eliminate the usage of SequenceFiles completely for the intermediate sort/merge. We just need to write (key-length, key, value-length, value)* to a compressed stream; we do not need any of the features provided by SequenceFile (header, sync markers, etc.). A sketch of such a writer appears after this list.
2. Currently the map-side sort writes out index, index.crc, data and data.crc files. That costs 4 seeks per map-reduce pair, i.e. 4*300,000*10,000 seeks for a (slightly futuristic) large job with 300k maps and 10k reduces. We could do much better by putting the crc at the end of the data file and the crc for each record in the index, which cuts the seeks by 50%; a sketch of such an index entry also follows the list. Potentially we could keep the index in-memory at the TaskTracker for currently running jobs, as a future optimization.
3. At the reducer, decompress the (key-length, key, value-length, value)* stream, check the crc (flagging an error if necessary), and keep the data.
4. Throw away RamFS and implement a simple manager that returns byte-arrays of a given size (i.e. the size of a decompressed shuffle segment) until it runs out of the memory available to it; a sketch follows the list.
5. Copy the shuffled data into the byte-array, merge it with other byte-arrays, and write the merged data to disk after compressing it.
6. Use raw comparators on the data in the byte-arrays for optimized compares (see the compare sketch after this list).
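To make item 1 concrete, here is a minimal sketch of the kind of writer we'd need; the class name, the fixed 4-byte length encoding and the use of java.util.zip's Deflater are just illustrative assumptions (a real patch would presumably use vints and the job's configured CompressionCodec):

    // Sketch only: writes raw (key-length, key, value-length, value)* records
    // to a compressed stream, with none of SequenceFile's header/sync overhead.
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.DeflaterOutputStream;

    class RawKVWriter {
      private final DataOutputStream out;

      RawKVWriter(OutputStream rawOut) {
        // Deflater keeps the sketch self-contained; any stream codec would do.
        this.out = new DataOutputStream(new DeflaterOutputStream(rawOut));
      }

      // Append one record: key-length, key bytes, value-length, value bytes.
      void append(byte[] key, byte[] value) throws IOException {
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
      }

      void close() throws IOException {
        out.flush();
        out.close();
      }
    }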
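For item 2, a per-partition index entry could look roughly like the sketch below; the field names are assumptions, the point is simply that the checksum moves into the index/data files themselves so the separate .crc files (and their seeks) go away:

    // Sketch only: one index entry per reduce partition of a map's output.
    class IndexRecord {
      long startOffset;   // where this reduce's partition begins in the data file
      long rawLength;     // uncompressed length of the partition
      long partLength;    // on-disk (compressed) length of the partition
      long crc;           // checksum of the partition bytes, formerly in data.crc
    }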
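For item 4, the "simple manager" could be as small as the following sketch; the name ShuffleRamManager and the return-null-on-overflow behaviour (the caller then falls back to disk) are assumptions for illustration:

    // Sketch only: hands out byte-arrays for decompressed shuffle segments
    // until its memory budget is spent; no filesystem abstraction needed.
    class ShuffleRamManager {
      private final long maxMemory;   // total bytes this manager may hand out
      private long used = 0;          // bytes currently handed out

      ShuffleRamManager(long maxMemory) {
        this.maxMemory = maxMemory;
      }

      // Return a buffer of the requested size, or null if it would blow the
      // budget; the caller then shuffles that segment straight to disk.
      synchronized byte[] reserve(int size) {
        if (used + size > maxMemory) {
          return null;
        }
        used += size;
        return new byte[size];
      }

      // Give the memory back once the segment has been merged and spilled.
      synchronized void release(int size) {
        used -= size;
      }
    }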
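And for item 6, the compare works directly on the serialized key bytes without deserializing them; in a real patch this would go through the job's RawComparator, the standalone lexicographic compare below just shows the idea:

    // Sketch only: compare key bytes b1[s1..s1+l1) against b2[s2..s2+l2)
    // as unsigned bytes, straight out of the in-memory byte-arrays.
    class RawKeyCompare {
      static int compareBytes(byte[] b1, int s1, int l1,
                              byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
          int a = b1[s1 + i] & 0xff;
          int b = b2[s2 + i] & 0xff;
          if (a != b) {
            return a - b;
          }
        }
        return l1 - l2;   // shorter key sorts first when one is a prefix
      }
    }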
This should be a reasonable first step; we can measure more and optimize later.