Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-2253

Deadlock in delayed operation purgatory

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 0.9.0.0
    • None
    • None

    Description

      We hit a deadlock while running brokers with git hash: 9e894aa0173b14d64a900bcf780d6b7809368384

      There's a circular wait between the removeWatchersLock and an operations intrinsic lock.

      Found one Java-level deadlock:
      =============================
      "kafka-request-handler-a":
        waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "ExpirationReaper-xyz"
      "ExpirationReaper-xyz":
        waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList),
        which is held by "kafka-request-handler-b"
      "kafka-request-handler-b":
        waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "ExpirationReaper-xyz"
      
      "kafka-request-handler-a":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
              at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
              at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
              at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
              at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
              at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
              at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
              at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
              at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
              at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      
      "ExpirationReaper-xyz":
              at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
              - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList)
              at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
              at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
              at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322)
              - locked <0x000000071a86a478> (a java.util.LinkedList)
              at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
              at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.Iterator$class.foreach(Iterator.scala:727)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at scala.collection.AbstractTraversable.map(Traversable.scala:105)
              at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
      
      "kafka-request-handler-b":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
              at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
              at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
              at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303)
              - locked <0x00000006b0563fe8> (a java.util.LinkedList)
              at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228)
              at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
              at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426)
              at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
              at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at scala.collection.AbstractTraversable.map(Traversable.scala:105)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
              at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
              at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      
      Found 1 deadlock.
      

      Attachments

        1. KAFKA-2253_2015-06-08_11:47:40.patch
          6 kB
          Guozhang Wang
        2. KAFKA-2253.patch
          5 kB
          Guozhang Wang
        3. KAFKA-2253.patch
          2 kB
          Guozhang Wang

        Issue Links

          Activity

            People

              guozhang Guozhang Wang
              mgharat Mayuresh Gharat
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: