Kafka / KAFKA-618

Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      This causes the test failure reported in KAFKA-607. It affects high-level consumers: if they hit the deadlock, they get wedged (or at least remain blocked until the consumer timeout fires).
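
      For context, the consumer timeout mentioned above is the consumer.timeout.ms setting of the 0.8 high-level consumer. A minimal sketch, assuming the 0.8 property names (the group id and timeout value below are illustrative): with the default of -1 the stream iterator blocks indefinitely, so a deadlocked fetcher wedges the consumer, whereas a positive value makes the iterator fail with a ConsumerTimeoutException instead of hanging.

        import java.util.Properties
        import kafka.consumer.{Consumer, ConsumerConfig}

        object ConsumerTimeoutSketch {
          def main(args: Array[String]): Unit = {
            // Illustrative settings only; consumer.timeout.ms bounds how long a
            // KafkaStream iterator may block waiting for messages.
            val props = new Properties()
            props.put("zookeeper.connect", "localhost:2181")
            props.put("group.id", "console-consumer-example")
            props.put("consumer.timeout.ms", "30000")
            val connector = Consumer.create(new ConsumerConfig(props))
            // ... create message streams and consume; a wedged fetcher now
            // surfaces as a ConsumerTimeoutException rather than a hang ...
            connector.shutdown()
          }
        }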

      Here is the thread dump output that shows the issue:

      Found one Java-level deadlock:
      =============================
      "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
      waiting for ownable synchronizer 0x00007f2283ad0000, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
      which is held by "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
      "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
      waiting to lock monitor 0x00007f2288297190 (object 0x00007f2283ab01d0, a java.lang.Object),
      which is held by "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"

      Java stack information for the threads listed above:
      ===================================================
      "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0x00007f2283ad0000> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
      • locked <0x00007f2283ab01d0> (a java.lang.Object)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
        "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
        at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
      • waiting to lock <0x00007f2283ab01d0> (a java.lang.Object)
        at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
      • locked <0x00007f2283ab0338> (a java.lang.Object)
        at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
        at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.consumer.ConsumerFetcherManager$$anon$1.doWork(ConsumerFetcherManager.scala:76)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

      Found 1 deadlock.
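
      To make the cycle explicit: the fetcher thread holds the object monitor it took in AbstractFetcherThread.doWork and then waits on the manager's ReentrantLock inside ConsumerFetcherManager.getPartitionTopicInfo, while the leader-finder-thread already holds that ReentrantLock and is waiting for the same monitor in AbstractFetcherThread.addPartition. A minimal, self-contained sketch of this lock-ordering pattern (the names are illustrative, not the actual Kafka code):

        import java.util.concurrent.locks.ReentrantLock

        object DeadlockSketch {
          // Stand-ins for the two locks seen in the thread dump.
          val managerLock = new ReentrantLock()   // held by the leader-finder-thread
          val fetcherMonitor = new Object         // held by the fetcher thread in doWork()

          // Fetcher thread: takes the monitor first, then waits on the manager's lock.
          val fetcher = new Thread(() => fetcherMonitor.synchronized {
            managerLock.lock()                    // blocks while the leader finder holds the lock
            try { /* processPartitionData() */ } finally { managerLock.unlock() }
          })

          // Leader-finder thread: takes the manager's lock first, then waits on the monitor.
          val leaderFinder = new Thread(() => {
            managerLock.lock()
            try {
              fetcherMonitor.synchronized { /* addPartition() */ }
            } finally { managerLock.unlock() }
          })

          def main(args: Array[String]): Unit = {
            fetcher.start()
            leaderFinder.start()                  // opposite acquisition order => potential deadlock
          }
        }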

          Activity

          Jun Rao added a comment -

          This is a very good finding. The following is one way of breaking the deadlock.

          In ConsumerFetcherManager, don't expose getPartitionTopicInfo(). Instead, pass partitionMap (which is immutable) to each newly created ConsumerFetcherThread. This way, ConsumerFetcherThread.processPartitionData() and ConsumerFetcherThread.handleOffsetOutOfRange() no longer depend on ConsumerFetcherManager. If we do that, we can also improve ConsumerFetcherManager.stopAllConnections() a bit: the clearing of noLeaderPartitionSet and partitionMap can be done together before calling closeAllFetchers(). Previously, partitionMap had to be cleared last because, until all fetchers are stopped, fetch-request processing still reads partitionMap and expects it to be non-null.
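
          A rough sketch of the shape of that change, with simplified stand-in types rather than the real Kafka signatures:

            import scala.collection.immutable

            // Stand-ins for the real Kafka types; names and fields are simplified.
            case class TopicAndPartition(topic: String, partition: Int)
            class PartitionTopicInfo

            // The fetcher thread is handed the (immutable) partitionMap at construction,
            // so processPartitionData() does a plain map lookup instead of calling back
            // into ConsumerFetcherManager.getPartitionTopicInfo() and taking its lock.
            class ConsumerFetcherThreadSketch(
                name: String,
                partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo]) {

              def processPartitionData(tp: TopicAndPartition): Unit = {
                val pti = partitionMap(tp)  // plain lookup, no manager lock involved
                // ... enqueue the fetched data into pti ...
              }
            }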

          Joel Koshy added a comment -

          Ran 20 iterations of testcase 4011 and they all pass.

          One potential concern is the partitionMap in ConsumerFetcherThread being null or inconsistent with respect to the partitionMap in ConsumerFetcherManager, but I looked at it closely and don't think that is possible.

          Jun Rao added a comment -

          Thanks for the patch. +1

          Joel Koshy added a comment -

          Committed to 0.8


            People

            • Assignee: Unassigned
            • Reporter: Joel Koshy
            • Votes: 0
            • Watchers: 2
