KAFKA-6502

Kafka streams deserialization handler not committing offsets on error records

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.2.0
    • Component/s: streams
    • Labels: None

    Description

      See this StackOverflow issue: https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler

      and this comment: https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899

      I am trying to use the LogAndContinueExceptionHandler for deserialization errors. When an error occurs, it works as expected: the error is logged and processing continues. However, on a continuous stream of errors, the offsets of these messages do not seem to be committed, and the messages reappear when the application restarts. This is more problematic if I send the failed messages to a DLQ, because on a restart they are sent to the DLQ again. As soon as a good record comes in, the offset appears to move forward, and the already logged messages are no longer seen after a restart.

      I reproduced this behavior by running the sample provided here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

      I changed the incoming value Serde to Serdes.Integer().getClass().getName() to force a deserialization error on input, reduced the commit interval to 1 second, and added the following to the config:

      streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
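
      For reference, the three changes described above could be collected roughly as follows. This is a sketch rather than the reporter's actual code: the class name, application id, and broker address are hypothetical, and the config keys and class names are written as string literals (the values the StreamsConfig constants are assumed to resolve to in the versions discussed here) so that the snippet compiles without Kafka on the classpath.

```java
import java.util.Properties;

// Sketch only: ReproConfigSketch is a hypothetical helper, not part of the
// WordCountLambdaExample or of Kafka itself.
class ReproConfigSketch {

    static Properties build() {
        Properties props = new Properties();

        // Hypothetical application id and broker address; substitute your own.
        props.put("application.id", "wordcount-deser-repro");
        props.put("bootstrap.servers", "localhost:9092");

        // Same string that Serdes.Integer().getClass().getName() returns.
        // Text input generally fails here, because the Integer deserializer
        // only accepts 4-byte payloads.
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");

        // Commit every second so a missing commit shows up quickly.
        props.put("commit.interval.ms", "1000");

        // Log and skip records that cannot be deserialized.
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");

        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

      In a real application these properties would be passed to the KafkaStreams constructor, typically using the StreamsConfig constants instead of raw strings.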

      It looks like when a deserialization exception occurs, the flag at https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228 is never set to true; it only becomes true once a record is processed successfully. That might be why no commit happens even after I manually call processorContext#commit().
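
      The suspected mechanism can be illustrated with a toy model. This is not Kafka code: the class, the field names, and the use of null to stand in for a skipped record are all hypothetical, and the model only assumes what is described above, namely that a commit is a no-op unless a flag was set by a successfully processed record.

```java
// Toy model, NOT Kafka code: every name here is hypothetical.
class CommitFlagSketch {
    private boolean commitNeeded = false; // stands in for the flag discussed above
    private long position = 0;            // offset of the next record to read
    private long committed = 0;           // position recorded by the last commit

    // A null value stands in for a record that failed deserialization
    // and was logged and skipped by the handler.
    void consume(Integer value) {
        position++;
        if (value == null) {
            return; // skipped record: the flag is left untouched
        }
        commitNeeded = true; // only successful processing sets the flag
    }

    // Commits the current position, but only if the flag is set.
    void commit() {
        if (!commitNeeded) {
            return; // nothing was processed, so nothing is committed
        }
        committed = position;
        commitNeeded = false;
    }

    // Consumes all values, commits once, and returns the committed position.
    static long committedAfter(Integer[] values) {
        CommitFlagSketch task = new CommitFlagSketch();
        for (Integer v : values) {
            task.consume(v);
        }
        task.commit();
        return task.committed;
    }

    public static void main(String[] args) {
        // Only bad records: nothing is committed, so they reappear on restart.
        System.out.println(committedAfter(new Integer[] {null, null, null}));
        // One good record at the end: the commit covers the skipped records too.
        System.out.println(committedAfter(new Integer[] {null, null, 7}));
    }
}
```

      In this model a run of skipped records leaves the committed position unchanged, while a single good record moves it past everything before it, which matches the behavior reported in this issue.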

            People

              Assignee: Alex Sorokoumov (Gerrrr)
              Reporter: Soby Chacko (sobychacko)
              Reviewer: Matthias J. Sax
              Votes: 1
              Watchers: 7
