one simple way might be to simply add TRACE level log messages at every collect() call with the current values of every index plus the spill number [...]
That could be an interesting visualization. I'd already made up the diagrams, but anything that helps the analysis and validation would be welcome. I'd rather not add a trace to the committed code, but data from it sounds great.
I ran a simple test where I was running a sort of 10 byte records, and it turned out that the "optimal" io.sort.record.percent caused my job to be significantly slower. It was the case then that a small number of large spills actually ran slower than a large number of small spills. Did we ever determine what that issue was? I think we should try to understand why the theory isn't agreeing with observations here.
IIRC those tests used a non-RawComparator, right? Runping reported similar results, where hits to concurrent collection were more expensive than small spills. The current theory is that keeping the map thread unblocked is usually better for performance. Based on this observation, I'm hoping that the spill.percent can also be eliminated at some point in the future, though the performance we're leaving on the table there is probably not as severe and is more difficult to generalize. Microbenchmarks may also not capture the expense of merging many small spills in a busy, shared cluster, where HDFS and other tasks are competing for disk bandwidth. I'll be very interested in metrics from MAPREDUCE-1115, as they would help to flesh out this hypothesis.
The documentation (such as it is) in HADOOP-2919 describes the existing code. The metadata are tracked using a set of indices marking the start and end of a spill (kvstart, kvend) and the current position (kvindex), while the serialization data are described by similar markers (bufstart, bufend, bufindex). There are two other indices carried over from the existing design. bufmark is the position in the serialized record data of the end of the last fully serialized record. bufvoid is necessary for the RawComparator interface, which requires contiguous ranges for key compares; if a serialized key crosses the end of the buffer, it must be copied to the front to satisfy the aforementioned API spec. All of these are retained; the role of each is largely unchanged.
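To make the bufvoid case concrete, here is a minimal sketch (not the committed code) of moving a key that wraps past the end of the buffer to the front, so a RawComparator sees one contiguous range. The class and method names are hypothetical, and the sketch assumes the caller has already verified that the front of the buffer has room for the whole key.

```java
final class ContiguousKeySketch {
    /**
     * The key occupies [keyStart, buf.length) followed by [0, tailLen).
     * After the call it occupies [0, headLen + tailLen) and the caller
     * would set bufvoid = keyStart, since the bytes past it are now dead.
     * Returns the new end of the key (the new bufindex in this sketch).
     */
    static int shiftWrappedKey(byte[] buf, int keyStart, int tailLen) {
        final int headLen = buf.length - keyStart;
        // Assumption: the shifted key must not run into its own old head.
        if (keyStart < 0 || tailLen < 0 || headLen + tailLen > keyStart) {
            throw new IllegalArgumentException("no room at front of buffer");
        }
        // Slide the tail right to make room for the head; System.arraycopy
        // handles overlapping ranges within the same array correctly.
        System.arraycopy(buf, 0, buf, headLen, tailLen);
        // Copy the head from the end of the buffer to the front.
        System.arraycopy(buf, keyStart, buf, 0, headLen);
        return headLen + tailLen;
    }
}
```

When there is not enough room at the front, some other strategy (such as a temporary copy) would be needed; that case is left out here.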
The proposed design adds another parameter, the equator (while kvstart and bufstart could be replaced with a single variable similar to equator, the effort seemed misspent). The record serialization moves "forward" in the buffer, while the metadata are allocated in 16-byte blocks in the opposite direction. This is illustrated in the following diagram:
The role played by kvoffsets and kvindices is preserved; logically, particularly in the spill, each is interpreted in roughly the same way. In the new code, the allocation is not static, but will instead expand with the serialized records. This avoids degenerate cases for combiners and multilevel merges (though not necessarily optimal performance).
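The layout can be sketched roughly as follows. This is an illustration only, assuming byte offsets into a single circular buffer whose capacity is a multiple of the 16-byte metadata block; the class and its methods are hypothetical names, not the patch itself.

```java
final class EquatorLayoutSketch {
    static final int METASIZE = 16; // bytes per metadata block, as described above

    final int capacity; // buffer size in bytes (assumed to be a multiple of METASIZE)
    int equator;        // point from which both regions grow
    int bufindex;       // next byte for serialized record data (grows forward)
    int kvindex;        // offset of the next metadata block (grows backward)

    EquatorLayoutSketch(int capacity, int equator) {
        if (capacity <= 0 || capacity % METASIZE != 0
                || equator < 0 || equator >= capacity) {
            throw new IllegalArgumentException("bad capacity or equator");
        }
        this.capacity = capacity;
        this.equator = equator;
        this.bufindex = equator;
        // First metadata block sits just behind the equator, on a block boundary.
        final int aligned = equator - (equator % METASIZE);
        this.kvindex = Math.floorMod(aligned - METASIZE, capacity);
    }

    /** Reserves len bytes of record data moving forward; returns the start offset. */
    int writeRecord(int len) {
        final int start = bufindex;
        bufindex = (bufindex + len) % capacity;
        return start;
    }

    /** Claims one metadata block moving backward; returns its offset. */
    int allocateMeta() {
        final int slot = kvindex;
        kvindex = Math.floorMod(kvindex - METASIZE, capacity);
        return slot;
    }
}
```

Because neither region has a fixed size, the split between metadata and record data follows whatever the records actually need. The sketch deliberately omits the checks for the two regions running into each other.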
Spills are triggered under two conditions: either the soft limit is reached (collection proceeds concurrently with the spill) or a record is large enough to require a spill before it can be written to the buffer (collection is blocked). The former case is illustrated here:
The equator is moved to an offset proportional to the average record size (caveats above), kvindex is moved off the equator, aligned with the end of the array (int alignment, also so no metadata block will span the end of the array). Collection proceeds again from the equator, growing toward the ends of the spill. Should either run out of space, collection will block until the spill completes. Note that there is no partially written data when the soft limit is reached; it can only be triggered in collect, not in the blocking buffer.
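One way to read "proportional to the average record size" is sketched below. This is my own simplification, not necessarily what the patch does: the free space between bufindex and kvindex is divided so that each expected record gets its 16 bytes of metadata behind the new equator and its serialized bytes in front of it. The helper names and the avgRec parameter are assumptions.

```java
final class EquatorPlacementSketch {
    static final int METASIZE = 16; // bytes per metadata block

    /** Circular distance from i forward to j. */
    static int distanceTo(int i, int j, int capacity) {
        return Math.floorMod(j - i, capacity);
    }

    /**
     * Picks a new equator inside the free region that starts at bufindex and
     * runs forward to kvindex. avgRec is the average serialized record size
     * in bytes (assumed non-negative).
     */
    static int newEquator(int bufindex, int kvindex, int avgRec, int capacity) {
        final int free = distanceTo(bufindex, kvindex, capacity);
        // Records expected to fit if each costs avgRec + METASIZE bytes.
        final int records = free / (METASIZE + avgRec);
        // Metadata grow backward from the equator toward bufindex,
        // so reserve that many bytes between bufindex and the equator.
        final long metaBytes = (long) records * METASIZE;
        return (int) ((bufindex + metaBytes) % capacity);
    }

    /** First metadata offset behind the equator, aligned to a block boundary. */
    static int alignedKvindex(int equator, int capacity) {
        final int aligned = equator - (equator % METASIZE);
        return Math.floorMod(aligned - METASIZE, capacity);
    }
}
```

For example, with 800 free bytes and an average record of 84 bytes, eight records are expected, so 128 bytes are reserved for metadata and the rest is left for serialization.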
The other case to consider is when record data are partially written into the collection buffer, but the available space is exhausted:
Here, the equator is moved to the beginning of the partial record and collection blocks. When the spill completes, the metadata are written off the equator and serialization of the record can continue.
During collection, indices are adjusted only when holding a lock. As in the current code, the lock is only obtained in collect when one of the possible conditions for spilling may be satisfied. Since collect does not block, every serialized record performs a zero-length write into the buffer once both the key and value are written. This ensures that all the boundaries are checked and that collection will block if necessary.
That's about it.