As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the current default block size (1K) used in Snappy and GZIP may cause a sub-optimal compression ratio for Snappy, and hence reduce throughput. Because we no longer recompress data in the broker, it also impacts what gets stored on disk.
A solution might be to use each compression library's own default block size, which is 64K in LZ4, 32K in Snappy and 0.5K in GZIP. The downside is that this requires more memory allocated outside of the buffer pool, so users may need to bump up their JVM heap size, especially for MirrorMakers. Using Snappy as an example, it's an additional 2 x 32K = 64K per batch (as Snappy uses two buffers), and one would expect at least one batch per partition. However, the number of batches per partition can be much higher if the broker is slow to acknowledge producer requests (depending on `buffer.memory`, `batch.size`, message size, etc.).
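To make the memory impact concrete, here is a rough back-of-the-envelope sketch of the arithmetic above. The class name, the partition count and the batches-per-partition values are hypothetical and chosen only for illustration; only the 32K block size and the two buffers per Snappy stream come from the description.

```java
// Rough estimate of heap used by compression buffers outside the producer buffer pool.
// All names here are illustrative; this is not Kafka code.
final class CompressionBufferEstimate {
    // Values quoted in the description: Snappy default block size and buffers per stream.
    static final int SNAPPY_BLOCK_BYTES = 32 * 1024;
    static final int SNAPPY_BUFFERS_PER_STREAM = 2;

    // extra bytes = partitions * batches per partition * buffers per stream * block size
    static long extraHeapBytes(int partitions, int batchesPerPartition,
                               int buffersPerStream, int blockBytes) {
        return (long) partitions * batchesPerPartition * buffersPerStream * blockBytes;
    }

    public static void main(String[] args) {
        int partitions = 1000; // hypothetical partition count

        // Expected case: one open batch per partition.
        long oneBatch = extraHeapBytes(partitions, 1,
                SNAPPY_BUFFERS_PER_STREAM, SNAPPY_BLOCK_BYTES);

        // Slow-broker case: assume (hypothetically) five batches queued per partition.
        long fiveBatches = extraHeapBytes(partitions, 5,
                SNAPPY_BUFFERS_PER_STREAM, SNAPPY_BLOCK_BYTES);

        System.out.println("1 batch/partition:   " + oneBatch + " bytes");
        System.out.println("5 batches/partition: " + fiveBatches + " bytes");
    }
}
```

The estimate scales linearly with the number of retained batches, which is why limiting the number of open compression streams per partition matters more than the block size alone.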
Given the above, there are a few things that could be done (potentially more than one):
1) A configuration for the producer compression stream buffer size.
2) Allocate buffers from the buffer pool and pass them to the compression library. This is possible with Snappy and we could adapt our LZ4 code. It's not possible with GZIP, but it uses a very small buffer by default.
3) DONE: Close the existing `RecordBatch.records` when we create a new `RecordBatch` for the `TopicPartition` instead of doing it during `RecordAccumulator.drain`. This would mean that we would only retain resources for one `RecordBatch` per partition, which would improve the worst case scenario significantly.
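As a minimal sketch of option 1, the JDK's `GZIPOutputStream` and `GZIPInputStream` both accept a buffer size in their constructors, so a configured value could be passed straight through. The class name and the `bufferSize` parameter below are hypothetical stand-ins for whatever producer configuration might be introduced; no actual Kafka config name is implied, and the Snappy and LZ4 streams would need their own equivalents.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative only: shows a caller-supplied buffer size reaching the compression stream.
final class ConfigurableGzipBuffer {

    // Compress payload using a GZIP stream whose internal buffer is bufferSize bytes.
    static byte[] compress(byte[] payload, int bufferSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(sink, bufferSize)) {
            gzip.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    // Decompress using the same buffer size for the input stream.
    static byte[] decompress(byte[] compressed, int bufferSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip =
                 new GZIPInputStream(new ByteArrayInputStream(compressed), bufferSize)) {
            byte[] chunk = new byte[bufferSize];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                out.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

A caller would invoke `ConfigurableGzipBuffer.compress(bytes, 512)` to match the 0.5K GZIP default mentioned above, or pass a larger value read from configuration.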
Note that we decided that this change was too risky for 0.10.0.0 and reverted the original attempt.