Kafka / KAFKA-4740

Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop


    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
    • Fix Version/s: None
    • Component/s: clients, consumer
    • Labels:
      None
    • Environment:
      Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
      Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1

      Description

The old consumer supports deserializing records into typed objects and throws a SerializationException through MessageAndMetadata#key() and MessageAndMetadata#message() that can be caught by the client [1].

When using the new consumer API with kafka-clients versions < 0.10.0.1, such an exception is swallowed by the NetworkClient class and results in an infinite loop which the client has no control over, like:

      DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
      DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
      ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
      org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
      ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
      org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
      ...
      

Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue remains: the client can now catch the SerializationException, but the next call to Consumer#poll(long) will throw the same exception indefinitely.

The following snippet (full example available on GitHub [2] for most released kafka-clients versions):

try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
    kafkaConsumer.subscribe(Arrays.asList("topic"));

    // Will run till the shutdown hook is called
    while (!doStop) {
        try {
            ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
            if (!records.isEmpty()) {
                logger.info("Got {} messages", records.count());
                for (ConsumerRecord<String, Integer> record : records) {
                    logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
                                record.partition(), record.offset(), record.key(), record.value());
                }
            } else {
                logger.info("No messages to consume");
            }
        } catch (SerializationException e) {
            logger.warn("Failed polling some records", e);
        }
    }
}
      

when run with the following records (the third record has an invalid Integer value):

          printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
          printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
          printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
          printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
      

      will produce the following logs:

      INFO  consumer.Consumer - Got 2 messages
      INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
      INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
      WARN  consumer.Consumer - Failed polling some records
      org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
      Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
      WARN  consumer.Consumer - Failed polling some records
      org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
      Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
      WARN  consumer.Consumer - Failed polling some records
      org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
      Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
      ...
      
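The `Size of data received by IntegerDeserializer is not 4` error reflects IntegerDeserializer's contract: a value must be exactly four big-endian bytes. The following is a minimal stand-in sketch of that decoding, not Kafka's actual implementation (it uses IllegalArgumentException in place of Kafka's SerializationException, and the IntDecode class name is illustrative):

```java
import java.nio.ByteBuffer;

public class IntDecode {
    // Mirrors IntegerDeserializer's contract: exactly 4 bytes, big-endian.
    static int decode(byte[] data) {
        if (data.length != 4) {
            // The real deserializer throws SerializationException here
            throw new IllegalArgumentException("Size of data received is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        System.out.println(decode(new byte[]{0, 0, 0, 2})); // prints 2
        // The third record above is only 3 bytes and is rejected:
        try {
            decode(new byte[]{0, 0, 0});
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why the first two records deserialize to 0 and 1, while the 3-byte third record fails on every subsequent poll.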

I don't believe committing offsets would help, and even if it did, it could result in a few well-formed records from that ConsumerRecords batch not being consumed (data loss).

I have only seen a few mentions of this bug online [3], but I believe this is a critical issue: the new consumer API is no longer in beta, yet if you do not control the producers (which can inject malformed values), or you use an advanced deserializer that throws such an exception (e.g. the schema-registry KafkaAvroDeserializer), you can end up blocking a consumer from advancing in the stream.

      Current workarounds:

• use a Deserializer that does not throw a SerializationException (e.g. ByteArrayDeserializer, StringDeserializer)
• wrap the Deserializer to catch and log the SerializationException, return null instead, and check for null in the client code (that is what we do on top of KafkaAvroDeserializer in case there is an issue reaching the schema registry, or the Avro datum is invalid or incompatible with the reader's schema)
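A sketch of that wrapping workaround. To keep the snippet self-contained, Deserializer below is a minimal stand-in for org.apache.kafka.common.serialization.Deserializer (with kafka-clients on the classpath you would implement the real interface instead), and SafeDeserializer is an illustrative name:

```java
// Minimal stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Illustrative wrapper: swallows deserialization errors and returns null,
// so the caller must treat null as "malformed record" and skip it.
public class SafeDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> delegate;

    public SafeDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (RuntimeException e) { // SerializationException is a RuntimeException
            // A real implementation would log the error here
            return null;
        }
    }

    public static void main(String[] args) {
        Deserializer<Integer> failing = (topic, data) -> {
            throw new RuntimeException("Size of data received is not 4");
        };
        SafeDeserializer<Integer> safe = new SafeDeserializer<>(failing);
        System.out.println(safe.deserialize("topic", new byte[3])); // prints null
    }
}
```

The trade-off is that a null value becomes ambiguous: it can mean either a genuine null record or a malformed one, so the client code must be written with that convention in mind.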

      Potential solutions:

      1. continue to throw SerializationException when calling Consumer#poll(long) but skip that malformed record on next Consumer#poll(long)
      2. do not throw SerializationException when calling Consumer#poll(long) but expose information about invalid records in ConsumerRecords
3. do not throw SerializationException when calling Consumer#poll(long) but store the exception(s) in the ConsumerRecord object so that they are rethrown on ConsumerRecord#key() and ConsumerRecord#value()
      4. do not deserialize records during Consumer#poll() but do it when calling ConsumerRecord#key() and ConsumerRecord#value() (similar to the old consumer)
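The last solution could be sketched roughly as follows. This is a hypothetical illustration, not the Kafka API: LazyRecord and the Function-based deserializer are invented names, standing in for ConsumerRecord and Deserializer.

```java
import java.util.function.Function;

// Hypothetical sketch of solution 4: poll() hands back raw bytes and
// deserialization only happens on value(), so a deserialization error
// surfaces per record and a malformed record can be skipped without
// poisoning the whole batch.
public class LazyRecord<V> {
    private final byte[] rawValue;
    private final Function<byte[], V> valueDeserializer;

    public LazyRecord(byte[] rawValue, Function<byte[], V> valueDeserializer) {
        this.rawValue = rawValue;
        this.valueDeserializer = valueDeserializer;
    }

    // Any deserialization error is thrown here, not during poll()
    public V value() {
        return valueDeserializer.apply(rawValue);
    }
}
```

With this shape, the loop in the snippet above could catch the exception around record.value() alone and simply continue with the next record.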

I believe any of those solutions breaks compatibility semantics-wise, but not necessarily binary compatibility, as SerializationException is a RuntimeException and could therefore be "moved around".
My preference goes to the last two, and I would be happy to contribute such a change, as well as update the documentation on SerializationException to reflect that it is not only used when serializing records.

      [1] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
      [1] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
      [2] https://github.com/slaunay/kafka-consumer-serialization-exception-example
      [3] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94

People

• Assignee:
  Sébastien Launay
• Reporter:
  Sébastien Launay
• Votes:
  14
• Watchers:
  29
