Kafka / KAFKA-1716

hang during shutdown of ZookeeperConsumerConnector


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Auto Closed
    • Affects Version/s: 0.8.1.1
    • Fix Version/s: None
    • Component/s: consumer
    • Labels: None

    Description

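      ZookeeperConsumerConnector.shutdown() appears able to hang if some consumer fetcher threads receive messages while shutdown is in progress. In the thread dumps below, the shutdown thread is parked on a CountDownLatch in ShutdownableThread.shutdown(), while a ConsumerFetcherThread is parked in LinkedBlockingQueue.put(), called from PartitionTopicInfo.enqueue().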

      Shutdown thread:

          -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
          at jrockit/vm/Locks.park0(J)V(Native Method)
          at jrockit/vm/Locks.park(Locks.java:2230)
          at sun/misc/Unsafe.park(ZJ)V(Native Method)
          at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
          at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
          at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
          at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
          at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
          at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
          at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
          at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
          at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
          at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
          at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
          at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
          at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
          ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
          at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
          at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
          at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)

      ConsumerFetcherThread:

          -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
          at jrockit/vm/Locks.park0(J)V(Native Method)
          at jrockit/vm/Locks.park(Locks.java:2230)
          at sun/misc/Unsafe.park(ZJ)V(Native Method)
          at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
          at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
          at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
          at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
          at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
          at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
          at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
          at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
          at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
          at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
          at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
          at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
          at kafka/utils/Utils$.inLock(Utils.scala:538)
          at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
          at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
          at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
          at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
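
      The two dumps above suggest a simple pattern: one thread blocks in put() on a full bounded queue, and another waits on a latch that is only released after that put() returns. The following is a minimal sketch of that pattern using plain java.util.concurrent classes. It is not Kafka code: the class ShutdownHangSketch, the method shutdownCompletes, the queue capacity, and the drainQueue switch are all hypothetical names chosen for illustration. A timed await() replaces the untimed one in the trace so the sketch terminates, and draining the queue is shown only as one conceivable way to unblock the producer, not as the fix adopted for this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ShutdownHangSketch {

    /** Returns true if the simulated shutdown finished within timeoutMs. */
    static boolean shutdownCompletes(boolean drainQueue, long timeoutMs)
            throws InterruptedException {
        // Bounded queue standing in for the consumer's message queue (assumption);
        // it is filled to capacity so that the next put() must block.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("chunk-0");

        CountDownLatch aboutToPut = new CountDownLatch(1);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                aboutToPut.countDown();
                queue.put("chunk-1"); // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // reached only once put() returns or is interrupted
            }
        }, "fetcher-sketch");
        fetcher.setDaemon(true); // a stuck thread must not keep the JVM alive
        fetcher.start();

        aboutToPut.await();
        if (drainQueue) {
            queue.clear(); // frees capacity, letting the blocked put() proceed
        }
        // The trace shows an untimed await(); a timeout is used here so the sketch ends.
        return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no drain: completed=" + shutdownCompletes(false, 500));
        System.out.println("drain:    completed=" + shutdownCompletes(true, 5000));
    }
}
```

      In this sketch nothing consumes from the queue, so without the drain step the latch wait can only end by timing out; with an untimed await(), as in the shutdown trace, the caller would wait indefinitely.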

      Attachments

        1. after-shutdown.log
          128 kB
          Ashwin Jayaprakash
        2. before-shutdown.log
          142 kB
          Ashwin Jayaprakash
        3. kafka-shutdown-stuck.log
          37 kB
          Ashwin Jayaprakash

          People

            Assignee: Neha Narkhede (nehanarkhede)
            Reporter: Sean Fay (sfay)
            Votes: 3
            Watchers: 16
