Kafka / KAFKA-916

Deadlock between fetcher shutdown and handling partitions with error


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

    Description

      Here is another consumer deadlock that we encountered. All consumers are
      vulnerable to this during a rebalance if there happen to be partitions in
      error.

      On a rebalance, the fetcher manager closes all fetchers while holding the
      fetcher thread map's lock (mapLock in AbstractFetcherManager). [t1]
      While the fetcher manager is iterating over the fetchers to stop them, a
      fetcher that has not yet been stopped hits an error on a partition and
      proceeds to handle partitions with errors [t2]. This handling involves
      looking up the fetcher for that partition and removing the partition from
      that fetcher's set of partitions to consume, which requires grabbing the
      same map lock held in [t1]; hence the deadlock.
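
      The thread dumps below show the two sides of this cycle. As a minimal,
      self-contained sketch of the same lock interaction (hypothetical names,
      not the actual Kafka code; running it deadlocks by design):

      import java.util.concurrent.CountDownLatch

      object FetcherDeadlockSketch {
        val mapLock     = new Object              // plays the role of AbstractFetcherManager.mapLock
        val fetcherDone = new CountDownLatch(1)   // plays the role of the fetcher thread's shutdown latch

        def main(args: Array[String]): Unit = {
          // [t2] fetcher thread: a partition error leads to removing the partition, which needs mapLock
          val fetcher = new Thread(new Runnable {
            def run(): Unit = {
              Thread.sleep(100)                   // let the rebalance path grab mapLock first
              mapLock.synchronized {              // BLOCKED: the main thread already holds mapLock
                // ... remove the failed partition from this fetcher's partition set ...
              }
              fetcherDone.countDown()             // never reached
            }
          })
          fetcher.start()

          // [t1] rebalance path: closeAllFetchers() holds mapLock while waiting for the fetcher to exit
          mapLock.synchronized {
            fetcherDone.await()                   // WAITING: only the blocked fetcher can count this down
          }
        }
      }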

      [t1]
      2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
      2013-05-22_20:23:11.95767 java.lang.Thread.State: WAITING (parking)
      2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
      2013-05-22_20:23:11.95767 - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
      2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
      2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
      2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
      2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
      2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
      2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
      ---> 2013-05-22_20:23:11.95771 - locked <0x00007f1a2ae92510> (a java.lang.Object)
      2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
      2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
      2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
      2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
      2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
      2013-05-22_20:23:11.95773 - locked <0x00007f1a2a29b450> (a java.lang.Object)
      2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
      2013-05-22_20:23:11.95774 - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
      2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-22_20:23:11.95776 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      2013-05-22_20:23:11.95777 at scala.collection.immutable.List.foreach(List.scala:45)
      2013-05-22_20:23:11.95777 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      2013-05-22_20:23:11.95777 at scala.collection.immutable.List.map(List.scala:45)
      2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

      [t2]

      2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
      2013-05-22_20:23:11.87466 java.lang.Thread.State: BLOCKED (on object monitor)
      2013-05-22_20:23:11.87467 at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
      ---> 2013-05-22_20:23:11.87467 - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
      2013-05-22_20:23:11.87468 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95682 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95683 at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
      2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
      2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
      2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
      2013-05-22_20:23:11.95684 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      2013-05-22_20:23:11.95686
      2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
      2013-05-22_20:23:11.95686 java.lang.Thread.State: WAITING (parking)
      2013-05-22_20:23:11.95686 at sun.misc.Unsafe.park(Native Method)
      2013-05-22_20:23:11.95686 - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      2013-05-22_20:23:11.95687 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-22_20:23:11.95687 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
      2013-05-22_20:23:11.95687 at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
      2013-05-22_20:23:11.95687 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
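
      The deadlock comes from holding mapLock across the wait for the fetcher
      threads to exit. One general way to break that kind of cycle, shown here
      purely as an illustration of the pattern and not as the actual KAFKA-916
      patch, is to hold the lock only long enough to snapshot and signal the
      fetchers, and to wait for them after releasing it:

      import scala.collection.mutable

      class FetcherManagerSketch {
        private val mapLock = new Object
        private val fetcherThreadMap = mutable.Map.empty[String, Thread]  // illustrative stand-in

        def closeAllFetchers(): Unit = {
          // Phase 1: under the lock, ask every fetcher to stop and clear the map.
          val fetchers = mapLock.synchronized {
            val snapshot = fetcherThreadMap.values.toList
            snapshot.foreach(_.interrupt())       // stand-in for initiating shutdown
            fetcherThreadMap.clear()
            snapshot
          }
          // Phase 2: outside the lock, wait for each fetcher to actually exit,
          // so a fetcher that still needs mapLock in its error handling can get it.
          fetchers.foreach(_.join())              // stand-in for awaiting shutdown
        }
      }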


    People

      Assignee: Unassigned
      Reporter: Joel Jacob Koshy (jjkoshy)
      Votes: 0
      Watchers: 2

