Hi there. We've been tracking an issue where Kafka hits a `java.lang.OutOfMemoryError` during log recovery.
After a bunch of digging, we've realized that we've hit an instance of a long-known issue: http://www.evanjones.ca/java-native-leak-bug.html
TLDR: Kafka breaks the rule "Always close GZIPInputStream and GZIPOutputStream since they use native memory via zlib" from that article.
As such, during broker startup, when recovering log segments that were compressed with gzip, Kafka leaks `GZIPInputStream` instances all over the place.
Our crashes during startup have this profile: the JVM heap is nearly empty (a few hundred MB), but off-heap memory is full of allocations made by `Java_java_util_zip_Deflater_init` and `deflatInit2`.
This leads to broker crashes during startup. The only real mitigation is to give the broker far more memory than it needs to boot (which I'd guess is why folks haven't noticed this much in production yet).
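For reference, here's a minimal Scala sketch of the rule from that article, using only the JDK's `java.util.zip` classes. `GzipCloseDemo` and its methods are hypothetical names, not Kafka code. It reads only the first decompressed byte (much like recovery only wants the first offset) but still closes the stream in a `finally`, so the native zlib state is released immediately instead of waiting for a GC to run a finalizer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical demo object, not Kafka code.
object GzipCloseDemo {
  // Compress a payload. Closing the GZIPOutputStream writes the gzip trailer
  // and releases its native zlib (Deflater) state.
  def gzip(payload: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bytes)
    try out.write(payload) finally out.close()
    bytes.toByteArray
  }

  // Read only the first decompressed byte, but still close the stream so its
  // native Inflater is released now rather than whenever a GC happens to run
  // a finalizer.
  def firstByte(compressed: Array[Byte]): Int = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try in.read() finally in.close()
  }
}
```

Calling `GzipCloseDemo.firstByte(GzipCloseDemo.gzip("kafka".getBytes("UTF-8")))` returns the code of `'k'`, and the stream is closed even though most of the payload was never read.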
Digging into the code (this is based on trunk): log recovery on unflushed segments eventually calls `LogSegment.recover`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
On compressed segments, that leads to a call to `deepIterator`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
That leads to a call to `CompressionFactory` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95), which creates a `GZIPInputStream`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
That `GZIPInputStream` is never closed anywhere. Because heap pressure stays low, GC is never triggered, so the finalizer that frees the native zlib buffers (it lives on the `Inflater` that `GZIPInputStream` uses internally) never runs. Instead, we exhaust off-heap memory and Kafka dies with an `OutOfMemoryError`.
Kafka does call `inputstream.close()`, but only after reading the stream all the way to the end (see https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). During log recovery, in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189, it doesn't read to the end of the stream; it reads the first offset and then leaves the stream alone.
This issue likely affects `lz4`- and `snappy`-compressed topics in exactly the same way. I think (but haven't 100% verified) that it affects all supported versions of Kafka (0.8 through 0.10).
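To make the failure mode concrete, here's a hypothetical model in Scala of an iterator that only closes its stream once it hits the end. It is not Kafka's actual iterator; `CloseOnEndIterator` and `LongCodec` are made-up names, and it iterates over plain big-endian longs rather than messages. Taking just the first element, as recovery does, leaves the stream open; draining it closes it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException, InputStream}

// Hypothetical model, not Kafka's code: an iterator over big-endian longs that
// only closes its underlying stream after reaching the end.
class CloseOnEndIterator(in: InputStream) extends Iterator[Long] {
  private val data = new DataInputStream(in)
  private var buffered: Option[Long] = None
  private var exhausted = false
  // Public only so the example can observe whether close() ever ran.
  var closed = false

  override def hasNext: Boolean = {
    if (buffered.isEmpty && !exhausted) {
      try buffered = Some(data.readLong())
      catch {
        case _: EOFException =>
          exhausted = true
          data.close() // the only place the stream is ever closed
          closed = true
      }
    }
    buffered.isDefined
  }

  override def next(): Long = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    val value = buffered.get
    buffered = None
    value
  }
}

// Hypothetical helper that writes values in the format the iterator reads.
object LongCodec {
  def encode(values: Long*): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    values.foreach(v => out.writeLong(v))
    out.close()
    bytes.toByteArray
  }
}
```

If the stream wrapped a `GZIPInputStream`, the partially read case is exactly the one where the native buffers stay allocated until a finalizer eventually runs.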
Fixing this seems relatively annoying, but only because of some "small matters of coding", nothing hugely problematic.
The main issue is that `deepIterator` only returns an `Iterator`, which doesn't have a `close()` method of any kind. We could create a new `ClosableIterator` trait that extends Java's `AutoCloseable` (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), then explicitly call `close()` everywhere we use `deepIterator()` without always reading to the end. Scala unfortunately doesn't seem to have a built-in equivalent of Java's `try-with-resources` statement, but we can explicitly call `close()` everywhere perfectly happily.
Another, much hackier, solution would be to always read the iterator to the end in `LogSegment.recover`, but that seems pretty bad, since it uses far more resources than recovery actually needs.
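A rough sketch of what that could look like, under a few assumptions: `ClosableIterator` is the trait named above, while `Closables.withClosable` (a small loan-pattern helper standing in for `try-with-resources`), `GzipLongIterator` (a stand-in for a deep iterator, reading longs instead of messages) and `GzipLongCodec` are hypothetical names for illustration only. `close()` is idempotent so callers can close an iterator that already closed itself on exhaustion.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// An Iterator that callers can also close early.
trait ClosableIterator[A] extends Iterator[A] with AutoCloseable

// Hypothetical loan-pattern helper standing in for try-with-resources: the
// resource is closed whether `body` returns normally or throws.
object Closables {
  def withClosable[R <: AutoCloseable, T](resource: R)(body: R => T): T =
    try body(resource) finally resource.close()
}

// Hypothetical iterator over big-endian longs stored in a gzip stream, used
// as a stand-in for a deep iterator over compressed messages.
class GzipLongIterator(compressed: Array[Byte]) extends ClosableIterator[Long] {
  private val in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(compressed)))
  private var buffered: Option[Long] = None
  private var exhausted = false
  // Public only so the example can check that close() ran.
  var closed = false

  override def hasNext: Boolean = {
    if (buffered.isEmpty && !exhausted && !closed) {
      try buffered = Some(in.readLong())
      catch { case _: EOFException => exhausted = true; close() }
    }
    buffered.isDefined
  }

  override def next(): Long = {
    if (!hasNext) throw new NoSuchElementException("no more values")
    val value = buffered.get
    buffered = None
    value
  }

  // Idempotent: closing twice is harmless.
  override def close(): Unit =
    if (!closed) {
      closed = true
      in.close() // also ends the GZIPInputStream's native Inflater
    }
}

// Hypothetical helper that writes values in the format GzipLongIterator reads.
object GzipLongCodec {
  def encode(values: Long*): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(bytes))
    try values.foreach(v => out.writeLong(v)) finally out.close()
    bytes.toByteArray
  }
}
```

A recovery-style call site would then read something like `Closables.withClosable(iterator)(_.next())`: take the first element, and the native resources are released on the way out even if reading throws.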
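For comparison, a sketch of that hackier workaround; `DrainWorkaround.firstThenDrain` is a hypothetical name. It keeps the first element and then pulls everything else, so an iterator that only closes itself on exhaustion gets the chance to do so. The usage shows the cost: to get one element, every element is produced.

```scala
// Hypothetical helper illustrating the "read to the end" workaround.
object DrainWorkaround {
  def firstThenDrain[A](it: Iterator[A]): Option[A] = {
    val first = if (it.hasNext) Some(it.next()) else None
    // Consume every remaining element. During recovery this would mean
    // decompressing every remaining message just to trigger close().
    while (it.hasNext) it.next()
    first
  }
}
```

With a counting source such as `Iterator.range(0, 5).map { i => pulled += 1; i }`, the result is `Some(0)` but `pulled` ends up at 5, which is the extra work we'd rather avoid during recovery.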
I can't think of any other reasonable solutions for now, but would love to hear input from the community.
We're happy doing the work of developing a patch, but thought we'd report the issue before starting down that path.