Kafka / KAFKA-6043

Kafka 0.8.1.1 - ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Auto Closed
    • Affects Version/s: 0.8.1.1
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      We are using a 3-node Kafka cluster. Around this cluster we have a RESTful service that provides HTTP APIs for clients. The service keeps each consumer connection in a cache, and cache entries are set to expire after 60 minutes, at which point the consumer connection is shut down.
      However, we see that the thread calling ZookeeperConsumerConnector.shutdown is blocked and the consumer object stays alive in the JVM. Please let me know whether a solution has been identified for this.

      Below is the output from the thread dump taken when this occurred:
      "pool-25100-thread-1" prio=10 tid=0x00007f711804e820 nid=0x6726 waiting for monitor entry [0x00007f6cd999b000]
        java.lang.Thread.State: BLOCKED (on object monitor)
          at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
          - waiting to lock <0x000000076a922c40> (a java.lang.Object)
          at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
          at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
          at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
          at java.util.concurrent.FutureTask.run(FutureTask.java:262)

      T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x00007f7180198030 nid=0x55a2 waiting on condition [0x00007f6c9f4fa000]
        java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)
          - parking to wait for <0x000000076b2e1508> (a java.util.concurrent.CountDownLatch$Sync)
          at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
          at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
          at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
          at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
          at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
          at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
          - locked <0x000000076a922340> (a java.lang.Object)
          at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:524)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
          at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
          - locked <0x000000076a922c40> (a java.lang.Object)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)
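
Read together, the two stacks seem to show one thread holding monitor <0x000000076a922c40> while parked on a CountDownLatch, and the shutdown thread blocked waiting for that same monitor. The sketch below reproduces only that locking pattern with plain JDK classes. It is not Kafka code: the class, field, and method names are hypothetical stand-ins, and the latch is released at the end so the demo terminates, which did not happen in the reported case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockedShutdownSketch {
    // Hypothetical stand-in for the monitor <0x000000076a922c40> in both stacks.
    private final Object rebalanceLock = new Object();
    // Hypothetical stand-in for the latch awaited in ShutdownableThread.shutdown.
    private final CountDownLatch fetcherStopped = new CountDownLatch(1);
    // Lets the demo know the watcher thread already owns the monitor.
    private final CountDownLatch lockHeld = new CountDownLatch(1);

    // Mirrors the watcher thread: takes the monitor, then waits on the latch.
    void rebalance() throws InterruptedException {
        synchronized (rebalanceLock) {
            lockHeld.countDown();
            fetcherStopped.await(); // parks while still holding the monitor
        }
    }

    // Mirrors the shutdown caller: needs the same monitor to proceed.
    void shutdown() {
        synchronized (rebalanceLock) {
            // Nothing to do here; acquiring the monitor is the point.
        }
    }

    // Returns the state observed for the thread calling shutdown().
    static Thread.State observeShutdownState() throws InterruptedException {
        BlockedShutdownSketch sketch = new BlockedShutdownSketch();

        Thread watcher = new Thread(() -> {
            try {
                sketch.rebalance();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "watcher-executor-sketch");
        watcher.setDaemon(true);
        watcher.start();
        sketch.lockHeld.await(); // wait until the watcher owns the monitor

        Thread closer = new Thread(sketch::shutdown, "shutdown-caller-sketch");
        closer.setDaemon(true);
        closer.start();

        // Poll for up to 5 seconds until the closer is stuck on the monitor.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (closer.getState() != Thread.State.BLOCKED
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = closer.getState();

        // Release the latch so both threads can finish and the demo exits.
        sketch.fetcherStopped.countDown();
        watcher.join();
        closer.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdown caller state: " + observeShutdownState());
    }
}
```

If the latch in the sketch were never counted down, the shutdown caller would stay blocked indefinitely, which matches the symptom described above; why the latch is not released in the real consumer is not something the thread dump alone establishes.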

      Attachments

        1. stdout.LRMIID-158150.zip
          4.71 MB
          Ramkumar
        2. 6043.v1
          0.6 kB
          Ted Yu

          People

            Assignee: Unassigned
            Reporter: Ramkumar (ram_amb@yahoo.com)
            Votes: 0
            Watchers: 3
