  Kafka / KAFKA-13428

Server hangs on shutdown


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.5.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      The Kafka server starts up in the following order:

          1. socketServer.startup

          2. zkClient.registerBroker

          3. socketServer.startDataPlaneProcessors

      After step 1 the port accepts connections, but the processors are not started until step 3. The default processor queue size is 20, so if many clients connect in the meantime, the Acceptor thread blocks on the queue's put(). If registerBroker then fails (for example, the previous ZooKeeper session has not expired yet, so the broker id still exists), the server goes into shutdown without ever starting the network processors. The Acceptor thread cannot shut down because it is still waiting on the queue, and the server hangs.
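      The blocking behaviour described above can be reproduced outside Kafka with a minimal sketch. It is not Kafka code: the class AcceptorHangDemo, the method acceptorExits, and the integer "connections" are hypothetical stand-ins. It assumes only what the description and trace show, namely a bounded ArrayBlockingQueue that nothing drains and a CountDownLatch that the shutdown path waits on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AcceptorHangDemo {

    // Returns true if the stand-in acceptor thread exits within waitMillis.
    static boolean acceptorExits(int queueSize, int connections, long waitMillis) {
        // Stand-in for a processor's new-connection queue. Nothing ever
        // drains it, mirroring processors that were never started.
        BlockingQueue<Integer> newConnections = new ArrayBlockingQueue<>(queueSize);
        // Stand-in for the latch that the shutdown path awaits in the trace.
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                for (int conn = 0; conn < connections; conn++) {
                    newConnections.put(conn); // parks once the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Reached only when the loop ends or the thread is interrupted.
                shutdownLatch.countDown();
            }
        }, "demo-acceptor");
        acceptor.setDaemon(true); // lets the JVM exit while the thread is parked
        acceptor.start();

        try {
            // The trace shows an untimed await(); a timeout keeps the demo finite.
            return shutdownLatch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // 20 connections fit into a queue of 20, so the acceptor finishes.
        System.out.println(acceptorExits(20, 20, 2000));
        // The 21st put() blocks, so the latch is never counted down in time.
        System.out.println(acceptorExits(20, 21, 500));
    }
}
```

      With an untimed await(), as in the trace, the second case would never return, which corresponds to the hang reported here.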

      stack:

      ...
      "data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" #35 prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable [0x00007f5f60f8b000]
         java.lang.Thread.State: RUNNABLE
      	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
      	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      	- locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
      	- locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
      	- locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
      	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      	at kafka.network.Acceptor.run(SocketServer.scala:534)
      	at java.lang.Thread.run(Thread.java:748)
      
      "data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition [0x00007f5f63315000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000006497b88d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      	at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
      	at kafka.network.Processor.accept(SocketServer.scala:1002)
      	at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
      	at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
      	at kafka.network.Acceptor.run(SocketServer.scala:544)
      	at java.lang.Thread.run(Thread.java:748)
      ...
      "main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on condition [0x00007f5f8a0cf000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000006497b8df8> (a java.util.concurrent.CountDownLatch$Sync)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
      	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
      	at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
      	at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
      	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
      	at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
      	at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown Source)
      	at scala.collection.Iterator.foreach(Iterator.scala:941)
      	at scala.collection.Iterator.foreach$(Iterator.scala:941)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
      	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
      	at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
      	- locked <0x00000006497b8e98> (a kafka.network.SocketServer)
      	at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
      	at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown Source)
      	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
      	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
      	at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
      	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
      	at kafka.Kafka$.main(Kafka.scala:82)
      	at kafka.Kafka.main(Kafka.scala)
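
      In the trace above, the acceptor for port 9092 is parked in ArrayBlockingQueue.put while "main" waits on a CountDownLatch in AbstractServerThread.shutdown. One way such a wait could be made to end, shown only as an illustrative sketch, is to replace the untimed put() with a timed offer() that re-checks a liveness flag. This is an assumption and not the change made in Kafka: the issue was resolved as a duplicate and this page does not say how it was fixed. The names InterruptibleAcceptorSketch, acceptorExitsAfterShutdown, and alive, and the 50 ms retry interval, are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleAcceptorSketch {

    // Returns true if the stand-in acceptor exits within waitMillis of the
    // shutdown request, even though nothing drains the queue.
    static boolean acceptorExitsAfterShutdown(int queueSize, int connections, long waitMillis) {
        BlockingQueue<Integer> newConnections = new ArrayBlockingQueue<>(queueSize);
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        AtomicBoolean alive = new AtomicBoolean(true); // hypothetical liveness flag

        Thread acceptor = new Thread(() -> {
            try {
                for (int conn = 0; conn < connections && alive.get(); conn++) {
                    // Timed offer() instead of untimed put(): on each timeout
                    // the loop re-checks the flag before trying again.
                    while (alive.get()
                            && !newConnections.offer(conn, 50, TimeUnit.MILLISECONDS)) {
                        // Queue still full; fall through and check the flag.
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "demo-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();

        try {
            Thread.sleep(200); // give the acceptor time to fill the queue
            alive.set(false);  // shutdown request
            return shutdownLatch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // 100 connections against a queue of 20 that is never drained.
        System.out.println(acceptorExitsAfterShutdown(20, 100, 2000));
    }
}
```

      The trade-off in this sketch is that a connection whose offer() is abandoned at shutdown is never handed to a processor, so real code would also have to close it.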
       
      

            People

              Assignee: Unassigned
              Reporter: yuhuo
              Votes: 0
              Watchers: 2
