Description
Hello,
we lost millions of messages because of this try/catch in the producer's DefaultEventHandler:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
If a Throwable is caught by this try/catch, the retry policy has no effect and all yet-to-be-sent messages are lost: the error breaks out of the loop over the broker list.
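To make the failure mode concrete, here is a minimal, self-contained model of that control flow (hypothetical names, not the actual Kafka code): because a single try/catch wraps the whole loop over the brokers, one Throwable abandons the remaining brokers and nothing ever reaches the buffer of failed messages that the retry policy works from.
{code:scala}
import scala.collection.mutable.ArrayBuffer

object SwallowedErrorDemo {
  // Stand-in for DefaultEventHandler#groupMessagesToSet: blows up the same way
  // Snappy did for us (a Throwable that is an Error, not an Exception).
  def groupMessages(msgs: Seq[String]): Seq[String] =
    throw new NoClassDefFoundError("Could not initialize class org.xerial.snappy.Snappy")

  def main(args: Array[String]): Unit = {
    // Two brokers, two messages each (stand-in for the partitioned data).
    val perBroker = Map(0 -> Seq("m1", "m2"), 1 -> Seq("m3", "m4"))
    val failed = ArrayBuffer[String]()    // only these would ever be retried

    try {
      for ((brokerId, msgs) <- perBroker) {
        val grouped = groupMessages(msgs) // throws on the very first broker
        // send(brokerId, grouped) and collection of failed partitions go here
      }
    } catch {
      // The error is only logged; the loop over the remaining brokers is
      // abandoned and nothing reaches the `failed` buffer.
      case t: Throwable => println(s"Failed to send messages: $t")
    }

    println(s"messages handed back for retry: ${failed.size}") // prints 0
  }
}
{code}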
This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and all the metrics are updated as if everything were fine.
Only an abnormal drop in the producer's network I/O or in the brokers' incoming message rate, or alerting on errors in the producer logs, could have revealed the issue.
This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that lets the retry policy do its job when such a Throwable occurs.
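The intent of the patch, sketched with the same hypothetical names as above (this is the idea, not the exact diff): scope the catch to a single broker and hand that broker's messages back to the retry machinery instead of dropping everything.
{code:scala}
// Replacement for the loop in the sketch above: the catch now lives inside the
// per-broker iteration, so one broker's Throwable no longer aborts the others.
for ((brokerId, msgs) <- perBroker) {
  try {
    val grouped = groupMessages(msgs)   // may still throw
    // send(brokerId, grouped) would happen here
  } catch {
    case t: Throwable =>
      println(s"Failed to send messages to broker $brokerId: $t")
      failed ++= msgs                   // mark this broker's messages as failed
  }
}
// `failed` now contains every message we could not send, so nothing is silently lost.
{code}
With this shape, the affected messages go through the normal message.send.max.retries / retry.backoff.ms retry path instead of disappearing.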
Thanks in advance for your help.
Alexis
PS: you might wonder how this try/catch could ever catch anything, since DefaultEventHandler#groupMessagesToSet looks so harmless.
Here are the details:
We use Snappy compression. When the native snappy library is not installed on the host, the initialization of the class org.xerial.snappy.Snappy writes a native library into the JVM temp directory, java.io.tmpdir.
In our scenario, java.io.tmpdir was a subdirectory of /tmp. After an instance reboot (thank you AWS!), that directory was gone, so the JVM ended up running with a non-existent temp dir. The Snappy class could then no longer be initialized, and the following message was silently logged:
{noformat}
ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages
! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
{noformat}
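If you want to reproduce the Snappy part in isolation, here is a hypothetical standalone sketch (it assumes snappy-java is on the classpath and the native snappy library is not installed, so the bundled one has to be extracted into java.io.tmpdir):
{code:scala}
import org.xerial.snappy.Snappy

object SnappyTmpdirRepro {
  def main(args: Array[String]): Unit = {
    // In the real incident the directory vanished under a running JVM; here we
    // simply start with a bogus tmpdir (could also be set via -Djava.io.tmpdir=...).
    System.setProperty("java.io.tmpdir", "/tmp/this-directory-does-not-exist")

    for (attempt <- 1 to 2) {
      try {
        // Forces the org.xerial.snappy.Snappy class to initialize, which tries
        // to extract the native library into the (missing) temp directory.
        Snappy.compress("hello".getBytes("UTF-8"))
      } catch {
        case t: Throwable => println(s"attempt $attempt: $t")
      }
    }
  }
}
{code}
The first attempt fails with whatever error the native-library extraction raises while the Snappy class initializes; every later attempt fails with the NoClassDefFoundError shown above, which is exactly what ended up inside the producer's catch-all.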