KAFKA-6587

Kafka Streams hangs when not able to access internal topics



    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 2.1.0
    • Component/s: security, streams


      Expectation: Kafka Streams client will throw an exception, log errors, or crash when a fatal error occurs.

      Observation: Kafka Streams does not log an error or throw an exception when necessary permissions for internal state store topics are not granted. It will hang indefinitely and not start running the topology.

      Steps to reproduce:

      1. Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found should be set to false, or deny permissions must be set on the intermediate topics).
      2. Create a simple streams application that does a stateful operation such as count.
      3. Grant ACLs on the source and sink topics to the principal used for testing (I recommend the ANONYMOUS user, if possible, for ease of testing).
      4. Grant ACLs for the consumer group and for cluster create. Add deny permissions on the state store topics if the default is "allow". To get the topic names, either run the application once so that it creates the topics, or use the topology describe method.
      5. Run the streams application. It should hang on "(Re-)joining group" with no errors printed.
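
      For step 4, the state store topic names can also be derived by hand. The sketch below is a minimal helper, assuming the naming pattern <application.id>-<store name>-changelog that the changelog topic in the server log further down follows. The class and method names are hypothetical and not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: derive the changelog topic names a Streams app needs ACLs for. */
final class InternalTopicNames {

    // Assumed pattern: <application.id>-<store name>-changelog
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Convenience wrapper for applications with several state stores.
    static List<String> changelogs(String applicationId, List<String> storeNames) {
        List<String> topics = new ArrayList<>();
        for (String store : storeNames) {
            topics.add(changelog(applicationId, store));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Values taken from the logs in this report.
        System.out.println(changelog(
                "kafka-consumer-test",
                "KSTREAM-AGGREGATE-STATE-STORE-0000000005"));
    }
}
```

      The store name here is the generated one from this report. Generated names can change when the topology changes, so it is worth checking them against the output of the topology describe method.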

      Detailed Explanation

      I spent some time trying to figure out what was wrong with my streams app. I'm using ACLs on my Kafka cluster, and it turns out I had forgotten to grant read/write access to the internal state store topic for an aggregation.

      The streams client would hang on "(Re-)joining group" until killed (note ^C is ctrl+c, which I used to kill the app): 

      10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=kafka-consumer-client-StreamThread-1-consumer, groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id: 2147483647 rack: null)
      10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=kafka-consumer-client-StreamThread-1-consumer, groupId=kafka-consumer-test] Revoking previously assigned partitions []
      10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
      10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-consumer-client]State transition from RUNNING to REBALANCING
      10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
      suspended active tasks: []
      suspended standby tasks: []
      10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=kafka-consumer-client-StreamThread-1-consumer, groupId=kafka-consumer-test] (Re-)joining group
      10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-consumer-client]State transition from REBALANCING to PENDING_SHUTDOWN
      10:34:53.610 [Thread-3] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] Informed to shut down
      10:34:53.610 [Thread-3] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN

      The server log would show:

      [2018-02-23 10:29:10,408] INFO [Partition kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 broker=0] kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
      [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Member kafka-consumer-client-StreamThread-1-consumer-f86e4ca8-4c45-4883-bdaa-2383193eabbe in group kafka-consumer-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
      [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Preparing to rebalance group kafka-consumer-test with old generation 1 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
      [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Group kafka-consumer-test with generation 2 is now empty (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
      [2018-02-23 10:31:23,448] INFO [GroupMetadataManager brokerId=0] Group kafka-consumer-test transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)

      In this example, the internal topic was created by the run itself. If the internal topic already exists, the client tries to create it again and the attempt fails with a "topic already exists" exception (shown in the server log, not in the client log).

      The streams client then remains stuck indefinitely. No errors or warnings are printed, and it never appears to shut down on its own.
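
      Until a client fails loudly in this situation, an application can at least detect the hang itself. The following is a minimal, dependency-free sketch of a watchdog that flags a client that stays in REBALANCING for longer than a chosen limit. The class name, the method names, and the one-minute limit are assumptions for illustration. In a real application the transitions could be fed in from a listener registered with KafkaStreams#setStateListener.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch: flags a client that stays in REBALANCING longer than a limit. */
final class RebalanceWatchdog {
    private final Duration limit;
    private Instant rebalancingSince; // null while not rebalancing

    RebalanceWatchdog(Duration limit) {
        this.limit = limit;
    }

    /** Record a state transition; state names mirror those in the client log. */
    synchronized void onStateChange(String newState, Instant when) {
        if ("REBALANCING".equals(newState)) {
            if (rebalancingSince == null) {
                rebalancingSince = when; // remember when the rebalance started
            }
        } else {
            rebalancingSince = null; // any other state ends the rebalance
        }
    }

    /** True if the client has been rebalancing for longer than the limit. */
    synchronized boolean isStuck(Instant now) {
        return rebalancingSince != null
                && Duration.between(rebalancingSince, now).compareTo(limit) > 0;
    }

    public static void main(String[] args) {
        RebalanceWatchdog watchdog = new RebalanceWatchdog(Duration.ofMinutes(1));
        Instant start = Instant.parse("2018-02-23T10:29:10Z");
        watchdog.onStateChange("REBALANCING", start);
        // Roughly the moment the app in this report was killed by hand.
        System.out.println(watchdog.isStuck(start.plusSeconds(343)));
    }
}
```

      A periodic check of isStuck could then log an error or close the client, which gives the behavior described under "Expectation" without waiting for someone to notice the hang.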





              • Assignee:
                tedyu Zhihong Yu
                crmedved Chris Medved