The core issue is that the data structure that stores serialized keys and values from the map can incorrectly calculate the length of a value under certain conditions. The OOM exception is from deserialized values exceeding the memory available to a merge.
The error was introduced by HADOOP-2919, which replaced the data structures responsible for collection and sort. The size of one of the buffers (assumed to be exactly 3x the size of a complementary buffer) can be larger than assumed in some cases. If map output is spilled to disk while the index buffer is wrapped, the spill will read indices from the uninitialized portion of the buffer for the last item in the spill and incorrectly conclude that it extends to the end of the buffer.
An example might be helpful. Per the description in HADOOP-2919, each key and value is serialized into a circular buffer. The partition, start, and end indices for each record are stored in an array. For each value, its end index is either the following key's start index or a marker reserved for the last record in the spill.
| P | 35 | 41 | ... | P  | 5  | 20 | P  | 25 | 30 | 0  | 0  | 0  |
  0    1    2   ...   i   i+1  i+2  i+3  i+4  i+5  i+6  i+7  i+8
A record at offset i belongs to partition P, has a key beginning at position 5 with length 15 (20 - 5), and a value beginning at position 20 with length 5 (25 - 20). Similarly, the record following it belongs to partition P, with a key beginning at position 25 with length 5 and a value beginning at position 30. What follows this record (from position i+6) is uninitialized data. The record that actually follows it has its metadata stored at the start of the index buffer, but the "next" position, computed mod the length of the buffer, lands in the uninitialized section, which suggests that the next key starts at the front of the serialization buffer. As such, the collector will determine that this value must extend all the way to the end of the buffer, which is almost certainly not true. The correct "next" offset is at the front of the index buffer (35).
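To make the failure concrete, here is a minimal, self-contained sketch of the lookup described above. This is not Hadoop's actual MapOutputBuffer code; the names (KEYSTART, NMETA, valLength) and the end-of-buffer fallback are assumptions used only to illustrate the wrap:

```java
// Hypothetical sketch of deriving a value's length from the circular index
// buffer in the example above (three ints of metadata per record: P,
// keystart, valstart). Not Hadoop's real code.
public class IndexWrapDemo {
  static final int KEYSTART = 1; // assumed offset of the key-start slot in a record
  static final int NMETA = 3;    // ints of metadata per record

  // Length of the value whose metadata begins at kvoff, taken from the
  // following record's key-start slot.
  static int valLength(int[] kvindices, int kvoff, int bufend) {
    int valstart = kvindices[kvoff + 2];
    // The bug being illustrated: the "next" slot is computed mod the index
    // buffer length, which can land in an uninitialized (zero) region.
    int nextKeystart = kvindices[(kvoff + NMETA + KEYSTART) % kvindices.length];
    // A zero slot makes the value appear to run to the end of the
    // serialization buffer.
    return nextKeystart > valstart ? nextKeystart - valstart : bufend - valstart;
  }

  public static void main(String[] args) {
    // The index buffer from the example: the wrapped record (P, 35, 41) at
    // the front, records (P, 5, 20) and (P, 25, 30) at offsets i..i+5
    // (here i = 3), and uninitialized zeros after them.
    int[] kvindices = {0, 35, 41, /* i: */ 0, 5, 20, 0, 25, 30, 0, 0, 0};
    int bufend = 100; // hypothetical end of the serialization buffer

    System.out.println(valLength(kvindices, 3, bufend)); // 5  (25 - 20, correct)
    System.out.println(valLength(kvindices, 6, bufend)); // 70 (100 - 30; should be 35 - 30 = 5)
  }
}
```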
This is the most common case. It occurs when both of the following are true:
- A map emits no fewer than io.sort.mb * io.sort.record.percent / 4 records (1310720 records with the defaults; 1677721 with io.sort.mb set to 128)
- io.sort.mb * io.sort.record.percent (mod 16) != 0, 7, 10, or 13 (with the default settings of 100MB and 0.05, respectively, the product mod 16 is 0, so this condition is not met)
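The thresholds above can be checked with a little arithmetic. This sketch assumes the accounting-byte count is simply io.sort.mb * io.sort.record.percent truncated to a whole number of bytes; the method names are mine, not Hadoop's:

```java
// Arithmetic behind the trigger conditions listed above.
public class RecordThreshold {
  // Records a map must emit before the index buffer can wrap.
  static long threshold(long ioSortMb, double recordPercent) {
    long accountingBytes = (long) (ioSortMb * 1024 * 1024 * recordPercent);
    return accountingBytes / 4;
  }

  // The (mod 16) residue that the trigger condition tests.
  static long mod16(long ioSortMb, double recordPercent) {
    return ((long) (ioSortMb * 1024 * 1024 * recordPercent)) % 16;
  }

  public static void main(String[] args) {
    System.out.println(threshold(100, 0.05)); // 1310720 (the defaults)
    System.out.println(threshold(128, 0.05)); // 1677721
    System.out.println(mod16(100, 0.05));     // 0: the defaults don't satisfy the != condition
  }
}
```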
This is corrected by the attached patch.
When the preceding is true, silent data corruption can occur when all of the following are true:
- The heap size is sufficient to prevent the merge from failing
- The Writable consumed takes its length not from the stream, but from the intermediate file format. This could be true of Writables that read an entire stream without a length field (none of the Writables packaged in Hadoop work this way, I think)
- io.sort.mb * io.sort.record.percent (mod 16) == 2, 3, 5, 6, 8, 9, 12, or 15 (other values will cause errors in the spill)
Most of the time, the spill will emit the value followed by the remaining contents of the buffer, which most if not all Writables (satisfying the DataOutput contract) will ignore. Anything dealing with the raw bytes, particularly comparators, might be confused, but deserialized objects should work. Preliminarily, few successful jobs should be suspected of corruption.
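To illustrate why deserialization usually tolerates the extra bytes, here is a sketch using a hypothetical length-prefixed record in the style of BytesWritable (not an actual Hadoop class): readFields-style code consumes exactly the bytes its own length field declares, so trailing garbage in the stream is never read.

```java
import java.io.*;

// Hypothetical length-prefixed record, mimicking how most Writables read:
// length field first, then exactly that many bytes.
public class LengthPrefixedDemo {
  static byte[] readRecord(DataInput in) throws IOException {
    int len = in.readInt();   // length comes from the record itself
    byte[] data = new byte[len];
    in.readFully(data);       // trailing bytes in the stream are ignored
    return data;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(3);
    out.write(new byte[]{1, 2, 3});     // the real value
    out.write(new byte[]{9, 9, 9, 9});  // garbage: the spilled buffer tail
    DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(readRecord(in).length); // 3: the garbage is never consumed
  }
}
```

A raw-bytes comparator, by contrast, would see all seven bytes of the "value" and compare the garbage too.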
The second case addressed in this patch is zero-length values (NullWritable is a good, and to my mind the only, example). Note that this doesn't include empty Text, BytesWritable, or other data whose serialization includes a length field requiring at least one byte. The size for these is almost never correct, but this case should fail noisily.
The last case is when a serialized key fills the last bytes of the buffer. In this case, an ArrayIndexOutOfBoundsException should be thrown during the sort, as the comparator assumes the value starts right after the key. An equivalent check for valstart == 0 would work and arguably be more efficient, but this is easier to understand.
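The arithmetic behind that failure can be sketched with hypothetical numbers (bufvoid standing in for the serialization buffer length; the names are assumptions, not Hadoop's internals):

```java
// Sketch: a raw comparator typically derives the key length as
// valstart - keystart, assuming the value starts right after the key.
public class KeyAtBufferEndDemo {
  static int keyLength(int keystart, int valstart) {
    return valstart - keystart;
  }

  public static void main(String[] args) {
    int bufvoid = 100;  // hypothetical serialization buffer length
    int keystart = 95;  // a 5-byte key filling the last bytes of the buffer
    int valstart = 0;   // the value wrapped to the front of the buffer

    int len = keyLength(keystart, valstart);
    // -95 instead of 5: indexing the buffer with a negative length is what
    // provokes the ArrayIndexOutOfBoundsException during the sort.
    System.out.println(len);
    // The wrapped key's actual length runs to the end of the buffer.
    System.out.println(bufvoid - keystart); // 5
  }
}
```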