KAFKA-916

Deadlock between fetcher shutdown and handling partitions with error

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      Here is another consumer deadlock that we encountered. All consumers are
      vulnerable to this during a rebalance if there happen to be partitions in
      error.

      On a rebalance, the fetcher manager closes all fetchers while holding the
      fetcher thread map's lock (mapLock in AbstractFetcherManager) [t1]. While
      the fetcher manager is iterating over the fetchers to stop them, a fetcher
      that has not yet been stopped hits an error on a partition and proceeds to
      handle partitions with errors [t2]. That handling looks up the fetcher for
      the partition and removes the partition from that fetcher's set of
      partitions to consume, which requires grabbing the same map lock held in
      [t1], hence the deadlock.
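
      In code terms, the cycle looks roughly like the sketch below. This is an
      illustrative, self-contained Scala sketch, not the actual Kafka code: only
      mapLock, closeAllFetchers and removeFetcher correspond to names in
      AbstractFetcherManager; the other names are invented for the example.

      object DeadlockSketch {
        // mirrors AbstractFetcherManager's mapLock guarding the fetcher thread map
        private val mapLock = new Object
        private val fetcherThreadMap = scala.collection.mutable.Map[Int, FetcherThreadSketch]()

        class FetcherThreadSketch(id: Int) extends Thread {
          @volatile private var isRunning = true
          override def run() {
            while (isRunning) {
              // fetch; when a partition hits an error, the thread calls back
              // into the manager, which needs mapLock [t2]
              removeFetcher(id)
            }
          }
          // blocks until run() exits, like ShutdownableThread.shutdown
          def shutdown() { isRunning = false; join() }
        }

        // [t1] holds mapLock while waiting for every fetcher thread to stop
        def closeAllFetchers() {
          mapLock synchronized {
            fetcherThreadMap.values.foreach(_.shutdown())
          }
        }

        // [t2] blocks here: mapLock is held by [t1], which in turn is waiting
        // for this thread to exit
        def removeFetcher(id: Int) {
          mapLock synchronized {
            fetcherThreadMap.remove(id)
          }
        }
      }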

      [t1]
      2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
      2013-05-22_20:23:11.95767 java.lang.Thread.State: WAITING (parking)
      2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
      2013-05-22_20:23:11.95767 - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
      2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
      2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
      2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
      2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
      2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
      2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
      2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
      2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
      ---> 2013-05-22_20:23:11.95771 - locked <0x00007f1a2ae92510> (a java.lang.Object)
      2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
      2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
      2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
      2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
      2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
      2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
      2013-05-22_20:23:11.95773 - locked <0x00007f1a2a29b450> (a java.lang.Object)
      2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
      2013-05-22_20:23:11.95774 - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
      2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
      2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
      2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-22_20:23:11.95776 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      2013-05-22_20:23:11.95777 at scala.collection.immutable.List.foreach(List.scala:45)
      2013-05-22_20:23:11.95777 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      2013-05-22_20:23:11.95777 at scala.collection.immutable.List.map(List.scala:45)
      2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
      2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

      [t2]

      2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
      2013-05-22_20:23:11.87466 java.lang.Thread.State: BLOCKED (on object monitor)
      2013-05-22_20:23:11.87467 at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
      ---> 2013-05-22_20:23:11.87467 - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
      2013-05-22_20:23:11.87468 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95682 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95683 at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
      2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
      2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
      2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
      2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
      2013-05-22_20:23:11.95684 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      2013-05-22_20:23:11.95686
      2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
      2013-05-22_20:23:11.95686 java.lang.Thread.State: WAITING (parking)
      2013-05-22_20:23:11.95686 at sun.misc.Unsafe.park(Native Method)
      2013-05-22_20:23:11.95686 - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      2013-05-22_20:23:11.95687 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-22_20:23:11.95687 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
      2013-05-22_20:23:11.95687 at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
      2013-05-22_20:23:11.95687 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)

        Activity

        Joel Koshy made changes -
        Status Resolved [ 5 ] Closed [ 6 ]

        Joel Koshy added a comment -
        Thanks for the review. Committed to 0.8

        Joel Koshy made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Jun Rao added a comment -
        Thanks for the patch. +1.

        Joel Koshy made changes -
        Field Original Value New Value
        Attachment KAFKA-916-v1.patch [ 12584794 ]
        Joel Koshy added a comment -
        Agreed - I think that should fix the issue.
        Jun Rao added a comment -
        Thanks for finding this. The deadlock is introduced because AbstractFetcherManager.removeFetcher() can be called from AbstractFetcherThread and AbstractFetcherManager.closeAllFetchers can wait for AbstractFetcherThread to stop. This is only happening for the ConsumerFetcherThread. So, one possible fix is to remove the error partitions from the fetcher directly in ConsumerFetcherThread.handlePartitionsWithErrors(), instead of going through ConsumerFetcherManager. This will break the cycle.
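
        For illustration, here is a rough Scala sketch of the direction Jun
        describes. It is not the committed KAFKA-916-v1.patch; the
        partitionMapLock/partitionMap names and the TopicAndPartition case class
        below are assumptions made for the example.

        case class TopicAndPartition(topic: String, partition: Int)

        class ConsumerFetcherThreadSketch {
          // per-thread state, analogous to the fetcher's own partition -> offset map
          private val partitionMapLock = new Object
          private val partitionMap = scala.collection.mutable.Map[TopicAndPartition, Long]()

          // Drop the error partitions from this thread's own map instead of
          // calling back into the manager's removeFetcher(), so the manager's
          // mapLock is never needed and the cycle with closeAllFetchers() is broken.
          def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
            partitionMapLock synchronized {
              partitions.foreach(partitionMap.remove)
            }
            // the leader-finder logic can later re-add these partitions once
            // fresh leader metadata is available
          }
        }
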
        Joel Koshy created issue -

          People

          • Assignee: Unassigned
          • Reporter: Joel Koshy
          • Votes: 0
          • Watchers: 2
