KAFKA-14128: Kafka Streams terminates on topic check


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.5.0, 3.4.1
    • Component/s: streams
    • Labels: None
    • Any

    Description

      Our streams application shut down unexpectedly after some network issues that should have been easily recoverable.

      Logs:

       

      2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout.
      2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 30000ms)
      2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager         : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog.
      Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
      2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
      

      I think the relevant code is in https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524

      topicFuture.getValue().get();

      Called without a timeout like this, get() cannot throw java.util.concurrent.TimeoutException. The AdminClient's org.apache.kafka.common.errors.TimeoutException therefore arrives as the cause of an ExecutionException, hits the last else branch, and the StreamsException is thrown.
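      The catch logic described above can be sketched as a small stand-alone example. This is a simplified illustration under stated assumptions, not the actual InternalTopicManager code: java.util.concurrent.CompletableFuture stands in for KafkaFuture, and UnknownTopicOrPartitionException, AdminTimeoutException and FatalTopicException are hypothetical stand-ins for the Kafka exception classes so the example runs without Kafka on the classpath. It shows that an untimed get() only surfaces the AdminClient timeout as the cause of an ExecutionException, so the cause has to be checked explicitly (here, as an assumption, treated as retriable) to keep it out of the fatal else branch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class DescribeTopicsSketch {

    // Hypothetical stand-ins for org.apache.kafka.common.errors.UnknownTopicOrPartitionException
    // and org.apache.kafka.common.errors.TimeoutException, so the sketch runs without Kafka.
    public static class UnknownTopicOrPartitionException extends RuntimeException {}
    public static class AdminTimeoutException extends RuntimeException {}

    // Hypothetical stand-in for StreamsException.
    public static class FatalTopicException extends RuntimeException {
        public FatalTopicException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Waits on each describe future. Unknown topics go to {@code missing}; topics whose
     * description timed out are returned for a retry; any other failure is fatal.
     */
    public static List<String> classify(Map<String, Future<String>> futures, List<String> missing) {
        List<String> retry = new ArrayList<>();
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
            try {
                entry.getValue().get(); // untimed get: failures arrive only as ExecutionException
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while describing topics", interrupted);
            } catch (ExecutionException failed) {
                Throwable cause = failed.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    missing.add(entry.getKey()); // topic does not exist yet
                } else if (cause instanceof AdminTimeoutException) {
                    retry.add(entry.getKey()); // assumed transient: retry instead of failing
                } else {
                    throw new FatalTopicException("Could not describe topic " + entry.getKey(), cause);
                }
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        Map<String, Future<String>> futures = new LinkedHashMap<>();
        futures.put("ok-topic", CompletableFuture.completedFuture("description"));
        futures.put("slow-topic", CompletableFuture.failedFuture(new AdminTimeoutException()));
        futures.put("new-topic", CompletableFuture.failedFuture(new UnknownTopicOrPartitionException()));

        List<String> missing = new ArrayList<>();
        List<String> retry = classify(futures, missing);
        System.out.println("retry=" + retry + " missing=" + missing);
    }
}
```

      Running main prints "retry=[slow-topic] missing=[new-topic]"; without the AdminTimeoutException branch, slow-topic would instead fall through to the fatal branch, which is the behavior seen in the log above.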

      Possible fix:

      Use the KafkaFuture method with timeout:

      public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

      instead of 

      public abstract T get() throws InterruptedException, ExecutionException;
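      The difference between the two get variants can be checked with a minimal sketch, assuming java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture (both implement java.util.concurrent.Future) and a hypothetical AdminTimeoutException in place of org.apache.kafka.common.errors.TimeoutException. The timed get throws java.util.concurrent.TimeoutException only when the future is still incomplete after the wait; a future that has already failed with the AdminClient's timeout error is still reported as an ExecutionException, so its cause needs to be inspected in either case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException, which is a
    // different class from java.util.concurrent.TimeoutException.
    public static class AdminTimeoutException extends RuntimeException {}

    /** Reports which outcome a timed get() produces for the given future. */
    public static String outcome(Future<String> future) {
        try {
            return "value: " + future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException notDoneYet) {
            // Only thrown when the future is still incomplete after the wait.
            return "java.util.concurrent.TimeoutException";
        } catch (ExecutionException failed) {
            // A future that already failed lands here, whatever the type of its cause.
            return "ExecutionException caused by " + failed.getCause().getClass().getSimpleName();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(CompletableFuture.completedFuture("ok")));
        System.out.println(outcome(new CompletableFuture<>())); // never completed
        System.out.println(outcome(CompletableFuture.failedFuture(new AdminTimeoutException())));
    }
}
```

      Running main prints "value: ok", then "java.util.concurrent.TimeoutException", then "ExecutionException caused by AdminTimeoutException".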


            People

              Assignee: Lucia Cerchie
              Reporter: Patrik Kleindl
              Votes: 0
              Watchers: 3
