Affects Version/s: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
Fix Version/s: None
Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
The old consumer supports deserializing records into typed objects and throws a SerializationException through MessageAndMetadata#key() and MessageAndMetadata#message() that can be caught by the client.
When using the new consumer API with kafka-clients versions before 0.10.0.1, such an exception was swallowed by the NetworkClient class and resulted in an infinite loop which the client had no control over. As reported in KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue still remains.
Indeed, the client can now catch the SerializationException, but the next call to Consumer#poll(long) will throw the same exception indefinitely.
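This "stuck consumer" behavior can be modelled without a broker. The following self-contained sketch (plain Java, no kafka-clients dependency; SimulatedConsumer and its poll() are illustrative stand-ins, not the real API) simulates a consumer whose position never advances past a record that fails deserialization, so every subsequent poll() rethrows the same exception:

```java
import java.util.Arrays;
import java.util.List;

public class StuckPollDemo {
    // Stand-in for org.apache.kafka.common.errors.SerializationException
    static class SerializationException extends RuntimeException {
        SerializationException(String msg) { super(msg); }
    }

    // Simulated consumer: the position only advances when deserialization succeeds.
    static class SimulatedConsumer {
        private final List<byte[]> records;
        private int position = 0;

        SimulatedConsumer(List<byte[]> records) { this.records = records; }

        // Mimics Consumer#poll for an Integer value deserializer: it deserializes
        // the record at the current position, but does NOT advance past it on
        // failure -- so the same record fails again on every call.
        Integer poll() {
            byte[] raw = records.get(position);
            if (raw.length != 4) {
                throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
            }
            position++;
            int value = 0;
            for (byte b : raw) value = (value << 8) | (b & 0xFF);
            return value;
        }
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer(Arrays.asList(
                new byte[] {0, 0, 0, 1},              // valid 4-byte Integer
                new byte[] {'o', 'o', 'p', 's', '!'}  // malformed: 5 bytes
        ));
        System.out.println(consumer.poll()); // first record is fine: prints 1
        for (int i = 0; i < 3; i++) {
            try {
                consumer.poll();
            } catch (SerializationException e) {
                // Thrown on every attempt: the consumer never moves past the bad record.
                System.out.println("poll attempt " + i + ": " + e.getMessage());
            }
        }
    }
}
```

Because the position is not advanced on failure, catching the exception gives the client no way to make progress.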
The following snippet reproduces the issue (a full example is available on GitHub for most released kafka-clients versions):
when run with the following records (third record has an invalid Integer value):
will produce the following logs:
I don't believe committing offsets would help, and even if it did, it could cause well-formed records from that ConsumerRecords batch to never be consumed (data loss).
I have only seen a few mentions of this bug online, but I believe it is a critical issue: the new consumer API is no longer in beta, yet if you do not control the producers (which can inject malformed values), or you use an advanced deserializer that throws such exceptions (e.g. the schema-registry KafkaAvroDeserializer), a single bad record can block a consumer from advancing in the stream.
A few workarounds and possible fixes:
- use a Deserializer that does not throw a SerializationException (e.g. ByteArrayDeserializer, StringDeserializer)
- wrap the Deserializer to catch and log the SerializationException, return null instead, and check for null in the client code (that's what we use on top of KafkaAvroDeserializer in case the schema registry is unreachable or the Avro datum is invalid or incompatible with the reader's schema)
- continue to throw SerializationException when calling Consumer#poll(long) but skip that malformed record on next Consumer#poll(long)
- do not throw SerializationException when calling Consumer#poll(long) but expose information about invalid records in ConsumerRecords
- do not throw SerializationException when calling Consumer#poll(long) but store the exception(s) in the ConsumerRecord object so that it is rethrown on ConsumerRecord#key() and ConsumerRecord#value()
- do not deserialize records during Consumer#poll() but do it when calling ConsumerRecord#key() and ConsumerRecord#value() (similar to the old consumer)
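The wrapping workaround (second bullet above) can be sketched in plain Java. The Deserializer interface and exception type below are local stand-ins for the Kafka ones, and the IntegerDeserializer-like delegate is illustrative; the point is that the wrapper converts the exception into a null the client can check for:

```java
import java.nio.ByteBuffer;

public class SafeDeserializerDemo {
    // Local stand-ins for org.apache.kafka.common.serialization.Deserializer
    // and org.apache.kafka.common.errors.SerializationException.
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }
    static class SerializationException extends RuntimeException {
        SerializationException(String msg) { super(msg); }
    }

    // An IntegerDeserializer-like delegate that throws on malformed input.
    static Deserializer<Integer> intDeserializer = (topic, data) -> {
        if (data == null) return null;
        if (data.length != 4) {
            throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    };

    // The wrapper: catch and log the SerializationException, return null instead.
    static <T> Deserializer<T> safe(Deserializer<T> delegate) {
        return (topic, data) -> {
            try {
                return delegate.deserialize(topic, data);
            } catch (SerializationException e) {
                // Skip the record instead of poisoning every subsequent poll().
                System.err.println("Skipping malformed record on " + topic + ": " + e.getMessage());
                return null;
            }
        };
    }

    public static void main(String[] args) {
        Deserializer<Integer> d = safe(intDeserializer);
        System.out.println(d.deserialize("t", new byte[]{0, 0, 0, 42})); // 42
        System.out.println(d.deserialize("t", new byte[]{1, 2}));        // null (malformed)
        // Client code must then check for null before using the value.
    }
}
```

The trade-off is that null becomes ambiguous (malformed record vs. genuine null value), which is why the later options that surface the exception through ConsumerRecord are preferable.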
I believe any of those solutions breaks compatibility semantics-wise, but not necessarily binary compatibility, as SerializationException is a RuntimeException and so could be "moved around".
My preference goes to the last two, and I would be happy to contribute such a change, as well as update the documentation on SerializationException to reflect that it is thrown during deserialization as well as serialization.