There is a potential memory leak in CryptoOutputStream.java. It allocates two direct byte buffers (inBuffer and outBuffer) that are freed when close() is called. Most of the time close() is called. However, when writing to the intermediate Map output file or to the spill files in MapTask, close() is never called, because doing so would also close the underlying stream, which is not desirable: a single underlying physical stream contains multiple logical streams, one per partition of the Map output.
By default, 128 KB is allocated per byte buffer, so the total memory allocated per stream is 256 KB. This may not sound like much. However, if the number of partitions (or number of reducers) is large (in the hundreds) and/or spill files are created in MapTask, this can grow to a few hundred MB.
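To make the arithmetic concrete, here is a rough estimate in Java. It is only a sketch: the class and method names are hypothetical, and it assumes one unclosed logical stream per partition per spill file, which is my reading of the description above rather than something measured.

```java
// Back-of-the-envelope estimate of direct memory held by unclosed streams.
// All names here are hypothetical; only the 128 KB default comes from the report.
final class CryptoBufferFootprint {
    static final long BUFFER_BYTES = 128L * 1024; // default size of each direct buffer
    static final int BUFFERS_PER_STREAM = 2;      // inBuffer and outBuffer

    /** Bytes of direct memory held if no logical stream is ever closed. */
    static long leakedBytes(int partitions, int spills) {
        // Assumption: one logical stream per partition per spill file.
        return BUFFER_BYTES * BUFFERS_PER_STREAM * partitions * spills;
    }

    public static void main(String[] args) {
        long bytes = leakedBytes(500, 3);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "375 MB"
    }
}
```

With 500 partitions and 3 spills, the estimate is 500 x 3 x 256 KB = 375 MB.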
I can think of two ways to address this issue:
The first is to stop using direct byte buffers. According to the JDK documentation:
The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
It is not clear to me whether there is any benefit to allocating direct byte buffers in CryptoOutputStream.java. In fact, there is a slight CPU overhead, because CryptoOutputStream.java moves the data from outBuffer to a temporary byte array.
Even if the underlying stream supports direct byte buffer IO (or direct IO in OS parlance), it is not clear whether it will yield any measurable performance gain.
The fix would be to allocate a ByteBuffer on the heap for inBuffer and to wrap a byte array in a ByteBuffer for outBuffer. Note that inBuffer and outBuffer must remain ByteBuffers, as required by the encrypt() method in Encryptor.
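A minimal sketch of the heap-buffer approach follows. It is not the actual CryptoOutputStream code: the class, the helper methods, and the XOR "cipher" are hypothetical stand-ins (fakeEncrypt only mimics the two-ByteBuffer shape of Encryptor.encrypt()). The point is that a wrapped byte array can be handed to the underlying stream directly, with no copy into a temporary array.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch; none of these names exist in Hadoop.
final class HeapBufferSketch {
    /** Stand-in for an encrypt(ByteBuffer, ByteBuffer) call: XORs each byte. */
    static void fakeEncrypt(ByteBuffer in, ByteBuffer out) {
        while (in.hasRemaining()) {
            out.put((byte) (in.get() ^ 0x5A));
        }
    }

    /** Encrypts data through heap buffers and writes the result to out. */
    static void encryptAndWrite(byte[] data, OutputStream out, int bufferSize)
            throws IOException {
        ByteBuffer inBuffer = ByteBuffer.allocate(bufferSize); // heap, not direct
        byte[] outArray = new byte[bufferSize];
        ByteBuffer outBuffer = ByteBuffer.wrap(outArray);      // heap array, wrapped

        int offset = 0;
        while (offset < data.length) {
            int n = Math.min(inBuffer.remaining(), data.length - offset);
            inBuffer.put(data, offset, n);
            offset += n;

            inBuffer.flip();      // switch inBuffer from filling to draining
            outBuffer.clear();
            fakeEncrypt(inBuffer, outBuffer);
            inBuffer.clear();     // ready for the next chunk

            // Write straight from the backing array; no temporary copy needed.
            out.write(outArray, 0, outBuffer.position());
        }
    }

    /** Convenience wrapper that collects the output in memory. */
    static byte[] encryptToBytes(byte[] data, int bufferSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            encryptAndWrite(data, sink, bufferSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] result = encryptToBytes("hello".getBytes(), 2);
        System.out.println(result.length + " bytes written");
    }
}
```

Because both buffers live on the heap, they are reclaimed by the garbage collector like any other object, so nothing has to be freed explicitly when close() is skipped.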
Alternatively, assuming that we want to keep the buffers as direct byte buffers, we can add a new constructor to CryptoOutputStream that takes a boolean flag, ownOutputStream, indicating whether the underlying stream is owned by CryptoOutputStream. If it is true, calling close() will close the underlying stream. Otherwise, calling close() will free only the direct byte buffers and leave the underlying stream open.
The scope of changes for this fix will be somewhat wider. We need to modify MapTask.java, CryptoUtils.java, and CryptoFSDataOutputStream.java as well to pass the ownership flag mentioned above.
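The ownership flag could look roughly like the sketch below. Everything except the flag name ownOutputStream is hypothetical: the classes are simplified stand-ins, no encryption is performed, and the buffers are released here by dropping the references, whereas the real class would free the native memory explicitly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of the ownership flag; not the Hadoop implementation.
final class OwnershipSketch {
    /** Stands in for the shared physical stream; records whether it was closed. */
    static final class TrackingStream extends ByteArrayOutputStream {
        boolean closed;
        @Override public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Simplified logical stream holding two direct buffers. */
    static final class OwnedStream extends OutputStream {
        private final OutputStream out;
        private final boolean ownOutputStream;
        private ByteBuffer inBuffer;
        private ByteBuffer outBuffer;

        OwnedStream(OutputStream out, int bufferSize, boolean ownOutputStream) {
            this.out = out;
            this.ownOutputStream = ownOutputStream;
            this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
            this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
        }

        @Override public void write(int b) throws IOException {
            out.write(b); // pass-through; encryption omitted in this sketch
        }

        boolean buffersReleased() {
            return inBuffer == null && outBuffer == null;
        }

        @Override public void close() throws IOException {
            flush();
            // Always release the buffers (here by dropping the references).
            inBuffer = null;
            outBuffer = null;
            // Close the underlying stream only if this stream owns it.
            if (ownOutputStream) {
                out.close();
            }
        }
    }

    /** Closes a logical stream and returns {buffersReleased, underlyingClosed}. */
    static boolean[] closeAndReport(boolean ownOutputStream) {
        TrackingStream physical = new TrackingStream();
        OwnedStream logical = new OwnedStream(physical, 1024, ownOutputStream);
        try {
            logical.write(42);
            logical.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new boolean[] {logical.buffersReleased(), physical.closed};
    }

    public static void main(String[] args) {
        boolean[] shared = closeAndReport(false);
        System.out.println("released=" + shared[0] + " underlyingClosed=" + shared[1]);
    }
}
```

With this shape, MapTask could close each per-partition stream with ownOutputStream set to false, releasing the buffers while the shared physical stream stays open for the next partition.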
I can post a patch for either of the above. I welcome any other ideas from developers to fix this issue.