KAFKA-914: Deadlock between initial rebalance and watcher-triggered rebalances

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      The summary doesn't give the full picture, and the
      fetcher-manager/fetcher-thread code is complex, so the following is a bit
      hard to articulate clearly. I will try to describe the sequence that
      results in a deadlock when a large number of consumers start up at around
      the same time:

      • When a consumer's createMessageStream method is called, it initiates an
        initial inline rebalance.
      • However, before the above initial rebalance actually begins, a ZK watch
        may trigger (due to some other consumers starting up) and initiate a
        rebalance. This rebalance succeeds, so fetchers start and begin filling
        up the chunk queues.
      • Another watch triggers and initiates yet another rebalance. This rebalance
        attempt tries to close the fetchers. Before the fetchers are stopped, we
        shut down the leader-finder-thread to prevent new fetchers from being
        started.
      • The shutdown is accomplished by interrupting the leader-finder-thread and
        then awaiting its shutdown latch.
      • If the leader-finder-thread still has a partition without leader to
        process and tries to add a fetcher for it, it will get an exception
        (InterruptedException if acquiring the partitionMapLock or
        ClosedByInterruptException if performing an offset request). If we get an
        InterruptedException the thread's interrupted flag is cleared.
      • However, the leader-finder-thread may have multiple partitions without
        leader that it is currently processing. So if the interrupted flag is
        cleared and the leader-finder-thread tries to add a fetcher for another
        partition, it does not receive an InterruptedException when it tries to
        acquire the partitionMapLock. It can end up blocking indefinitely at that
        point. [td4]
      • The problem is that, up to this point, createMessageStream's initial
        inline rebalance has not yet returned: it is blocked on the rebalance
        lock, waiting for the second watch-triggered rebalance to complete. That
        is, the consumer iterators have not been created yet, so nothing drains
        the fetcher queues and they fill up. [td1]
      • As a result, processPartitionData (which holds on to the partitionMapLock)
        in one or more fetchers will be blocked trying to enqueue into a full
        chunk queue. [td2]
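      • So the leader-finder-thread cannot finish adding fetchers for the
        remaining partitions without leader and thus cannot shut down. The
        watch-triggered rebalance that is awaiting its shutdown latch [td3]
        therefore never completes, and the initial rebalance stays blocked
        behind it.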
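
      The interrupted-flag behaviour in the steps above can be reproduced
      outside Kafka. The following is only a minimal sketch, not Kafka code:
      the class name InterruptFlagDemo and its local names are made up for
      illustration, a plain ReentrantLock stands in for partitionMapLock, and
      the main thread stands in for a fetcher that holds the lock. The second
      attempt uses a timed tryLock purely so that the demo terminates; with
      lockInterruptibly it would park indefinitely, as described.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: shows that catching InterruptedException leaves the
// interrupted flag cleared, so a later interruptible lock attempt just waits.
class InterruptFlagDemo {

    /** Returns {firstAttemptInterrupted, flagStillSetAfterCatch, secondAttemptTimedOut}. */
    static boolean[] run() {
        ReentrantLock partitionMapLock = new ReentrantLock();
        boolean[] result = new boolean[3];
        CountDownLatch started = new CountDownLatch(1);

        // The calling thread plays the fetcher that never releases the lock.
        partitionMapLock.lock();
        try {
            Thread leaderFinder = new Thread(() -> {
                started.countDown();
                // First partition: the interrupt surfaces as InterruptedException.
                try {
                    partitionMapLock.lockInterruptibly();
                    partitionMapLock.unlock();
                } catch (InterruptedException e) {
                    result[0] = true;
                    // Throwing InterruptedException cleared the flag.
                    result[1] = Thread.currentThread().isInterrupted();
                }
                // Second partition: no pending interrupt, so this simply waits.
                try {
                    if (partitionMapLock.tryLock(200, TimeUnit.MILLISECONDS)) {
                        partitionMapLock.unlock();
                    } else {
                        result[2] = true;
                    }
                } catch (InterruptedException e) {
                    // Not expected: the thread is only interrupted once.
                }
            }, "leader-finder-demo");

            leaderFinder.start();
            started.await();
            leaderFinder.interrupt();
            leaderFinder.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo itself was interrupted", e);
        } finally {
            partitionMapLock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("firstAttemptInterrupted=" + r[0]
                + " flagStillSet=" + r[1]
                + " secondAttemptTimedOut=" + r[2]);
        // prints: firstAttemptInterrupted=true flagStillSet=false secondAttemptTimedOut=true
    }
}
```

      The second attempt times out instead of throwing, which is the same
      condition under which an untimed lockInterruptibly would never return.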

      One way to fix this would be to let the exception from the
      leader-finder-thread propagate if the leader-finder-thread is currently
      shutting down, and thereby avoid the subsequent (unnecessary) attempt to
      add a fetcher and lock partitionMapLock. There may be more elegant fixes
      (such as rewriting the whole consumer manager logic) but obviously we want
      to avoid extensive changes at this point in 0.8.
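
      As a rough sketch of the kind of fix suggested above (not the actual
      patch), the loop over partitions can rethrow the InterruptedException
      when a shutdown is in progress instead of moving on to the next
      partition. Everything here is hypothetical and named for illustration
      only: LeaderFinderSketch, isRunning, doWork, addFetcher and demo are
      assumptions, and the real Kafka classes are structured differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the leader-finder-thread; not the actual Kafka class.
class LeaderFinderSketch {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final ReentrantLock partitionMapLock;
    private final List<String> partitionsWithoutLeader;
    final List<String> added = new ArrayList<>();
    private final Thread worker;

    LeaderFinderSketch(ReentrantLock partitionMapLock, List<String> partitionsWithoutLeader) {
        this.partitionMapLock = partitionMapLock;
        this.partitionsWithoutLeader = partitionsWithoutLeader;
        this.worker = new Thread(this::doWork, "leader-finder-sketch");
    }

    // Stands in for adding a fetcher, which needs the partition map lock.
    private void addFetcher(String partition) throws InterruptedException {
        partitionMapLock.lockInterruptibly();
        try {
            added.add(partition);
        } finally {
            partitionMapLock.unlock();
        }
    }

    private void doWork() {
        try {
            for (String partition : partitionsWithoutLeader) {
                try {
                    addFetcher(partition);
                } catch (InterruptedException e) {
                    if (!isRunning.get()) {
                        throw e; // shutting down: propagate, skip remaining partitions
                    }
                    // Not shutting down: swallow and try the next partition.
                }
            }
        } catch (InterruptedException e) {
            // Shutdown requested; nothing more to do.
        } finally {
            shutdownLatch.countDown();
        }
    }

    void start() {
        worker.start();
    }

    /** Interrupts the worker and waits for its latch; true if it stopped in time. */
    boolean shutdown(long timeoutMs) {
        isRunning.set(false);
        worker.interrupt();
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Shuts down while another thread holds the lock, as in the deadlock scenario. */
    static boolean demo() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the caller plays a fetcher stuck holding partitionMapLock
        try {
            LeaderFinderSketch finder =
                    new LeaderFinderSketch(lock, Arrays.asList("t-0", "t-1", "t-2"));
            finder.start();
            return finder.shutdown(2000) && finder.added.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: true
    }
}
```

      In this sketch the shutdown completes even though the lock is never
      released, because the worker stops at the first interrupted attempt
      rather than parking on the lock again for the next partition.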

      Relevant portions of the thread-dump are here:

      [td1] createMessageStream's initial inline rebalance (blocked on the ongoing
      watch-triggered rebalance)

      2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
      2013-05-20_17:50:13.04848 java.lang.Thread.State: BLOCKED (on object monitor)
      2013-05-20_17:50:13.04848 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
      2013-05-20_17:50:13.04849 - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
      2013-05-20_17:50:13.04849 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
      2013-05-20_17:50:13.04850 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
      2013-05-20_17:50:13.04850 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
      2013-05-20_17:50:13.04850 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
      2013-05-20_17:50:13.04850 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
      2013-05-20_17:50:13.04850 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-20_17:50:13.04851 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      2013-05-20_17:50:13.04851 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      2013-05-20_17:50:13.04851 at scala.collection.immutable.List.foreach(List.scala:45)
      2013-05-20_17:50:13.04851 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      2013-05-20_17:50:13.04852 at scala.collection.immutable.List.map(List.scala:45)
      2013-05-20_17:50:13.04852 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
      2013-05-20_17:50:13.04852 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

      [td2] A consumer fetcher thread blocked on full queue.

      2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
      2013-05-20_17:50:13.04703 java.lang.Thread.State: WAITING (parking)
      2013-05-20_17:50:13.04703 at sun.misc.Unsafe.park(Native Method)
      2013-05-20_17:50:13.04704 - parking to wait for <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      2013-05-20_17:50:13.04704 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-20_17:50:13.04704 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
      2013-05-20_17:50:13.04704 at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
      2013-05-20_17:50:13.04704 at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
      2013-05-20_17:50:13.04705 at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
      2013-05-20_17:50:13.04706 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
      2013-05-20_17:50:13.04707 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
      2013-05-20_17:50:13.04708 at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      2013-05-20_17:50:13.04709 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
      2013-05-20_17:50:13.04709 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
      2013-05-20_17:50:13.04709 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

      [td3] Second watch-triggered rebalance

      2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
      2013-05-20_17:50:13.04725 java.lang.Thread.State: WAITING (parking)
      2013-05-20_17:50:13.04726 at sun.misc.Unsafe.park(Native Method)
      2013-05-20_17:50:13.04726 - parking to wait for <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
      2013-05-20_17:50:13.04726 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-20_17:50:13.04727 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      2013-05-20_17:50:13.04727 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
      2013-05-20_17:50:13.04728 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
      2013-05-20_17:50:13.04728 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
      2013-05-20_17:50:13.04729 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
      2013-05-20_17:50:13.04729 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
      2013-05-20_17:50:13.04730 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
      2013-05-20_17:50:13.04730 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
      2013-05-20_17:50:13.04731 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
      2013-05-20_17:50:13.04731 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
      2013-05-20_17:50:13.04732 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
      2013-05-20_17:50:13.04733 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
      2013-05-20_17:50:13.04733 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
      2013-05-20_17:50:13.04733 - locked <0x00007f58637dfe10> (a java.lang.Object)
      2013-05-20_17:50:13.04734 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)

      [td4] leader-finder-thread still trying to process partitions without leader, blocked on the partitionMapLock held by processPartitionData in td2.

      2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
      2013-05-20_17:50:13.04712 java.lang.Thread.State: WAITING (parking)
      2013-05-20_17:50:13.04713 at sun.misc.Unsafe.park(Native Method)
      2013-05-20_17:50:13.04713 - parking to wait for <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
      2013-05-20_17:50:13.04713 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
      2013-05-20_17:50:13.04714 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      2013-05-20_17:50:13.04714 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
      2013-05-20_17:50:13.04717 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
      2013-05-20_17:50:13.04718 at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
      2013-05-20_17:50:13.04718 at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
      2013-05-20_17:50:13.04719 at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
      2013-05-20_17:50:13.04719 - locked <0x00007f586374b040> (a java.lang.Object)
      2013-05-20_17:50:13.04719 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
      2013-05-20_17:50:13.04720 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
      2013-05-20_17:50:13.04721 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-20_17:50:13.04721 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
      2013-05-20_17:50:13.04721 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      2013-05-20_17:50:13.04722 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
      2013-05-20_17:50:13.04723 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
      2013-05-20_17:50:13.04723 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      2013-05-20_17:50:13.04723 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
      2013-05-20_17:50:13.04724 at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
      2013-05-20_17:50:13.04724 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

      People

      • Assignee: Unassigned
      • Reporter: Joel Koshy