[KAFKA-2821] Deadlock in group metadata persistence callback


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels: None

    Description

      Found this during system testing:

      Found one Java-level deadlock:
      =============================
      "kafka-request-handler-7":
        waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "kafka-request-handler-6"
      "kafka-request-handler-6":
        waiting to lock monitor 0x00007fcb94004e28 (object 0x00000000bc000c70, a kafka.coordinator.GroupMetadata),
        which is held by "kafka-request-handler-2"
      "kafka-request-handler-2":
        waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "kafka-request-handler-6"
      
      Java stack information for the threads listed above:
      ===================================================
      "kafka-request-handler-7":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
              at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
              at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
              at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
              at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
              at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
              at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      "kafka-request-handler-6":
              at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:289)
              - waiting to lock <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
              at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:284)
              at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$1(GroupMetadataManager.scala:220)
              at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
              at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
              at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
              at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
              at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
              at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
              - locked <0x00000000bc000d20> (a kafka.server.DelayedProduce)
              - locked <0x00000000bc000d60> (a java.util.LinkedList)
              at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
              at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:194)
              at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:349)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:278)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
              at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
              at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
              at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
              at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
              at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
              at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
              at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
              at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      "kafka-request-handler-2":
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
              at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:400)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:405)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:390)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at scala.collection.AbstractTraversable.map(Traversable.scala:105)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:390)
              at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:326)
              at kafka.coordinator.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:224)
              at kafka.coordinator.GroupCoordinator.doSyncGroup(GroupCoordinator.scala:284)
              - locked <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
              at kafka.coordinator.GroupCoordinator.handleSyncGroup(GroupCoordinator.scala:248)
              at kafka.server.KafkaApis.handleSyncGroupRequest(KafkaApis.scala:818)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:81)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      

      It looks like the cause is a lock-ordering inversion: kafka-request-handler-2 holds the GroupMetadata monitor (taken in doSyncGroup) and is waiting for the read side of Partition.leaderIsrUpdateLock in appendMessagesToLeader, while kafka-request-handler-6 holds the write side of that lock (taken in maybeExpandIsr) and, via the delayed-produce completion, ends up in the storeGroup callback trying to lock the same GroupMetadata. Each thread is waiting for a lock the other holds, so neither can make progress.
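      For illustration, here is a minimal, self-contained sketch (not Kafka code) of the same inversion: one thread takes an object monitor and then the read side of a ReentrantReadWriteLock, while the other takes the write side and then the monitor, mirroring the handler-2/handler-6 cycle above. All names in the sketch (LockInversionDemo, syncGroupPath, fetchPath, bothLocksHeld) are invented stand-ins; the ThreadMXBean check at the end only confirms that the JVM sees the cycle.

      import java.lang.management.ManagementFactory
      import java.util.concurrent.CountDownLatch
      import java.util.concurrent.locks.ReentrantReadWriteLock

      object LockInversionDemo {
        def main(args: Array[String]): Unit = {
          // Stand-ins: "groupMetadata" plays the kafka.coordinator.GroupMetadata monitor,
          // "leaderIsrUpdateLock" the read-write lock in kafka.cluster.Partition.
          val groupMetadata = new Object
          val leaderIsrUpdateLock = new ReentrantReadWriteLock()
          val bothLocksHeld = new CountDownLatch(2)

          // Handler-2 path: synchronize on the group (doSyncGroup), then take the
          // read lock to append the group message to the leader.
          val syncGroupPath = new Thread(new Runnable {
            override def run(): Unit = groupMetadata.synchronized {
              bothLocksHeld.countDown()
              bothLocksHeld.await()                  // make sure the write lock is already held
              leaderIsrUpdateLock.readLock().lock()  // blocks: write lock held by the other thread
            }
          }, "sync-group-path")

          // Handler-6 path: take the write lock (maybeExpandIsr), then complete the
          // delayed produce, whose callback synchronizes on the same group.
          val fetchPath = new Thread(new Runnable {
            override def run(): Unit = {
              leaderIsrUpdateLock.writeLock().lock()
              bothLocksHeld.countDown()
              bothLocksHeld.await()                  // make sure the monitor is already held
              groupMetadata.synchronized {}          // blocks: monitor held by the other thread
            }
          }, "fetch-path")

          syncGroupPath.start()
          fetchPath.start()

          Thread.sleep(1000)
          // findDeadlockedThreads covers cycles across monitors and ownable synchronizers,
          // which is the mixed cycle reported in the dump above.
          val deadlocked = Option(ManagementFactory.getThreadMXBean.findDeadlockedThreads)
          println(s"deadlocked threads: ${deadlocked.map(_.length).getOrElse(0)}")
          sys.exit(0)                                // the two worker threads can never finish
        }
      }

      The sketch reproduces the two-thread cycle between handler-6 and handler-2; handler-7 in the dump is simply blocked behind the same write lock rather than being part of the cycle.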

          People

            Assignee: Jason Gustafson (hachikuji)
            Reporter: Jason Gustafson (hachikuji)