We have recently seen a few cases where a RecordTooLargeException is thrown because the compressed message sent by the KafkaProducer exceeds the max message size.
The root cause is that the compressor estimates the batch size using a compression ratio derived from heuristic statistics on past batches. This does not work well for traffic with highly variable compression ratios.
For example, suppose the batch size is set to 1 MB and the max message size is also 1 MB. Initially the producer sends messages (each 1 MB) to topic_1, whose data compresses to 1/10 of its original size. After a while the compressor's estimated compression ratio converges to 1/10, so the producer puts 10 messages into one batch. Now the producer starts sending messages (also 1 MB each) to topic_2, whose data compresses only to 1/5 of its original size. The producer still uses 1/10 as the estimated compression ratio and puts 10 messages into a batch. After compression that batch is 2 MB, which exceeds the max message size. At that point the user has few options other than resending everything, or closing the producer if they care about ordering.
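The arithmetic in this scenario can be sketched as follows. The numbers mirror the example above; the variable names and the per-record accounting are illustrative assumptions, not actual producer internals:

```java
public class StaleRatioOvershoot {
    public static void main(String[] args) {
        long batchSizeBytes = 1 << 20;   // batch.size: 1 MB
        long maxMessageBytes = 1 << 20;  // max message size: 1 MB
        long recordBytes = 1 << 20;      // each message is 1 MB uncompressed

        // Estimate trained on topic_1, whose data compresses to 1/10.
        long estCompressedPerRecord = recordBytes / 10;
        long recordsPerBatch = batchSizeBytes / estCompressedPerRecord;

        // topic_2's data compresses only to 1/5, but the stale 1/10 estimate
        // is still used to decide how many records fit in the batch.
        long actualCompressedPerRecord = recordBytes / 5;
        long actualBatchBytes = recordsPerBatch * actualCompressedPerRecord;

        System.out.println("records per batch: " + recordsPerBatch);   // 10
        System.out.println("compressed batch bytes: " + actualBatchBytes); // ~2 MB
        System.out.println("too large: " + (actualBatchBytes > maxMessageBytes)); // true
    }
}
```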
This is especially problematic for services like MirrorMaker, whose producer is shared by many different topics.
To solve this issue, we can probably add a configuration "enable.compression.ratio.estimation" to the producer. When this configuration is set to false, the producer stops estimating the compressed size and instead closes the batch once the uncompressed bytes in the batch reach the batch size.
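A minimal sketch of the proposed behavior, assuming a hypothetical per-batch append check (class and field names here are illustrative, not the actual accumulator internals):

```java
// Sketch: with estimation disabled, the batch is sized by uncompressed bytes,
// so the compressed batch can never exceed batch.size (assuming compression
// never expands the data).
public class BatchSketch {
    private final long batchSizeBytes;
    private final boolean enableCompressionRatioEstimation;
    private final double estimatedRatio;
    private long uncompressedBytes = 0;

    public BatchSketch(long batchSizeBytes, boolean enableEstimation, double estimatedRatio) {
        this.batchSizeBytes = batchSizeBytes;
        this.enableCompressionRatioEstimation = enableEstimation;
        this.estimatedRatio = estimatedRatio;
    }

    /** Returns true if the record was appended, false if the batch must be closed. */
    public boolean tryAppend(long recordBytes) {
        long projected = enableCompressionRatioEstimation
                ? (long) ((uncompressedBytes + recordBytes) * estimatedRatio)
                : uncompressedBytes + recordBytes;
        if (uncompressedBytes > 0 && projected > batchSizeBytes) {
            return false; // caller closes this batch and allocates a new one
        }
        uncompressedBytes += recordBytes;
        return true;
    }
}
```

With estimation disabled, throughput may drop for highly compressible topics (batches carry fewer records than they could), which is the trade-off the configuration lets users make.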