
KAFKA-13002: listOffsets must downgrade immediately for non MAX_TIMESTAMP specs


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.0.0
    • Component/s: streams
    • Labels: None

    Description

      Note: this is not a report against a released version of Apache Kafka. It seems to be a problem on the trunk development branch only.

      After deploying our soak test against `trunk/HEAD` on Friday, I noticed that Streams was no longer processing (see the attached soaks.png).

      I found this stacktrace in the logs during startup:

      [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The listOffsets request failed. (org.apache.kafka.streams.processor.internals.ClientUtils)
      [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [7,7]. The supported range is [0,6].
          at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
          at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
          at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
          at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
          at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
          at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
          at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
          at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
          at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
          at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
          at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
          at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
          at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)

      Just eyeballing the recent commits, I'm guessing this was introduced by https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530. It looks like that commit sets the initial "minimum version" for LIST_OFFSETS to 7, but it should then back off into a compatibility mode. If so, maybe that stacktrace is expected (though it's not great UX regardless).
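      For illustration, here is a minimal sketch of the intended behavior, with a hypothetical helper (this is not the actual AdminClient code): only a MAX_TIMESTAMP spec (KIP-734) genuinely requires LIST_OFFSETS v7, so the version floor should be computed from the specs being requested rather than pinned at 7 unconditionally.

```java
import java.util.Collection;

import org.apache.kafka.clients.admin.OffsetSpec;

// Hypothetical helper, not the actual AdminClient code: compute the lowest
// LIST_OFFSETS version able to serve the given specs. Only MAX_TIMESTAMP
// (KIP-734) needs v7; earliest/latest/timestamp lookups work on [0,6] too.
final class ListOffsetsVersionFloor {
    static short minimumVersionFor(final Collection<OffsetSpec> specs) {
        final boolean needsMaxTimestamp =
            specs.stream().anyMatch(s -> s instanceof OffsetSpec.MaxTimestampSpec);
        return needsMaxTimestamp ? (short) 7 : (short) 0;
    }
}
```

      With a floor of 0, the broker's advertised range [0,6] would intersect the client's allowed range, and the request above would never have been rejected in the first place.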

      However, it does not seem like Streams is actually able to back off. The next thing I see is:

      [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2 had endOffsetSum=-3 smaller than offsetSum=0 on member 24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state. (org.apache.kafka.streams.processor.internals.assignment.ClientState) 

      This is itself a problem. It looks like a sentinel value of -3 is returned as the end offset, and since that value is lower than any real end offset Streams will have book-kept, Streams assumes that all of its local state is corrupt. The result is that Streams deletes all of its local state and rebuilds it from the changelog, which isn't ideal behavior on restart.
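      A minimal sketch of the kind of guard that seems to be missing, with hypothetical names (-3 is taken from the log above; the constant name and method are assumptions, not the actual Streams code): a negative end-offset sum means the end offsets could not be fetched and should not be compared against the locally book-kept offset sum.

```java
// Hypothetical guard, not the actual Streams code: -3 is the sentinel seen
// in the log above, assumed here to mean "end offsets could not be fetched".
final class TaskLagCheck {
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Return the task's lag, or UNKNOWN_OFFSET_SUM when the end offsets are
    // unknown, instead of treating the sentinel as a real (impossibly small)
    // end offset and flagging the task's local state as corrupted.
    static long lagFor(final long endOffsetSum, final long offsetSum) {
        if (endOffsetSum < 0) {
            return UNKNOWN_OFFSET_SUM;
        }
        return Math.max(endOffsetSum - offsetSum, 0L);
    }
}
```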

      Finally, I never see Streams actually resume processing. The only thing it logs after this point (as far as I can tell) is:

      [2021-06-25T16:50:54-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1] stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) 

      So, it seems the version backoff simply isn't working.
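      The title suggests the shape of the fix. A sketch of the retry decision, with hypothetical names (the real logic would live in the admin client's UnsupportedVersionException handling): if no MAX_TIMESTAMP spec was requested, the client should downgrade and retry immediately rather than surfacing the failure.

```java
import java.util.Collection;

import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.errors.UnsupportedVersionException;

// Hypothetical retry decision, not the actual AdminClient code. Returning
// true means "rebuild the request within the broker's supported version
// range and retry immediately"; false means v7 is genuinely required.
final class ListOffsetsDowngrade {
    static boolean shouldDowngrade(final UnsupportedVersionException error,
                                   final Collection<OffsetSpec> requestedSpecs) {
        return requestedSpecs.stream()
            .noneMatch(s -> s instanceof OffsetSpec.MaxTimestampSpec);
    }
}
```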

      Obviously, we'll need to fix these problems before we can release 3.0.
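      For context, a self-contained sketch of the public API involved (broker address and topic name are made up): Streams' end-offset fetch only asks for latest offsets, which any broker can serve; only the maxTimestamp() spec needs a broker speaking LIST_OFFSETS v7.

```java
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public final class ListOffsetsExample {
    public static void main(final String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
            final TopicPartition tp = new TopicPartition("test-topic", 0);

            // Servable by any broker; this is all Streams needs here.
            final ListOffsetsResult latest =
                admin.listOffsets(Map.of(tp, OffsetSpec.latest()));
            System.out.println("latest = " + latest.partitionResult(tp).get().offset());

            // Requires a broker supporting LIST_OFFSETS v7 (KIP-734, 3.0+).
            final ListOffsetsResult maxTs =
                admin.listOffsets(Map.of(tp, OffsetSpec.maxTimestamp()));
            System.out.println("maxTimestamp = " + maxTs.partitionResult(tp).get().offset());
        }
    }
}
```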

      Attachments

        1. soaks.png (49 kB, John Roesler)


            People

              Tom Scott (tom@confluent.io)
              John Roesler (vvcephei)
              David Jacot
              Votes: 0
              Watchers: 8

              Dates

                Created:
                Updated:
                Resolved: