Affects Version/s: None
Fix Version/s: 0.11.0
This issue is about trying to improve the performance of the merge phase (especially on the reduces). Currently, all the map outputs are copied to disk and then the merge phase starts (just to make a note - sorting happens on the maps).
The first optimization that I plan to implement is to do in-memory merging of the map outputs. There are two buffers maintained -
1) a scratch buffer for writing map outputs (directly off the socket). This is a first-come-first-serve buffer (as opposed to strategies like best fit). The map output copier copies the map output off the socket and puts it here (assuming there is sufficient space in the buffer).
2) a merge buffer - when the scratch buffer cannot accomodate any more map output, the roles of the buffers are switched - that is, the scratch buffer becomes the merge buffer and the merge buffer becomes the scratch buffer. We avoid copying by doing this switch of roles. The copier threads can continue writing data from the socket buffer to the current scratch buffer (access to the scratch buffer is synchronized).
Both the above buffers are of equal sizes configured to have default values of 100M.
Before switching roles, a check is done to see whether the merge buffer is in use (merge algorithm is working on the data there). We wait till the merge buffer is free. The hope is that while merging we are reading key/value data from an in-memory buffer and it will be really fast and so we won't see client timeouts on the server serving the map output. However, if they really timeout, the client sees an exception, and resubmits the request to the server.
With the above we are doing copying/merging in parallel.
The merge happens and then a spill to disk happens. At the end of the in-memory merge of all the map outputs, we will end up with ~100M files on disk that we will need to merge. Also, the in-memory merge gets triggered when the in-memory scratch buffer has been idle too long (like 30 secs), or, the number of outputs copied so far is equal to the number of maps in the job, whichever is earlier. We can proceed with the regular merge for these on-disk-files and maybe we can do some optimizations there too (haven't put much thought there).
If the map output can never be copied to the buffer (because the map output is let's say 200M), then that is directly spilled to disk.
To implement the above, I am planning to extend the FileSystem class to provide an InMemoryFileSystem class that will ease the integration of the in-memory scratch/merge with the existing APIs (like SequenceFile, MapOutputCopier) since all them work with the abstractions of FileSystem and Input/Output streams.