Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
0.8.0
-
None
-
None
Description
Here is another consumer deadlock that we encountered. All consumers are
vulnerable to this during a rebalance if there happen to be partitions in
error.
On a rebalance, the fetcher manager closes all fetchers while holding the lock on
the fetcher thread map (mapLock in AbstractFetcherManager) [t1].
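While the fetcher manager is iterating over the fetchers to stop them, a fetcher
that has not yet been stopped hits an error on a partition and proceeds to
handle the partitions with errors [t2]. This handling involves looking up the
fetcher for that partition and then removing the partition from that fetcher's
set of partitions to consume, which requires grabbing the same map lock held in
[t1]. Meanwhile, the thread in [t1] keeps holding that lock while it blocks in
ShutdownableThread.shutdown() (CountDownLatch.await), waiting for the fetcher
thread it is stopping to exit. A fetcher thread stuck waiting for the lock can
never exit, so neither thread makes progress; hence the deadlock.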
[t1]
2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
2013-05-22_20:23:11.95767 java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767 - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
---> 2013-05-22_20:23:11.95771 - locked <0x00007f1a2ae92510> (a java.lang.Object)
2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
2013-05-22_20:23:11.95773 - locked <0x00007f1a2a29b450> (a java.lang.Object)
2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
2013-05-22_20:23:11.95774 - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-22_20:23:11.95776 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-22_20:23:11.95777 at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-22_20:23:11.95777 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-22_20:23:11.95777 at scala.collection.immutable.List.map(List.scala:45)
2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
[t2]
2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
2013-05-22_20:23:11.87466 java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-22_20:23:11.87467 at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
---> 2013-05-22_20:23:11.87467 - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
2013-05-22_20:23:11.87468 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95682 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95683 at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-22_20:23:11.95684 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2013-05-22_20:23:11.95686
2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
2013-05-22_20:23:11.95686 java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95686 at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95686 - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-22_20:23:11.95687 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95687 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-22_20:23:11.95687 at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
2013-05-22_20:23:11.95687 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)