Kafka / KAFKA-6799

Consumer livelock during consumer group rebalance


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Information Provided
    • Affects Version/s: 0.11.0.2, 1.0.0, 1.1.0
    • Fix Version/s: None
    • Component/s: clients, consumer
    • Labels: None

    Description

      We have the following environment:

      • 1 kafka cluster with 3 brokers
      • 1 topic with 3 partitions
      • 1 producer
      • 1 consumer group with 3 consumers

      Starting from this setup, we remove one broker from the cluster the hard way, by simply killing it. Quite often, the consumer group is then not rebalanced correctly: all 3 consumers stop consuming and get stuck in a loop forever.

      The thread dump shows that the consumer threads are not blocked: they run forever inside AbstractCoordinator.ensureCoordinatorReady while holding the lock on the ConsumerCoordinator, taken through the synchronized keyword on that method (see "- locked <0x2a0c>" in the dump below). The heartbeat threads are blocked, waiting for the consumer threads to release that lock. As a result, no consumer can consume any more records.
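      The mechanism described in the previous paragraph can be modelled in a few lines of plain Java. This is not Kafka's code: FakeCoordinator, coordinatorKnown and markCoordinatorKnown are hypothetical names, and a busy loop stands in for the awaitMetadataUpdate polling seen in the dump. What it shows is that a thread looping inside a synchronized method never releases the monitor, so the JVM reports it as RUNNABLE while any other thread needing the same monitor stays BLOCKED, the same pairing as in the dumps. Unlike the real loop, the model can be ended by setting the flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal model (not Kafka code): a thread spins inside a synchronized method,
// and a second thread blocks trying to enter a synchronized block on the same object.
public class MonitorStarvationDemo {

    // Hypothetical stand-in for the ConsumerCoordinator; its monitor plays the role of <0x2a0c>.
    static final class FakeCoordinator {
        private volatile boolean coordinatorKnown = false;
        private final CountDownLatch entered = new CountDownLatch(1);

        // Like the synchronized ensureCoordinatorReady: loops while holding the monitor.
        synchronized void ensureCoordinatorReady() {
            entered.countDown();
            while (!coordinatorKnown) {
                Thread.yield(); // stands in for network polling; the monitor is never released
            }
        }

        // Like the heartbeat thread: it needs the same monitor to make progress.
        void heartbeat() {
            synchronized (this) {
                // a real heartbeat thread would send a heartbeat here
            }
        }

        // Volatile write, no lock needed; lets the model (unlike the real loop) terminate.
        void markCoordinatorKnown() {
            coordinatorKnown = true;
        }
    }

    // Returns the heartbeat thread's state observed while the consumer thread holds the monitor.
    public static Thread.State run() throws InterruptedException {
        FakeCoordinator coordinator = new FakeCoordinator();
        Thread consumer = new Thread(coordinator::ensureCoordinatorReady, "consumer");
        Thread heartbeat = new Thread(coordinator::heartbeat, "heartbeat");

        consumer.start();
        coordinator.entered.await(); // make sure the consumer owns the monitor first
        heartbeat.start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (heartbeat.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = heartbeat.getState();

        coordinator.markCoordinatorKnown(); // release the spinning thread so both can finish
        consumer.join();
        heartbeat.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("heartbeat thread state while consumer spins: " + run());
    }
}
```

      In the real client nothing plays the role of markCoordinatorKnown while the coordinator stays unknown, which is why the consumer and heartbeat threads stay in these states indefinitely.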

      We built a simple project that seems to reliably reproduce this:

      $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
      $ git clone https://github.com/phdezann/helloworld-kafka-livelock
      $ cd helloworld-kafka-livelock && ./spin.sh
      ...
      livelock detected
      
      Consumer thread
      "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
      	 blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
      	  at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      	  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      	  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      	  - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
      	  - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
      	  - locked <0x2a17> (a sun.nio.ch.Util$3)
      	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      	  at org.apache.kafka.common.network.Selector.select(Selector.java:684)
      	  at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
      	  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
      	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
      	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
      	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
      	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
      	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
      	  - locked <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
      	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
      	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
      	  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
      	  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
      	  at org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
      	  at org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown Source:-1)
      	  at java.lang.Thread.run(Thread.java:748)
      
      Heartbeat thread
      "kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 tid=0x36 nid=NA waiting for monitor entry
        java.lang.Thread.State: BLOCKED
      	 waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
      	  at java.lang.Object.wait(Object.java:-1)
      	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
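      The reproducer prints "livelock detected"; how spin.sh reaches that verdict is not shown here. One way to spot the same state from inside the JVM is to ask java.lang.management.ThreadMXBean for threads that are BLOCKED on a monitor and report which thread owns it, which is what the two dumps above show by hand. The sketch is self-contained and hypothetical: BlockedHeartbeatDetector, findBlocked, demo and the demo thread names are made up for illustration; only the "heartbeat-thread" name fragment is taken from the dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockedHeartbeatDetector {

    // Lists live threads whose name contains nameFragment and that are BLOCKED on a monitor,
    // together with the monitor and the name of the thread that owns it.
    public static List<String> findBlocked(String nameFragment) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            if (info == null) {
                continue; // the thread terminated between the two calls
            }
            if (info.getThreadState() == Thread.State.BLOCKED
                    && info.getThreadName().contains(nameFragment)) {
                result.add(info.getThreadName() + " blocked on " + info.getLockName()
                        + " owned by " + info.getLockOwnerName());
            }
        }
        return result;
    }

    // Recreates the shape of the dump: one thread holds a monitor, a "heartbeat" thread waits for it.
    public static List<String> demo() throws InterruptedException {
        final Object coordinatorLock = new Object();
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            synchronized (coordinatorLock) {
                held.countDown();
                try {
                    release.await(); // keeps holding the monitor until told to stop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "kafka-consumer-demo");

        Thread heartbeat = new Thread(() -> {
            synchronized (coordinatorLock) {
                // a real heartbeat thread would send a heartbeat here
            }
        }, "kafka-coordinator-heartbeat-thread | demoGroup");

        consumer.start();
        held.await(); // the consumer now owns the monitor
        heartbeat.start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (heartbeat.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }

        List<String> found = findBlocked("heartbeat-thread");
        release.countDown();
        consumer.join();
        heartbeat.join();
        return found;
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

      A single snapshot is not proof of a livelock, since a heartbeat thread can legitimately wait briefly for the coordinator lock. A practical check would sample repeatedly and flag only a thread that stays BLOCKED on the same owner across samples.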
      


      People

            Assignee: Attila Sasvári (asasvari)
            Reporter: Pierre-Henri Dezanneau (phdezann)
            Votes: 3
            Watchers: 5
