As a user of the Java producer, I want a predictable memory usage for the Kafka client so that I can ensure that my system is sized appropriately and will be stable even under heavy usage.
As a user of the Java producer, I want a smaller memory footprint so that my systems don't consume as many resources.
- Enabling compression in Kafka should not significantly increase the memory usage of the Kafka client
- The memory usage of Kafka's Java Producer should be roughly in line with the buffer size (buffer.memory) and the number of producers declared.
I've observed high memory usage in the producer when enabling compression (gzip or lz4). I don't observe the behavior with compression off, but with it on I run out of heap (2GB). Using a Java profiler, I see the data is held in the KafkaLZ4BlockOutputStream (or the related class for gzip). I see that MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but is not successful. I'm most likely network bottlenecked, so I expect the producer buffers to be full while the job is running, with potentially a lot of unacknowledged records.
I've tried using the default buffer.memory with 20 producers (across 20 threads) and sending data as quickly as I can. I've also tried 1MB of buffer.memory, which seemed to reduce memory consumption, but I could still run out of memory in certain cases. I have max.in.flight.requests.per.connection set to 1. In short, I should only have ~20 MB (20 * 1MB) of data in buffers, but the Kafka client can easily exhaust a 2000 MB heap.
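For reference, the setup described above can be sketched as plain producer properties. This is only an illustration: the broker address is a placeholder, the class and method names are hypothetical, and the serializer settings that a real KafkaProducer requires are omitted.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Per-producer settings as described in this ticket, using plain string keys.
    // A real KafkaProducer also needs key.serializer and value.serializer.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // placeholder address
        props.put("compression.type", "lz4");             // or "gzip"
        props.put("buffer.memory", "1000000");            // ~1MB per producer
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    // Memory one would expect the record buffers to need in total.
    static long expectedBufferBytes(int producers, long bufferMemory) {
        return producers * bufferMemory;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        long perProducer = Long.parseLong(props.getProperty("buffer.memory"));
        // 20 producers * 1,000,000 bytes = 20,000,000 bytes (~20 MB)
        System.out.println("Expected buffered bytes: " + expectedBufferBytes(20, perProducer));
    }
}
```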
Looking at the code more closely, it appears that KafkaLZ4BlockOutputStream doesn't clear the compressedBuffer or buffer when close() is called. In my heap dump, both of those are ~65k each, meaning that each batch is taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 and messages are 1k each until the batch fills.)
Kafka tries to manage memory usage by calling MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release resources required for record appends (e.g. compression buffers)". However, this method doesn't actually clear those buffers because KafkaLZ4BlockOutputStream.close() only writes the block and end mark and closes the output stream. It doesn't actually clear the buffer and compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM until the block is acknowledged by the broker, processed in Sender:handleProduceResponse(), and the batch is deallocated. This memory usage therefore increases, possibly without bound. In my test program, the program died with approximately 345 unprocessed batches per producer (20 producers), despite having max.in.flight.requests.per.connection=1.
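As a rough cross-check of the numbers above, the retained memory can be estimated from the batch count and the per-batch size seen in the heap dump. The sketch below assumes "~148k" means 148,000 bytes and "~65k" means a 64 KiB (65,536 byte) buffer; both are readings of the approximate figures, not exact measurements, and the class name is hypothetical.

```java
public class RetainedMemoryEstimate {

    // Total bytes held by batches that are closed but not yet acknowledged.
    static long retainedBytes(int producers, int batchesPerProducer, long bytesPerBatch) {
        return (long) producers * batchesPerProducer * bytesPerBatch;
    }

    public static void main(String[] args) {
        int producers = 20;
        int batchesPerProducer = 345;       // unprocessed batches when the test died
        long bytesPerBatch = 148_000L;      // assumed reading of "~148k"
        long bufferBytes = 2 * 65_536L;     // buffer + compressedBuffer, assumed 64 KiB each

        long total = retainedBytes(producers, batchesPerProducer, bytesPerBatch);
        long inBuffers = retainedBytes(producers, batchesPerProducer, bufferBytes);

        // 6,900 batches * 148,000 bytes = 1,021,200,000 bytes
        System.out.println("Total retained bytes:      " + total);
        // 6,900 batches * 131,072 bytes = 904,396,800 bytes
        System.out.println("Of which stream buffers:   " + inBuffers);
    }
}
```

Under these assumptions the total is roughly 1 GB, which is consistent with the test program dying under a 1GB heap, and close to 90% of it sits in the two stream buffers.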
- Create a topic test with plenty of storage
- Use a connection with a very fast upload pipe and limited download. This allows the outbound data to go out, but acknowledgements to be delayed flowing in.
- Download KafkaSender.java (attached to this ticket)
- Set line 17 to reference your Kafka broker
- Run the program with a 1GB Xmx value
There are a few possible optimizations I can think of:
- We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as non-final and null them in the close() method
- We could declare the MemoryRecordsBuilder.appendStream non-final and null it in the closeForRecordAppends() method
- We could have the ProducerBatch discard the recordsBuilder in closeForRecordAppends(). However, this is likely a bad idea because the recordsBuilder contains significant metadata that is likely needed after the stream is closed. It is also final.
- We could try to limit the number of non-acknowledged batches in flight. This would bound the maximum memory usage but may negatively impact performance.
Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.
Fix #2 would improve all algorithms, compression and otherwise. Of the 3 proposed here, it seems the best. It would also require checking appendStreamIsClosed at every usage of appendStream within MemoryRecordsBuilder to avoid NPEs.
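To make Fix #1 and Fix #2 concrete, here is a minimal, self-contained sketch of the pattern. BlockBufferingStream and BuilderSketch are hypothetical stand-ins, not the real KafkaLZ4BlockOutputStream or MemoryRecordsBuilder, and the "compression" step is just a copy. It only illustrates making the fields non-final, nulling them on close, and guarding later use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class ReleaseOnCloseSketch {

    /** Hypothetical stand-in for a block-buffering compression stream (Fix #1). */
    static class BlockBufferingStream extends OutputStream {
        private final OutputStream out;
        private byte[] buffer;           // non-final so close() can release it
        private byte[] compressedBuffer; // non-final so close() can release it
        private int position = 0;

        BlockBufferingStream(OutputStream out, int blockSize) {
            this.out = out;
            this.buffer = new byte[blockSize];
            this.compressedBuffer = new byte[blockSize];
        }

        @Override
        public void write(int b) throws IOException {
            if (buffer == null) throw new IOException("Stream is closed");
            if (position == buffer.length) flushBlock();
            buffer[position++] = (byte) b;
        }

        // A real implementation would compress here; this sketch only copies.
        private void flushBlock() throws IOException {
            System.arraycopy(buffer, 0, compressedBuffer, 0, position);
            out.write(compressedBuffer, 0, position);
            position = 0;
        }

        @Override
        public void close() throws IOException {
            if (buffer == null) return; // already closed
            flushBlock();
            out.close();
            buffer = null;              // let the GC reclaim both buffers now,
            compressedBuffer = null;    // not when the batch is acknowledged
        }
    }

    /** Hypothetical stand-in for the records builder (Fix #2). */
    static class BuilderSketch {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private OutputStream appendStream = new BlockBufferingStream(sink, 64 * 1024);

        void append(byte[] record) {
            // Guard every use of appendStream to avoid an NPE after close.
            if (appendStream == null) throw new IllegalStateException("Closed for record appends");
            try {
                appendStream.write(record);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void closeForRecordAppends() {
            if (appendStream == null) return;
            try {
                appendStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            appendStream = null; // drops the stream and whatever buffers it holds
        }

        boolean appendStreamReleased() { return appendStream == null; }

        int bytesWritten() { return sink.size(); }
    }

    public static void main(String[] args) {
        BuilderSketch builder = new BuilderSketch();
        builder.append(new byte[1000]);
        builder.closeForRecordAppends();
        System.out.println("appendStream released: " + builder.appendStreamReleased());
        System.out.println("bytes written: " + builder.bytesWritten());
    }
}
```

Nulling the stream reference in the builder covers any stream implementation, which is why Fix #2 helps gzip as well as LZ4.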
Fix #4 is likely necessary if we want to bound the maximum memory usage of the Kafka producer. Removing the buffers in Fix #1 or #2 will reduce the memory usage by ~90%, but theoretically there is still no limit.
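One possible shape for Fix #4 is a counting semaphore that caps the number of closed but unacknowledged batches. The sketch below is illustrative only: InFlightBatchLimiter is a hypothetical class, and where such a limit would hook into the producer (and whether it should block or fail fast) is left open.

```java
import java.util.concurrent.Semaphore;

public class InFlightBatchLimiter {
    private final Semaphore permits;

    InFlightBatchLimiter(int maxUnacknowledgedBatches) {
        this.permits = new Semaphore(maxUnacknowledgedBatches);
    }

    // Reserve a slot before a batch is closed for sending; false if the cap is reached.
    boolean tryReserve() {
        return permits.tryAcquire();
    }

    // Blocking variant: waits until an acknowledgement frees a slot.
    void reserve() throws InterruptedException {
        permits.acquire();
    }

    // Call once the broker has acknowledged the batch and it has been deallocated.
    void release() {
        permits.release();
    }

    int availableSlots() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightBatchLimiter limiter = new InFlightBatchLimiter(2);
        System.out.println(limiter.tryReserve()); // true
        System.out.println(limiter.tryReserve()); // true
        System.out.println(limiter.tryReserve()); // false: cap of 2 reached
        limiter.release();                        // simulated acknowledgement
        System.out.println(limiter.tryReserve()); // true again
    }
}
```

With a cap of N batches per producer, the retained memory is bounded by roughly N times the per-batch size, at the cost of stalling sends while acknowledgements are delayed.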