This patch effects the following changes to improve our efficiency in this area. Instead of gradually growing our buffers, we use properties to determine the size of the K,V byte buffer and accounting data and allocate it up front. We maintain accounting information for the task as two arrays of ints (rather than separate arrays for each partition), mimicking the existing BufferSorter interface. The first stores offsets into the second, which maintains the k/v offsets and partition information for the keys. This permits us to swap offsets to effect the sort, as is presently implemented in BufferSorter, but without requiring us to wrap them in IntWritables.
 kvoffset buffer           kvindices buffer
|offset k1,v1 |           | partition k1,v1 |
|offset k2,v2 |           | k1 offset       |
     ...                  | v1 offset       |
|offset kn,vn |           | partition k2,v2 |
                          | k2 offset       |
                          | v2 offset       |
                               ...
                          | partition kn,vn |
                          | kn offset       |
                          | vn offset       |
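The two-array accounting scheme described above can be sketched as follows. This is a minimal illustration, not the patch's actual code; the names (KvAccounting, ACCTSIZE, kvoffsets, kvindices) are assumptions for the example.

```java
// Sketch of the two-int-array accounting scheme: kvoffsets holds one int per
// record (an index into kvindices); kvindices holds (partition, key offset,
// value offset) triples for each record. All names here are illustrative.
public class KvAccounting {
    static final int ACCTSIZE = 3;                        // ints per record in kvindices
    static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2;

    final int[] kvoffsets;   // per-record index into kvindices
    final int[] kvindices;   // (partition, key offset, value offset) triples
    int kvindex = 0;         // next free record slot

    KvAccounting(int maxRecords) {
        kvoffsets = new int[maxRecords];
        kvindices = new int[maxRecords * ACCTSIZE];
    }

    // Record the metadata for a serialized key/value pair.
    void add(int partition, int keyOffset, int valOffset) {
        int base = kvindex * ACCTSIZE;
        kvoffsets[kvindex] = base;
        kvindices[base + PARTITION] = partition;
        kvindices[base + KEYSTART] = keyOffset;
        kvindices[base + VALSTART] = valOffset;
        ++kvindex;
    }

    // The sort swaps only ints in kvoffsets, never the serialized records
    // themselves, and needs no IntWritable wrappers.
    void swap(int i, int j) {
        int tmp = kvoffsets[i];
        kvoffsets[i] = kvoffsets[j];
        kvoffsets[j] = tmp;
    }

    int partitionOf(int i) { return kvindices[kvoffsets[i] + PARTITION]; }
}
```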
By default, the total size of the accounting space is 5% of io.sort.mb. We build on the work done in HADOOP-1965, but rather than spilling at 50% of io.sort.mb, we set a "soft" limit that defaults to 80% of the record capacity or 80% of the k,v buffer before starting a spill thread. Note that enforcing this limit does not require querying each partition collector for its memory usage; it can be effected simply by examining our indices. Rather than permitting the spill thread to "own" references to the buffers, we maintain a set of indices into the offset and k,v byte buffers that define the area of each in which the spill thread is permitted to work. According to the Java VM spec, we can assume that reading/writing array elements does not require a lock on the array.
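The soft-limit test can be sketched as below; the method and parameter names are illustrative, not the patch's, and the 80% thresholds are the defaults described above.

```java
// Illustrative soft-limit check: trigger a spill once 80% of the record slots
// or 80% of the k,v byte buffer is occupied. Only the indices are examined;
// no per-partition memory accounting is needed.
public class SoftLimit {
    static final float SPILL_PERCENT = 0.8f;   // assumed default

    static boolean shouldSpill(int recordsUsed, int maxRecords,
                               int bytesUsed, int bufferBytes) {
        return recordsUsed >= (int) (maxRecords * SPILL_PERCENT)
            || bytesUsed >= (int) (bufferBytes * SPILL_PERCENT);
    }
}
```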
We maintain three indices for both the accounting and k,v buffers: start, end, and index. The area between start and end is available to the spill, while the area between end and index (in truth, a marker noting end of the last record written) contains "spillable" data yet to be written to disk. If the soft limit is reached- or if one attempts a write into the buffer that is too large to accommodate without a spill- then the task thread sets the end index to the last record marker and triggers a spill. While the spill is running, the area between the start and end indices is unavailable for writing from collect(K,V) and the task thread will block until the spill has completed if the index marker hits the start marker.
Buffer indices during a spill:
 ___________       ___________       ___________
|___________|     |___________|     |___________|
 ^   ^   ^  ^      ^ ^   ^   ^       ^ ^   ^  ^
 s   e   i  v      i s   e   v       e i   s  v
(s = start, e = end, i = index, v = void)
It is worth mentioning that each key must be contiguous to be used with a RawComparator, but values can wrap around the end of the buffer. This requires us to note the "voided" space in the buffer that contains no data. When the spill completes, it sets the start marker to the end marker, making that space available for writing. Note that it must also reset the void marker to the buffer size if the spill wraps around the end of the buffer (the rightmost case in the preceding figure). The "voided" marker is owned by whichever thread needs to manipulate it, so we require no special locking for it.
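The contiguous-key requirement above can be sketched as follows: if a key would wrap around the end of the circular buffer, the remaining tail is marked void and the key is restarted at offset zero. The names (KeyPlacement, bufvoid, placeKey) are illustrative, not the patch's.

```java
// Sketch of keeping keys contiguous in a circular byte buffer. Values may
// wrap, but a key that would wrap voids the tail of the buffer instead, so a
// RawComparator can always read it as one contiguous range.
public class KeyPlacement {
    final int bufSize;
    int bufvoid;            // offset beyond which the buffer holds no data

    KeyPlacement(int bufSize) {
        this.bufSize = bufSize;
        this.bufvoid = bufSize;   // no voided space initially
    }

    // Returns the offset at which a key of keyLen bytes should start, given
    // the current write position pos; sets bufvoid when the tail is skipped.
    int placeKey(int pos, int keyLen) {
        if (pos + keyLen > bufSize) {   // key would wrap: void the tail
            bufvoid = pos;
            return 0;
        }
        return pos;
    }
}
```

When a spill that wraps around the end of the buffer completes, bufvoid would be reset to the buffer size, as described above.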
When we sort, we sort all spill data by partition instead of creating a separate collector for each partition. Further, we can use appendRaw (as was suggested in HADOOP-1609) to write our serialized data directly from the k,v buffer to our spill file writer, instead of deserializing each record prior to the write. Note that for record-compressed data (when not using a combiner), this permits us to store compressed values in our k,v buffer.
The attached patch is a work in progress, and is known to suffer from the following deficiencies:
- Very large keys and values (relative to a small io.sort.mb) present a difficult problem for a statically allocated collection buffer. If a series of writes to an empty collection exceeds the space allocated to the k,v byte buffer (e.g. a 100MB k,v byte buffer and a Writable that attempts two 51MB write(byte[], int, int) calls), the current patch will loop forever; the same can occur across separate writes. The current patch spills only when the soft limit is reached.
- Handling of compression is inelegantly implemented. Again, this is a work in progress and will be cleaned up.
- The spill thread is created each time it is invoked, but it need not be.
- The code managing the contiguous key property is not as efficient as it could be.
- The implementation of QuickSort could be improved (re: Sedgewick) to handle the case where keys are equal to the pivot, probably a fairly common case.
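On the last point, the improvement referred to is Sedgewick-style three-way partitioning, sketched below: keys equal to the pivot are grouped in the middle and excluded from both recursive calls, which helps when many keys compare equal. This is a generic illustration on an int array, not the patch's sorter.

```java
// Three-way (Dutch national flag) quicksort partitioning: after the loop,
// a[lo..lt-1] < pivot, a[lt..gt] == pivot, a[gt+1..hi] > pivot, so the
// equal-to-pivot run is never revisited by the recursive calls.
public class ThreeWayQuickSort {
    static void sort(int[] a, int lo, int hi) {
        if (hi <= lo) return;
        int lt = lo, gt = hi, i = lo + 1;
        int pivot = a[lo];
        while (i <= gt) {
            if (a[i] < pivot)      swap(a, lt++, i++);
            else if (a[i] > pivot) swap(a, i, gt--);
            else                   i++;      // equal to pivot: leave in place
        }
        sort(a, lo, lt - 1);   // elements strictly less than pivot
        sort(a, gt + 1, hi);   // elements strictly greater than pivot
    }

    static void swap(int[] a, int i, int j) {
        int t = a[i]; a[i] = a[j]; a[j] = t;
    }
}
```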