KAFKA-702: Deadlock between request handler/processor threads

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: network
    • Labels:

      Description

      We have seen this a couple of times in the past few days in a test cluster. The request handler and processor threads deadlock on the request/response queues, bringing the server to a halt.

      "kafka-processor-10251-7" prio=10 tid=0x00007f4a0c3c9800 nid=0x4c39 waiting on condition [0x00007f46f698e000]
      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0x00007f48c9dd2698> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
        at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
        at kafka.network.Processor.read(SocketServer.scala:321)
        at kafka.network.Processor.run(SocketServer.scala:231)
        at java.lang.Thread.run(Thread.java:619)

      "kafka-request-handler-7" daemon prio=10 tid=0x00007f4a0c57f000 nid=0x4c47 waiting on condition [0x00007f46f5b80000]
      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0x00007f48c9dd6348> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
        at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)

      This is because there is a cycle in the wait-for graph of the processor and request handler threads. If request handling slows down on a busy server, the request queue fills up, and all processor threads quickly block on adding incoming requests to it. Because of this, those threads stop processing responses, and their response queues fill up as well. At that point, the request handler threads start blocking on adding responses to the respective response queues. The result is a deadlock in which each side is blocked waiting for space in a queue that only the other side can drain. This brings the server to a halt: it still accepts connections, but every request times out.
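
      A minimal, self-contained sketch of this cycle (Scala; the names are hypothetical stand-ins, not the actual RequestChannel code): two bounded queues, a processor thread that only puts into the request queue, and a handler thread that only puts into the response queue. Once both queues are full, both threads park in ArrayBlockingQueue.put, exactly as in the thread dumps above.

      import java.util.concurrent.ArrayBlockingQueue

      // Hypothetical stand-ins for the bounded request/response queues.
      object DeadlockSketch extends App {
        val requestQueue  = new ArrayBlockingQueue[String](1) // filled by processors, drained by handlers
        val responseQueue = new ArrayBlockingQueue[String](1) // filled by handlers, drained by processors

        // Simulate a busy broker whose handlers have fallen behind: both queues start out full.
        requestQueue.put("req-0")
        responseQueue.put("resp-0")

        val processor = new Thread(new Runnable {
          // Like Processor.read(): blocks on the full request queue and therefore
          // never gets around to draining its response queue.
          def run(): Unit = requestQueue.put("req-1")
        }, "kafka-processor")

        val handler = new Thread(new Runnable {
          // Like KafkaRequestHandler: blocks on the full response queue and therefore
          // never frees space in the request queue.
          def run(): Unit = responseQueue.put("resp-1")
        }, "kafka-request-handler")

        processor.start(); handler.start()
        Thread.sleep(500)
        // Both threads report WAITING and the JVM then hangs, since neither can make progress.
        println(s"processor=${processor.getState}, handler=${handler.getState}")
      }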

      One way to resolve this is to break the cycle in the wait-for graph of the request handler and processor threads. Instead of having the processor threads dispatch the responses, we could have one or more dedicated response handler threads that dequeue responses from the queue and write them to the socket. One downside of this approach is that access to the selector would then have to be synchronized.

        Activity

        Joel Koshy created issue -
        Neha Narkhede made changes -
        Summary: Livelock between request handler/processor threads → Deadlock between request handler/processor threads
        Description: updated
        Labels: bugs
        Component/s: network [ 12320322 ]
        Jay Kreps made changes -
        Assignee: Jay Kreps [ jkreps ]
        Jun Rao added a comment -

        I am wondering if the following approach will break the deadlock.

        In RequestChannel.sendRequest(), instead of doing a blocking put(), we do offer() and return false if the queue is full. We propagate this flag to Processor.read(). Then, in Processor.run(), we only remove the selected key if read returns true. This way, read() will never block, which allows us to handle the responses even when the request queue is full.

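        A rough sketch of this idea (illustrative names only, not an actual patch): sendRequest becomes non-blocking via offer(), and the processor keeps the selection key around until the enqueue succeeds.

        import java.util.concurrent.ArrayBlockingQueue

        // Illustrative sketch of the non-blocking enqueue idea; names are hypothetical.
        class BoundedRequestChannel(queueSize: Int) {
          private val requestQueue = new ArrayBlockingQueue[AnyRef](queueSize)

          // offer() instead of put(): returns false when the queue is full, so the
          // caller can leave the selection key in place and retry on a later poll.
          def trySendRequest(request: AnyRef): Boolean = requestQueue.offer(request)
        }

        // In the processor loop (sketch): only drop the selected key once the request
        // was actually enqueued, e.g. if (channel.trySendRequest(request)) keyIter.remove()
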
        Neha Narkhede added a comment -

        I like this idea better than having separate response threads and locking between the processor and response handler threads. In the non-blocking request put approach, we will have to be careful, when the queue is full, not to select new ready keys while the previous ones have not been successfully enqueued. This can be a little tricky.

        Jay Kreps added a comment -

        TLDR: the attached one-line patch should fix the deadlock, but there is a larger issue (though not a blocker).

        I don't like the idea of splitting response and request handling between different threads. The reason our socket server is simple is that each network thread is totally independent of all the others, so there are no threading issues. Mixing these is slow (because of all the locking) and error prone. I have seen this done before and it is a big mess.

        Jun's second idea is better, but it's not as simple as described. We have to put the newly attached request somewhere and trigger a second attempt at adding it to the queue. Registering again for reading doesn't really work because there won't be more data to read. Registering for writing doesn't work because sockets are always writable, so we would end up busy waiting. So to make this work we would need some kind of list where we store requests that have been read but didn't fit in the queue. But we need something that will check this list periodically, and it is hard to guarantee that that would happen with any more frequency than the poll timeout.

        But I think we are muddling things a bit. Let's step back and think about this from first principles.

        Why do queues have limits? The reason is to bound memory usage. So taking data off the socket and putting it in a list is silly; that defeats the original purpose of having the bound (the queue, after all, is just a list).

        But think about this more. Why are we blocking adding responses to the response queue? The reason would be to bound memory usage. But the response queue doesn't actually bound memory usage. Things going into the response queue come either directly from processors or from purgatory, and in either case they are taking up memory there. Preventing responses from going out isn't helping anything.

        So the short term fix is just to remove the bound on the response queue.

        The larger problem is that, regardless of this change, in 0.8 we aren't effectively bounding memory usage. The reason is the purgatory. The purgatory will accumulate requests any time expiration gets slow. This could be due to a misconfigured client or due to a slow broker.

        So the error is that we are using queue size to indicate "backlog" but really the proper measure of backlog is the total number of requests in flight including all requests in queues OR in purgatory.

        But even once we understand the correct limit, it isn't clear what to do once we hit it. There are two choices: (1) stop taking new requests, or (2) prematurely start responding to requests in the purgatory. Neither of these is great. Consider the case where one broker gets slow and umpteen produce requests pile up in purgatory. If we stop taking new requests, that is like a GC pause, but since the timeout could be 30 seconds away it will be a long one. If we start dumping the purgatory prematurely, we will have to respond with an error because we lack sufficient acknowledgements.

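        A sketch of what the short-term fix amounts to (illustrative code with hypothetical names, not the attached patch itself): keep the bound on the request queue, which is what actually limits memory held by buffered requests, and make the per-processor response queues unbounded so a request handler can never block in sendResponse.

        import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue}

        // Illustrative shape of the fix; class and field names are hypothetical.
        class RequestChannelQueues(numProcessors: Int, requestQueueSize: Int) {
          // Still bounded: this is the limit on memory held by queued requests.
          val requestQueue: BlockingQueue[AnyRef] =
            new ArrayBlockingQueue[AnyRef](requestQueueSize)

          // No capacity bound: handlers and the purgatory can always enqueue responses
          // without blocking, which breaks the wait-for cycle.
          val responseQueues: Array[BlockingQueue[AnyRef]] =
            Array.fill[BlockingQueue[AnyRef]](numProcessors)(new LinkedBlockingQueue[AnyRef]())
        }
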
        Jay Kreps made changes -
        Attachment: KAFKA-702-v1.patch [ 12565076 ]
        Neha Narkhede added a comment -

        Agree that a longer-term fix is to be able to bound the total memory usage by counting the requests in flight in the purgatory. But when we do reach this limit, we should load shed from the purgatory like you suggested. One way of doing that is by implementing client quotas and shedding requests coming from the faulty clients. This might require more thought and probably a big refactoring. I think we should check in this patch, tune GC, and see how that goes.

        +1 on patch v1

        Jun Rao added a comment -

        First of all, +1 on the simple patch. I think it solves the immediate problem.

        For the socket selector, my understanding after reading the Java doc is that the selected keys are always there unless you explicitly remove them. In other words, those selected keys won't magically go away after the socket key is consumed. Every time we call select(), only newly available keys are added and existing selected keys are untouched. So, even if we have finished reading from a socket, if the key is not explicitly removed, select() will still give the same key back. However, my suggestion is probably worse than this patch: until the request queue has space again, the processor thread could busy-loop, repeatedly trying to add the same request from a socket to the request queue.

        A second thing is that, currently, the number of outstanding requests on the broker is bounded by the number of clients, since each client can have at most one outstanding request. So, if we bound the number of clients, we can somewhat bound the memory used by outstanding requests. This limit is probably useful for not running out of open file handles too.

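        For reference, a generic NIO poll loop illustrating the selected-key behavior described above (this is a sketch, not Kafka's SocketServer): select() only adds keys to the selected-key set, and they stay there until the caller removes them, so a key that is not removed will be seen again on the next select().

        import java.nio.channels.{SelectionKey, Selector}

        object SelectorLoopSketch {
          def pollOnce(selector: Selector)(handle: SelectionKey => Boolean): Unit = {
            if (selector.select(300) > 0) {
              val iter = selector.selectedKeys().iterator()
              while (iter.hasNext) {
                val key = iter.next()
                // select() never clears this set itself; remove the key only once it has
                // been fully handled, otherwise it comes back on the next select()
                // (at the cost of possibly busy-looping on it).
                if (handle(key)) iter.remove()
              }
            }
          }
        }
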
        Sriram Subramanian added a comment -

        I would like to add my thoughts to this.

        1. Load shedding arbitrary clients will bound the memory but would essentially cause the system to fail most of the requests and not recover until the load goes down. We have quite a few inter-dependencies between requests (producer requests depend on replica requests, replica requests depend on produce requests, and consumer requests depend on produce requests), and dropping requests would essentially cause the requests depending on them to stay longer in the purgatory and fail.

        2. Having client quotas may not work because we do not have one faulty client. Each client can have at most one outstanding request.

        A few improvements might reduce the failure scenarios:

        1. Currently, replica requests wait on a hard limit (min bytes). Instead, they could be made to return earlier to free the purgatory and accept more requests during high-load scenarios.
        2. Direct consumers to read from other replicas in the ISR that have less load. This is going to be harder.

        Neha Narkhede added a comment -

        Checked this in to proceed with deployment

        Neha Narkhede made changes -
        Status: Open [ 1 ] → Resolved [ 5 ]
        Resolution: Fixed [ 1 ]
        Neha Narkhede added a comment -

        >> 2. Having client quotas may not work because we do not have one faulty client. Each client can at most have only one request.

        We understand that. Client quotas are probably better done in terms of expirations per second. Basically, suppose you set up your partitions with a large replication factor (let's say 6), set the num.acks in your producer to -1, and at the same time set your timeout too low; then all requests will time out and expire. This allows your client to send many requests that all time out.

        Load shedding needs more thought. It is not straightforward, and when we scope it out we will obviously need to keep in mind the consequences of load shedding.

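        Purely as a hypothetical illustration of the "expirations per second" idea (nothing like this exists in the codebase or in the attached patch), a per-client quota could be tracked with a one-second window:

        import scala.collection.mutable

        // Hypothetical sketch: count purgatory expirations per client per second and
        // flag clients that exceed a configured rate. Not part of the KAFKA-702 fix.
        class ExpirationQuotaSketch(maxExpirationsPerSec: Long) {
          private case class Window(startMs: Long, var count: Long)
          private val windows = mutable.Map.empty[String, Window]

          // Record one expired request for clientId; returns true if the client is over quota.
          def recordExpiration(clientId: String, nowMs: Long = System.currentTimeMillis()): Boolean =
            synchronized {
              val w = windows.get(clientId) match {
                case Some(win) if nowMs - win.startMs < 1000L => win.count += 1; win
                case _ =>
                  val win = Window(nowMs, 1L)
                  windows(clientId) = win
                  win
              }
              w.count > maxExpirationsPerSec
            }
        }
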
        Neha Narkhede made changes -
        Status: Resolved [ 5 ] → Closed [ 6 ]

          People

          • Assignee: Jay Kreps
          • Reporter: Joel Koshy
          • Votes: 0
          • Watchers: 5
