Kafka: KAFKA-6502

Kafka Streams deserialization handler not committing offsets on error records

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.2.0
    • Component/s: streams
    • Labels: None

    Description

      See this Stack Overflow question: https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler

      and this comment: https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899

       I am trying to use the LogAndContinueExceptionHandler for deserialization errors. When an occasional error occurs, it works as expected: the record is logged and processing continues. However, when the input is an uninterrupted run of bad records, their offsets do not seem to be committed, and the records reappear after the application restarts. This is more problematic when I send the failed records to a DLQ, because on restart they are sent to the DLQ again. As soon as a good record arrives, the offset appears to move forward, and the already-logged records no longer reappear after a restart.

      I reproduced this behavior by running the sample provided here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

      I changed the incoming value Serde to Serdes.Integer().getClass().getName() to force a deserialization error on input, reduced the commit interval to 1 second, and added the following to the config:

      streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
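      For reference, the reproduction settings described above can be collected in one place. This is a minimal sketch, not the example project's code: it uses the literal config keys behind the StreamsConfig constants (default.value.serde, default.deserialization.exception.handler, commit.interval.ms) so it compiles with only the JDK, and the application id and broker address are placeholder assumptions.

```java
import java.util.Properties;

/**
 * Sketch of the reproduction settings from this report, using the literal
 * Kafka Streams config keys so it compiles with only the JDK.
 */
public class ReproConfig {

    static Properties reproConfig() {
        Properties props = new Properties();
        // Placeholder values (assumptions): use your own application id and brokers.
        props.put("application.id", "wordcount-deserialization-repro");
        props.put("bootstrap.servers", "localhost:9092");
        // Read values as integers. IntegerDeserializer rejects any payload that is
        // not exactly 4 bytes, so most plain-text input lines fail to deserialize.
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        // Log each bad record and skip it instead of stopping processing.
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // Commit every second so missing commits show up quickly.
        props.put("commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        reproConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

      In a real application these would normally be set through the StreamsConfig constants, as in the report.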

       It looks like when deserialization exceptions occur, the flag at this line is never set to true: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228. It only becomes true once processing succeeds. That may be why no commit happens, even when I call processorContext#commit() manually.
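      The mechanism described above can be illustrated with a small model. This is a hypothetical sketch, not Kafka's StreamTask implementation: Input, committedOffset, and the advanceOnSkip switch are invented for illustration, and a commit is attempted after every record to stand in for a very short commit interval. It shows why a run of skipped records leaves the committed offset behind until the next good record arrives.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of offset tracking in a stream task.
 * It is not Kafka's StreamTask code; it only mirrors the mechanism described
 * in this issue: a "commit needed" flag that only some records set.
 */
public class CommitModel {

    /** One input record: its offset and whether its value deserializes. */
    record Input(long offset, boolean deserializable) {}

    /**
     * Returns the offset that would be committed after consuming the inputs,
     * or -1 if no commit happens at all.
     *
     * @param advanceOnSkip false: only successfully deserialized records set the
     *                      flag (the reported behavior); true: records skipped
     *                      by the handler also count as consumed
     */
    static long committedOffset(List<Input> inputs, boolean advanceOnSkip) {
        long committed = -1;    // -1 = nothing committed yet
        long lastConsumed = -1; // last offset the task treats as done
        boolean commitNeeded = false;

        for (Input in : inputs) {
            if (in.deserializable() || advanceOnSkip) {
                lastConsumed = in.offset();
                commitNeeded = true;
            }
            // Otherwise the record is logged and dropped, but nothing records
            // that its offset has been consumed, and the flag stays false.

            // Attempt a commit after every record; it only happens if the flag is set.
            if (commitNeeded) {
                committed = lastConsumed + 1; // Kafka commits the next offset to read
                commitNeeded = false;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Input> allBad = List.of(new Input(0, false), new Input(1, false), new Input(2, false));
        List<Input> goodThenBad = List.of(new Input(0, true), new Input(1, false), new Input(2, false));

        System.out.println(committedOffset(allBad, false));      // -1: nothing committed
        System.out.println(committedOffset(allBad, true));       // 3
        System.out.println(committedOffset(goodThenBad, false)); // 1: offsets 1 and 2 would be re-read
        System.out.println(committedOffset(goodThenBad, true));  // 3
    }
}
```

      With advanceOnSkip set to false, the committed offset only moves when a record deserializes, which matches the replay-after-restart behavior in the report; setting it to true models skipped records counting as consumed.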

          People

            Alex Sorokoumov (Gerrrr)
            Soby Chacko (sobychacko)
            Matthias J. Sax
            Votes: 1
            Watchers: 7
