Kafka / KAFKA-702

Deadlock between request handler/processor threads

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: network
    • Labels: bugs

      Description

      We have seen this a couple of times in the past few days in a test cluster. The request handler and processor threads deadlock on the request/response queues, bringing the server to a halt:

      "kafka-processor-10251-7" prio=10 tid=0x00007f4a0c3c9800 nid=0x4c39 waiting on condition [0x00007f46f698e000]
      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0x00007f48c9dd2698> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
        at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
        at kafka.network.Processor.read(SocketServer.scala:321)
        at kafka.network.Processor.run(SocketServer.scala:231)
        at java.lang.Thread.run(Thread.java:619)

      "kafka-request-handler-7" daemon prio=10 tid=0x00007f4a0c57f000 nid=0x4c47 waiting on condition [0x00007f46f5b80000]
      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0x00007f48c9dd6348> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
        at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)

      This is because there is a cycle in the wait-for graph of the processor and request handler threads. If request handling slows down on a busy server, the request queue fills up and all processor threads quickly block trying to add incoming requests to it. Because of that, they stop draining responses, so their response queues fill up as well. At that point the request handler threads start blocking on adding responses to those response queues. Every thread is now blocked putting into one full queue while the threads that should drain it are blocked on the other, which is a deadlock. This brings the server to a halt: it still accepts connections, but every request times out.
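
      To make the cycle concrete, here is a minimal sketch in Scala (illustrative names only, not the actual kafka.network classes) of how two bounded queues wired this way can wedge: the processor blocks on put() into the full request queue and therefore stops draining its response queue, while the handler blocks on put() into the full response queue.

      import java.util.concurrent.ArrayBlockingQueue

      object QueueCycleSketch {
        // Shared bounded request queue plus a bounded per-processor response queue,
        // mirroring the wiring described above.
        val requestQueue  = new ArrayBlockingQueue[String](2)
        val responseQueue = new ArrayBlockingQueue[String](2)

        // Processor: enqueues incoming requests and is the only thread that
        // drains the response queue.
        def processorLoop(): Unit = {
          while (true) {
            requestQueue.put("request")             // blocks once the request queue is full...
            while (responseQueue.peek() != null)    // ...so this drain loop is never reached again
              writeToSocket(responseQueue.take())
          }
        }

        // Request handler: dequeues requests and enqueues responses.
        def handlerLoop(): Unit = {
          while (true) {
            val req = requestQueue.take()
            responseQueue.put(handle(req))          // blocks once the response queue is full
          }
        }

        def handle(req: String): String = s"response($req)"
        def writeToSocket(resp: String): Unit = println(resp)
      }

      Running processorLoop and handlerLoop on separate threads with more requests in flight than the two queues can hold leaves both threads parked in ArrayBlockingQueue.put, which is exactly what the two stack traces above show.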

      One way to resolve this is to break the cycle in the wait-for graph of the request handler and processor threads. Instead of having the processor threads dispatch the responses, we can have one or more dedicated response handler threads that dequeue responses from the queue and write them to the socket. One downside of this approach is that access to the selector would then have to be synchronized.
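
      As a rough sketch of that alternative (the names and structure here are made up for illustration; this is not the patch that was committed), a dedicated sender thread could drain a response queue and write to the client channel itself, synchronizing on the shared Selector whenever it touches interest ops:

      import java.nio.ByteBuffer
      import java.nio.channels.{SelectionKey, Selector, SocketChannel}
      import java.util.concurrent.LinkedBlockingQueue

      // Hypothetical dedicated response-sender thread. The processor's select loop
      // still owns the Selector, so any mutation of it from this thread is guarded.
      class ResponseSender(selector: Selector) extends Runnable {
        case class Response(channel: SocketChannel, payload: ByteBuffer)
        val responseQueue = new LinkedBlockingQueue[Response]()

        def run(): Unit = {
          while (!Thread.currentThread().isInterrupted) {
            val resp = responseQueue.take()
            // Write the response bytes from this thread instead of the processor.
            // (Simplified: a non-blocking channel may return 0 here, so a real
            // implementation would wait for OP_WRITE rather than busy-loop.)
            while (resp.payload.hasRemaining)
              resp.channel.write(resp.payload)
            // Re-enable read interest for the connection; synchronized because the
            // processor thread selects on the same Selector concurrently.
            selector.synchronized {
              val key = resp.channel.keyFor(selector)
              if (key != null) key.interestOps(SelectionKey.OP_READ)
            }
            selector.wakeup()
          }
        }
      }

      Request handlers would then put() completed responses on responseQueue and return immediately, and the sender keeps draining it even while the processor threads are blocked on a full request queue, so the wait-for cycle is broken; the cost, as noted above, is the extra synchronization on the selector.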

        Activity

        Joel Koshy created issue -
        Neha Narkhede made changes -
        Field / Original Value / New Value
        Summary: "Livelock between request handler/processor threads" -> "Deadlock between request handler/processor threads"
        Description: updated. The original value read:

        "We have seen this a couple of times in the past few days in a test cluster.

        The processor thread enqueues requests into the request queue and dequeues
        responses from the response queue. The reverse is true for the request handler
        thread. This leads to the following livelock situation (all the processor/request
        handler threads show this in the thread-dump): [the same two thread dumps as in
        the Description above]"

        The new value is the current Description shown above.
        Labels: bugs
        Component/s: network [ 12320322 ]
        Jay Kreps made changes -
        Assignee: Jay Kreps [ jkreps ]
        Jay Kreps made changes -
        Attachment: KAFKA-702-v1.patch [ 12565076 ]
        Neha Narkhede made changes -
        Status: Open [ 1 ] -> Resolved [ 5 ]
        Resolution: Fixed [ 1 ]
        Neha Narkhede made changes -
        Status: Resolved [ 5 ] -> Closed [ 6 ]

          People

          • Assignee:
            Jay Kreps
            Reporter:
            Joel Koshy
          • Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue
