Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
0.8.0
-
None
-
None
Description
Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
code is very complex so it's a bit hard to articulate the following very
clearly. I will try and describe the sequence that results in a deadlock
when starting up a large number of consumers at around the same time:
- When a consumer's createMessageStream method is called, it initiates an
initial inline rebalance. - However, before the above initial rebalance actually begins, a ZK watch
may trigger (due to some other consumers starting up) and initiate a
rebalance. This happens successfully so fetchers start and start filling
up the chunk queues. - Another watch triggers and initiates yet another rebalance. This rebalance
attempt tries to close the fetchers. Before the fetchers are stopped, we
shutdown the leader-finder-thread to prevent new fetchers from being
started. - The shutdown is accomplished by interrupting the leader-finder-thread and
then awaiting its shutdown latch. - If the leader-finder-thread still has a partition without leader to
process and tries to add a fetcher for it, it will get an exception
(InterruptedException if acquiring the partitionMapLock or
ClosedByInterruptException if performing an offset request). If we get an
InterruptedException the thread's interrupted flag is cleared. - However, the leader-finder-thread may have multiple partitions without
leader that it is currently processing. So if the interrupted flag is
cleared and the leader-finder-thread tries to add a fetcher for a another
partition, it does not receive an InterruptedException when it tries to
acquire the partitionMapLock. It can end up blocking indefinitely at that
point. - The problem is that until now, the createMessageStream's initial inline
rebalance has not yet returned - it is blocked on the rebalance lock
waiting on the second watch-triggered rebalance to complete. i.e., the
consumer iterators have not been created yet and thus the fetcher queues
get filled up. [td1] - As a result, processPartitionData (which holds on to the partitionMapLock)
in one or more fetchers will be blocked trying to enqueue into a full
chunk queue.[td2] - So the leader-finder-thread cannot finish adding fetchers for the
remaining partitions without leader and thus cannot shutdown.
One way to fix would be to let the exception from the leader-finder-thread
propagate outside if the leader-finder-thread is currently shutting down -
and avoid the subsequent (unnecessary) attempt to add a fetcher and lock
partitionMapLock. There may be more elegant fixes (such as rewriting the
whole consumer manager logic) but obviously we want to avoid extensive
changes at this point in 0.8.
Relevant portions of the thread-dump are here:
[td1] createMessageStream's initial inline rebalance (blocked on the ongoing
watch-triggered rebalance)
2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
2013-05-20_17:50:13.04848 java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-20_17:50:13.04848 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04849 - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04849 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-20_17:50:13.04850 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-20_17:50:13.04850 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-20_17:50:13.04850 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-20_17:50:13.04851 at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-20_17:50:13.04851 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-20_17:50:13.04852 at scala.collection.immutable.List.map(List.scala:45)
2013-05-20_17:50:13.04852 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-20_17:50:13.04852 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
[td2] A consumer fetcher thread blocked on full queue.
2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00
007f58316ae000]
2013-05-20_17:50:13.04703 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04703 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04704 - parking to wait for <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-20_17:50:13.04704 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04704 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-20_17:50:13.04704 at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
2013-05-20_17:50:13.04704 at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
2013-05-20_17:50:13.04705 at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
2013-05-20_17:50:13.04706 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
2013-05-20_17:50:13.04707 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04708 at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
2013-05-20_17:50:13.04709 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04709 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-20_17:50:13.04709 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2
[td3] Second watch-triggered rebalance
2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b00
00]
2013-05-20_17:50:13.04725 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04726 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04726 - parking to wait for <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-20_17:50:13.04726 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04727 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04727 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-20_17:50:13.04728 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-20_17:50:13.04728 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-20_17:50:13.04729 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-20_17:50:13.04729 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-20_17:50:13.04730 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerCo
nnector.scala:486)
2013-05-20_17:50:13.04730 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-20_17:50:13.04731 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala
:420)
2013-05-20_17:50:13.04731 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-20_17:50:13.04732 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-20_17:50:13.04733 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-20_17:50:13.04733 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04733 - locked <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04734 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)
[td4] leader-finder-thread still trying to process partitions without leader, blocked on the partitionMapLock held by processPartitionData in td2.
2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f5831
7af000]
2013-05-20_17:50:13.04712 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04713 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04713 - parking to wait for <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
2013-05-20_17:50:13.04713 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04714 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04714 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
2013-05-20_17:50:13.04717 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
2013-05-20_17:50:13.04718 at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
2013-05-20_17:50:13.04718 at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
2013-05-20_17:50:13.04719 at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
2013-05-20_17:50:13.04719 - locked <0x00007f586374b040> (a java.lang.Object)
2013-05-20_17:50:13.04719 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
2013-05-20_17:50:13.04720 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04721 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-20_17:50:13.04722 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-20_17:50:13.04723 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-20_17:50:13.04723 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-20_17:50:13.04723 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-20_17:50:13.04724 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04724 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)