
Streams crashes when non Base64 Offset Metadata is found

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 3.2.0
    • Component/s: streams
    • Labels: None

    Description

      Kafka Streams applications read the metadata stored with the committed offsets of previously running instances to extract timestamps.

      However, when the metadata field contains anything other than the expected Base64-encoded value, the Base64 decoder throws an exception, which causes the Streams application to fail.
      A new offset commit is then required to stop this failure.

      I've included the part of the log from when we started a Kafka Streams app after setting the offsets using a third-party tool. This tool adds some tracing metadata so that developers and operators can see who performed the custom offset commit.

       

      2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=example-app-3, groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns and assigned partitions
      java.lang.IllegalArgumentException: Illegal base64 character 7b
      	at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
      	at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) ~[kafka-streams-2.7.0.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) ~[kafka-streams-2.7.0.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) ~[kafka-streams-2.7.0.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) ~[kafka-streams-2.7.0.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
      2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-example-app-1] All stream threads have died. The instance will be in error state and should be closed.
      java.lang.IllegalArgumentException: Illegal base64 character 7b
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
      

      I recommend adding a try/catch block around the Base64 decode in the StreamTask.decodeTimestamp method and returning the Unknown value when decoding fails.
      This is purely for resilience when bad data is encountered.
      After the Streams application performs a new offset commit, the error should not occur again, which limits the chance of frequently recurring warnings in the logs.
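
The recommendation above could be sketched roughly as follows. This is a standalone illustration, not the actual Kafka patch: the class `OffsetMetadataDecoder`, the constants `UNKNOWN` and `MAGIC_BYTE`, and the assumed encoding (one version byte followed by an 8-byte timestamp) are hypothetical stand-ins for what Kafka Streams uses internally. The length check is an extra guard that goes beyond the suggestion. The character 0x7b reported in the log is `{`, which would be consistent with JSON-style tracing metadata.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

class OffsetMetadataDecoder {
    // Hypothetical stand-ins for the constants used inside Kafka Streams.
    static final long UNKNOWN = -1L;
    static final byte MAGIC_BYTE = 1;

    // Assumed encoding: Base64(version byte + 8-byte timestamp).
    static String encodeTimestamp(long timestamp) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES);
        buffer.put(MAGIC_BYTE);
        buffer.putLong(timestamp);
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    // Returns the decoded timestamp, or UNKNOWN if the metadata is
    // empty, not valid Base64, too short, or of an unexpected version.
    static long decodeTimestamp(String metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return UNKNOWN;
        }
        byte[] bytes;
        try {
            bytes = Base64.getDecoder().decode(metadata);
        } catch (IllegalArgumentException e) {
            // Metadata written by another tool (for example JSON) ends up here
            // instead of crashing the caller.
            return UNKNOWN;
        }
        if (bytes.length < 1 + Long.BYTES) {
            return UNKNOWN; // extra guard, not part of the original suggestion
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return buffer.get() == MAGIC_BYTE ? buffer.getLong() : UNKNOWN;
    }
}
```

With this shape, a call such as `OffsetMetadataDecoder.decodeTimestamp("{\"user\":\"ops\"}")` falls back to `UNKNOWN` rather than throwing, while a value produced by `encodeTimestamp` round-trips unchanged. A real change would probably also log a warning in the catch block so that operators can see that foreign metadata was ignored.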

      I've already made the changes and added a test for this issue, as I would like to contribute to Kafka.

            People

              Assignee: Richard Bosch (RBosch81)
              Reporter: Richard Bosch (RBosch81)

                Time Tracking

                  Original Estimate: 0.5h
                  Remaining Estimate: 0.5h
                  Time Spent: Not Specified