  1. Kafka
  2. KAFKA-9796

Broker shutdown could be stuck forever under certain conditions

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0
    • Component/s: None
    • Labels: None

    Description

      During broker initialisation, the Acceptor threads are started early so that the bound port is known, while the start of the processors is delayed until the end of the initialisation sequence. We have found that the shutdown of a broker can be stuck forever under the following conditions:

      • the shutdown procedure is started before the processors are started;
      • the `newConnections` queues of the processors are full; and
      • an extra new connection has been accepted but can't be queued up in a processor.

      For instance, this could happen if a `NodeExistsException` is raised when the broker tries to register itself in ZK.

      When the above conditions hold, the shutdown procedure triggers the shutdown of the acceptor threads and waits until they have stopped (first thread dump below). If an acceptor has a pending connection which can't be queued up in a processor, it ends up waiting until space is made in that processor's `newConnections` queue to accept the new connection (second thread dump below). As the processors are not started, the new connection queues are never drained, so the acceptor thread is never released.
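
The blocking pattern described above can be reproduced outside Kafka with a few lines of plain Java. This is only a minimal sketch, not Kafka code: the class name `AcceptorHangSketch`, the method `shutdownCompletes`, and the string "connections" are hypothetical stand-ins, and the latch wait is bounded by a timeout purely so that the example terminates (the thread dumps in this issue show an unbounded `CountDownLatch.await`).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the acceptor/processor interaction; not Kafka code.
public class AcceptorHangSketch {

    // Returns true if the "acceptor" thread finished within timeoutMs.
    static boolean shutdownCompletes(long timeoutMs) {
        // A processor's newConnections queue that is already full.
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.add("conn-1");

        // Counted down by the acceptor when its run loop exits.
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                // No processor is running, so nothing drains the queue
                // and this put() never returns on its own.
                newConnections.put("conn-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "acceptor-sketch");
        acceptor.setDaemon(true);
        acceptor.start();

        try {
            // The shutdown path waits for the acceptor to finish.
            // Bounded here only so that the sketch terminates.
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            acceptor.interrupt(); // clean up the blocked thread
        }
    }

    public static void main(String[] args) {
        // prints "shutdown completed: false"
        System.out.println("shutdown completed: " + shutdownCompletes(500));
    }
}
```

With an unbounded `await()` in place of the timed one, the calling thread would park forever, which matches the state of the "main" thread in the first dump.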

      Shutdown waits on acceptor to shut down

      "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s tid=0x00007f625001c800 nid=0x272 waiting on condition  [0x00007f6257ca4000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
      	- parking to wait for  <0x0000000689a61800> (a java.util.concurrent.CountDownLatch$Sync)
      	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
      	at java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
      	at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
      	at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
      	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
      	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
      	at kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown Source)
      	at scala.collection.Iterator.foreach(Iterator.scala:941)
      	at scala.collection.Iterator.foreach$(Iterator.scala:941)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
      	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
      	at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
      	- locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
      	at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
      	at kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown Source)
      	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
      	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
      	at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
      	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
      	at kafka.Kafka$.main(Kafka.scala:82)
      	at kafka.Kafka.main(Kafka.scala)
      

      Acceptor waits on processor to accept the new connection

      "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000 nid=0x2ca waiting on condition  [0x00007f6157130000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
      	- parking to wait for  <0x0000000689a7cad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.5/AbstractQueuedSynchronizer.java:2081)
      	at java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.5/ArrayBlockingQueue.java:367)
      	at kafka.network.Processor.accept(SocketServer.scala:1020)
      	at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
      	at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
      	at kafka.network.Acceptor.run(SocketServer.scala:550)
      	at java.lang.Thread.run(java.base@11.0.5/Thread.java:834)
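
The second dump shows the acceptor parked inside `ArrayBlockingQueue.put`, which has no way to observe a shutdown request. One possible way to avoid that, shown here only as a hedged sketch and not as the change that was actually applied to resolve this issue, is to queue the connection with a timed `offer` and re-check a running flag between attempts. The class name `InterruptibleAcceptSketch`, the `running` flag, and the 50 ms poll interval are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical mitigation sketch; not the fix applied in Kafka.
public class InterruptibleAcceptSketch {

    // Returns true if the "acceptor" thread finished within timeoutMs.
    static boolean shutdownCompletes(long timeoutMs) {
        // A full newConnections queue that no processor drains.
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.add("conn-1");

        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                boolean queued = false;
                // Retry with a timeout so the running flag is re-checked
                // regularly instead of blocking indefinitely in put().
                while (running.get() && !queued) {
                    queued = newConnections.offer("conn-2", 50, TimeUnit.MILLISECONDS);
                }
                // If the connection was never queued, real code would
                // have to close the pending socket here.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "acceptor-sketch");
        acceptor.setDaemon(true);
        acceptor.start();

        // Shutdown request: clear the flag, then wait for the acceptor.
        running.set(false);
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // prints "shutdown completed: true"
        System.out.println("shutdown completed: " + shutdownCompletes(2000));
    }
}
```

The trade-off in this sketch is a small polling delay on a full queue in exchange for an acceptor that can always observe the shutdown request.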
      

            People

              dajac David Jacot
              dajac David Jacot
              Rajini Sivaram Rajini Sivaram