KAFKA-2821: Deadlock in group metadata persistence callback


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels: None

      Description

      Found this during system testing:

      Found one Java-level deadlock:
      =============================
      "kafka-request-handler-7":
        waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "kafka-request-handler-6"
      "kafka-request-handler-6":
        waiting to lock monitor 0x00007fcb94004e28 (object 0x00000000bc000c70, a kafka.coordinator.GroupMetadata),
        which is held by "kafka-request-handler-2"
      "kafka-request-handler-2":
        waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "kafka-request-handler-6"
      
      Java stack information for the threads listed above:
      ===================================================
      "kafka-request-handler-7":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
              at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
              at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
              at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
              at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
              at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
              at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      "kafka-request-handler-6":
              at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:289)
              - waiting to lock <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
              at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:284)
              at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$1(GroupMetadataManager.scala:220)
              at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
              at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
              at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
              at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
              at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
              at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
              - locked <0x00000000bc000d20> (a kafka.server.DelayedProduce)
              - locked <0x00000000bc000d60> (a java.util.LinkedList)
              at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
              at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:194)
              at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:349)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:278)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
              at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
              at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
              at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
              at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
              at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      "kafka-request-handler-2":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
              at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:400)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:405)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:390)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at scala.collection.AbstractTraversable.map(Traversable.scala:105)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:390)
              at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:326)
              at kafka.coordinator.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:224)
              at kafka.coordinator.GroupCoordinator.doSyncGroup(GroupCoordinator.scala:284)
              - locked <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
              at kafka.coordinator.GroupCoordinator.handleSyncGroup(GroupCoordinator.scala:248)
              at kafka.server.KafkaApis.handleSyncGroupRequest(KafkaApis.scala:818)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:81)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      

      It looks like the cause is an inconsistent lock order: the SyncGroup path (handler-2) holds the GroupMetadata monitor and then takes the read lock on leaderIsrUpdateLock in kafka.cluster.Partition to append to the log, while the fetch path (handler-6) holds the write lock on leaderIsrUpdateLock and then tries to grab the GroupMetadata monitor inside the storeGroup callback when the delayed produce completes. Each thread holds the lock the other is waiting for. A distilled sketch of the cycle follows.
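
      For illustration only, here is a minimal runnable sketch of that cycle. This is not Kafka code: LockCycleSketch, stateLock, group, and the two thread bodies are hypothetical stand-ins for leaderIsrUpdateLock, the GroupMetadata monitor, and the two handler paths.

      import java.util.concurrent.CountDownLatch
      import java.util.concurrent.locks.ReentrantReadWriteLock

      object LockCycleSketch extends App {
        val stateLock = new ReentrantReadWriteLock() // stands in for leaderIsrUpdateLock
        val group = new AnyRef                       // stands in for the GroupMetadata monitor
        val ready = new CountDownLatch(2)

        // Like handler-2: the SyncGroup path locks the group, then appending
        // to the leader's log needs the read lock.
        val syncPath = new Thread(() => {
          group.synchronized {
            ready.countDown(); ready.await()         // force the bad interleaving
            stateLock.readLock().lock()              // blocks: write lock is held below
            try println("appendMessagesToLeader")
            finally stateLock.readLock().unlock()
          }
        })

        // Like handler-6: the fetch path takes the write lock to expand the
        // ISR, then completing the delayed produce invokes the storeGroup
        // callback, which needs the group monitor.
        val fetchPath = new Thread(() => {
          stateLock.writeLock().lock()
          try {
            ready.countDown(); ready.await()
            group.synchronized { println("putCacheCallback") } // blocks: monitor is held above
          } finally stateLock.writeLock().unlock()
        })

        syncPath.start(); fetchPath.start()
        syncPath.join(); fetchPath.join()            // never returns: the threads are deadlocked
      }

      One way to break a cycle like this is to acquire the two locks in a single consistent order on every path; another is to avoid invoking arbitrary completion callbacks (here, putCacheCallback via DelayedProduce.onComplete) while leaderIsrUpdateLock is held, e.g. by deferring the delayed-operation completion until after the lock is released.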


      People

      • Assignee: hachikuji (Jason Gustafson)
      • Reporter: hachikuji (Jason Gustafson)
      • Votes: 0
      • Watchers: 3

      Dates

      • Created:
      • Updated:
      • Resolved: