Kafka / KAFKA-6517

ZooKeeperClient holds a lock while waiting for responses, blocking shutdown

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • Component/s: core
    • Labels: None

      Description

      Stack traces from a local test run that deadlocked because shutdown could not acquire the ZooKeeperClient lock:

      1. kafka-scheduler-7 acquired the read lock in kafka.zookeeper.ZooKeeperClient.handleRequests and is waiting for responses while still holding it.
      2. Test worker-EventThread is waiting for the write lock to process SessionExpired in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process.
      3. ForkJoinPool-1-worker-11, processing KafkaServer.shutdown, is queued behind 2) and is waiting to acquire the read lock in kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler.
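The three-thread pattern above can be reproduced outside Kafka with a plain java.util.concurrent.locks.ReentrantReadWriteLock. The following is a minimal sketch, not Kafka code: the class, method, and variable names are hypothetical, and a CountDownLatch stands in for the pending ZooKeeper responses. It illustrates why a second reader stalls even though only a read lock is held: with the default non-fair lock, a new reader that does not already hold the lock waits when a writer is first in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; none of these names come from Kafka.
final class ReadLockQueueDemo {

    // Returns true if a second reader obtains the read lock within timeoutMs
    // while a first reader holds it and a writer is already queued.
    static boolean secondReaderGetsLock(long timeoutMs) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // non-fair by default
        CountDownLatch responses = new CountDownLatch(1);     // stands in for pending responses
        CountDownLatch firstReaderIn = new CountDownLatch(1); // signals that the read lock is held

        // Plays the role of thread 1: holds the read lock while awaiting responses.
        Thread firstReader = new Thread(() -> {
            lock.readLock().lock();
            try {
                firstReaderIn.countDown();
                responses.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.readLock().unlock();
            }
        });
        // Plays the role of thread 2: needs the write lock and queues behind the reader.
        Thread writer = new Thread(() -> {
            lock.writeLock().lock();
            lock.writeLock().unlock();
        });
        firstReader.setDaemon(true);
        writer.setDaemon(true);

        try {
            firstReader.start();
            firstReaderIn.await();
            writer.start();
            // Wait until the writer is parked in the lock's wait queue.
            while (writer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            // Plays the role of thread 3: a new reader arriving after the writer queued.
            boolean acquired = lock.readLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.readLock().unlock();
            }
            // Unlike the real hang, the demo delivers the "responses" so it can finish.
            responses.countDown();
            firstReader.join();
            writer.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second reader acquired: " + secondReaderGetsLock(200));
    }
}
```

In this sketch the second reader is expected to time out. In the reported run there was no timeout, and the latch was never released, so all three threads stayed parked.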

      Stack traces of the relevant threads:

      "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting on condition [0x000070000b371000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000007e4c6e698> (a java.util.concurrent.CountDownLatch$Sync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
              at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
              at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
              at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
              at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
              at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
              at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
              at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
              at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
              at kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
              at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
              at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
              at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
              at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
              at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
              at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)

      ......

      "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 waiting on condition [0x000070000a23f000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
              at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
              at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
              at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
              at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
              at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)

       

      "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 waiting on condition [0x0000700011eaf000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
              at kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295)
              at kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217)
              at kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68)
              at kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181)
              at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552)
              at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
              at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552)
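The first trace shows CountDownLatch.await running inside CoreUtils.inReadLock. One way to avoid that shape is to hold the read lock only while issuing the requests and to wait for the responses after releasing it. The sketch below illustrates that idea under stated assumptions; it is not the patch that resolved this issue, and every name in it is hypothetical. A CountDownLatch again stands in for the pending responses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; none of these names come from Kafka.
final class AwaitOutsideLockDemo {

    // Returns true if a late reader (the "shutdown" role) obtains the read lock
    // within timeoutMs while another thread is still awaiting responses.
    static boolean shutdownGetsLock(long timeoutMs) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch responses = new CountDownLatch(1);    // stands in for pending responses
        CountDownLatch lockReleased = new CountDownLatch(1); // signals that the read lock was dropped

        Thread requester = new Thread(() -> {
            lock.readLock().lock();
            try {
                // Placeholder: issue the asynchronous requests while holding the lock.
            } finally {
                lock.readLock().unlock();
            }
            lockReleased.countDown();
            try {
                responses.await(); // wait for responses without holding the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Plays the role of the session-expiry handler that needs the write lock.
        Thread expiryHandler = new Thread(() -> {
            lock.writeLock().lock();
            lock.writeLock().unlock();
        });
        requester.setDaemon(true);
        expiryHandler.setDaemon(true);

        try {
            requester.start();
            lockReleased.await();
            expiryHandler.start();
            expiryHandler.join(); // completes because nobody holds the lock any more
            boolean acquired = lock.readLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.readLock().unlock();
            }
            responses.countDown(); // let the requester finish
            requester.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown acquired read lock: " + shutdownGetsLock(200));
    }
}
```

Here the writer and the late reader both make progress while the requester is still waiting, because the wait no longer pins the read lock.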

              People

              • Assignee: Rajini Sivaram (rsivaram)
              • Reporter: Rajini Sivaram (rsivaram)
              • Reviewer: Jun Rao