KAFKA-749

Bug in socket server shutdown logic makes the broker hang on shutdown until it has to be killed

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: network
    • Labels:

      Description

      The current shutdown logic of the server shuts down the io threads first, followed by the acceptor and finally the processor threads. The shutdown API of the io threads enqueues a special AllDone command into the common request queue; an io thread shuts down when it dequeues this special AllDone command. What can happen is that while this shutdown command is being processed by the io threads, the network/processor threads can still accept new connections and requests and will add those new requests to the request queue. That means more requests can be enqueued after the AllDone command. After the io threads have shut down, there is no thread available to dequeue from the request queue, so the processor threads can hang while adding new requests to a full request queue, thereby blocking the server from shutting down.
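
      The hang can be pictured with a plain bounded queue: once the only consumer has exited, any producer that hits a full queue blocks forever. Below is a minimal sketch of that pattern; the queue size, the AllDone stand-in, and the thread bodies are illustrative, not the actual Kafka classes.

      import java.util.concurrent.ArrayBlockingQueue

      // Minimal sketch of the shutdown hang (illustrative only, not Kafka's real classes).
      object ShutdownHangSketch extends App {
        val requestQueue = new ArrayBlockingQueue[Int](2) // small bounded request queue
        val AllDone = -1                                  // stand-in for the AllDone command

        // "io thread": dequeues until it sees AllDone, then exits
        val ioThread = new Thread(new Runnable {
          def run(): Unit = while (requestQueue.take() != AllDone) {}
        })
        ioThread.start()

        requestQueue.put(AllDone) // io threads are shut down first...
        ioThread.join()

        // ...but a processor thread can still be enqueuing new requests; with no
        // consumer left, put() blocks forever once the queue fills up.
        (1 to 10).foreach(i => requestQueue.put(i)) // hangs after two puts
      }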

      1. kafka-749-v1.patch
        7 kB
        Neha Narkhede
      2. kafka-749-v2.patch
        3 kB
        Neha Narkhede

        Activity

        Neha Narkhede added a comment -

        Bug fix includes the following changes -

        1. KafkaRequestHandler

    • Removed AllDone; instead, shutdown() sets isRunning to false and waits for the request handler thread to finish processing the request it is working on and then stop

        2. RequestChannel

        • Modified receiveRequest to wait on a condition variable if the queue is empty. The purpose is to introduce a clean way to wake it up when it is time to shut down (see the sketch after this list).
        • Added a close() API that sets isShuttingDown to true and signals the condition, so all io threads waiting in receiveRequest() will return null
        • Did not clear the queue, since the io threads shut down after the socket server. If we cleared the queue, every io thread would keep getting null from receiveRequest() until it shuts down; that window should be very short, but it is still inefficient. Not clearing is fine for the io thread shutdown logic, since each thread just processes one more request before it shuts down as well.

        3. SocketServer

        • Invoke close on the request channel after the acceptor and processor threads are shut down

        4. KafkaServer

        • Shut down the socket server before the request handler. This ensures we don't accept and enqueue more requests that will time out anyway.
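
        A rough sketch of the receiveRequest()/close() interplay described in the RequestChannel item above, using a hand-rolled lock-and-condition queue for illustration; the field and method names follow this comment, not the actual patch.

        import java.util.concurrent.locks.ReentrantLock
        import scala.collection.mutable

        // Illustrative RequestChannel-style queue (not the actual patch).
        class RequestChannelSketch[T >: Null] {
          private val lock = new ReentrantLock()
          private val notEmpty = lock.newCondition()
          private val queue = mutable.Queue[T]()
          private var isShuttingDown = false

          def sendRequest(request: T): Unit = {
            lock.lock()
            try { queue.enqueue(request); notEmpty.signal() } finally lock.unlock()
          }

          // Blocks while the queue is empty; returns null once close() has been called.
          def receiveRequest(): T = {
            lock.lock()
            try {
              while (queue.isEmpty && !isShuttingDown) notEmpty.await()
              if (queue.nonEmpty) queue.dequeue() else null
            } finally lock.unlock()
          }

          // Wakes up every io thread blocked in receiveRequest() so it can observe shutdown.
          def close(): Unit = {
            lock.lock()
            try { isShuttingDown = true; notEmpty.signalAll() } finally lock.unlock()
          }
        }
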
        Sriram Subramanian added a comment -

        Took a look at this.

        1. This adds another layer of locking to the request queue, which is itself a BlockingQueue implementation. This does not seem very efficient. Either we implement our own blocking queue or we use the API.
        2. Having said that, it seems a lot easier to just purge the queue. Is this not an option?
        3. If 2 is not possible, a more elegant way is to use the poison pill approach. We shut down the socket server and enqueue a shutdown request to the queue. Each request handler thread dequeues the request, checks whether it is the shutdown request and, if so, re-queues it and exits.

        Jay Kreps added a comment -

        The ugly part here is the extra layer of synchronization and signalling around the already synchronized blocking queue. This code is a bit hard to validate (for example, shouldn't it be signal instead of signalAll, since only one thing was added?) so it tends to quickly get broken by later people who don't understand it.

        I think I don't quite understand why we can't just call clear on the queue and enqueue the AllDone object to achieve this. The ugly parts of the previous implementation were that AllDone actually came out of the RequestChannel and that it was a ProducerRequest. This is easily fixed: there is no reason it should be a ProducerRequest, and the check for eq AllDone can be done in receiveRequest.

        Neha Narkhede added a comment -

        The only problem with the AllDone approach is that we need to ensure that the request queue size is at least as big as the number of io threads we have. If it is less than that, we have the same problem where shutdown will hang.

        Jay Kreps added a comment -

        So what if we just do

        while(true) {
          queue.clear()
          queue.offer(AllDone)
          return
        }

        Jay Kreps added a comment -

        Err, should be

        while(true) {
          queue.clear()
          if(queue.offer(AllDone))
            return
        }

        Neha Narkhede added a comment -

        Is this the logic for the shutdown() API in KafkaRequestHandler? If yes, then I don't think we can keep clearing the queue in a loop. What can happen is that AllDone requests for the rest of the request handlers might still be in the queue. If we clear those, then those request handlers won't shut down, and some of the request handlers will just busy-wait trying to clear and offer AllDone in a loop.

        I'm thinking an easier solution might be to just require the request queue size to be at least as large as the number of io threads. This makes sense so that io threads are never underutilized and there is space for one request per io thread in the request queue. Thoughts?

        Jay Kreps added a comment -

        That makes sense, good solution.

        Sriram Subramanian added a comment -

        If that works, fine. But it looks like there really is a solution that does not need any constraints.

        1. SocketServer on shutdown closes the acceptor and processor threads. It can then add an AllDone request to the queue.
        2. Each request handler thread just dequeues from the request queue. If the item is AllDone, it re-enqueues it and exits (as sketched below).
        3. The request handler shutdown just waits for all threads to exit (this is the same as today).

        Will this not work?
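
        A minimal sketch of that poison-pill handler loop; AllDone, the queue item type, and the class name here are illustrative, not the actual Kafka code.

        import java.util.concurrent.ArrayBlockingQueue

        // Illustrative poison-pill handler loop (not the actual Kafka code).
        sealed trait QueueItem
        case class Work(payload: String) extends QueueItem
        case object AllDone extends QueueItem

        class RequestHandlerSketch(queue: ArrayBlockingQueue[QueueItem]) extends Runnable {
          def run(): Unit = {
            var running = true
            while (running) {
              queue.take() match {
                case AllDone =>
                  // Re-enqueue so the next handler thread also sees the pill; if the queue
                  // is full this may wait briefly, but other handlers keep draining it.
                  queue.put(AllDone)
                  running = false // then exit this thread
                case Work(_) =>
                  () // handle the request here
              }
            }
          }
        }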

        Neha Narkhede added a comment -

        Thanks for the review! I think I was overthinking the issue of the request queue size having to be larger than the number of io threads. Even if it is smaller, some io thread's shutdown will wait for some space to free up, and space will free up since some other io thread will dequeue the AllDone command.

        This patch is very simple. It changes the shutdown logic of the Kafka server to go through the following steps (sketched after this list) -
        1. Shut down the acceptor, so no new connections are accepted
        2. Shut down the processor threads; they will enqueue the currently selected keys' requests in the request queue. This is fine since the io threads are still alive and will dequeue requests, so this step will not block
        3. The request channel shutdown will clear the queue. At this point no thread is enqueuing more data, and io threads trying to dequeue data will block in receiveRequest
        4. Shut down the io threads; this will enqueue the AllDone command in the queue, and all io threads will shut down one after the other. Even if the request queue is smaller than the number of io threads, shutdown will eventually complete
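
        In code, the ordering boils down to something like the sketch below; the trait and method names are illustrative, not the actual 0.8 classes.

        // Illustrative shutdown ordering (trait and method names are not the real classes).
        trait SocketServerLike       { def shutdown(): Unit } // stops acceptor + processor threads
        trait RequestChannelLike     { def clear(): Unit }    // drops any queued requests
        trait RequestHandlerPoolLike { def shutdown(): Unit } // enqueues AllDone, joins io threads

        class BrokerShutdownSketch(socketServer: SocketServerLike,
                                   requestChannel: RequestChannelLike,
                                   handlerPool: RequestHandlerPoolLike) {
          def shutdown(): Unit = {
            socketServer.shutdown()  // steps 1-2: no new connections, processors drained
            requestChannel.clear()   // step 3: safe to clear, nothing enqueues anymore
            handlerPool.shutdown()   // step 4: AllDone makes each io thread exit in turn
          }
        }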

        Neha Narkhede added a comment -

        Sriram, that is what I described in my simpler solution, and it is what patch v2 does.

        Sriram Subramanian added a comment -

        Awesome, looks good to me. Nit: are we planning to fix getShutdownReceive to not use produceRequest as part of this patch, or is that a separate jira?

        Neha Narkhede added a comment -

        KAFKA-745 is filed for that. I prefer not to touch it here; it is a much bigger change.

        Jay Kreps added a comment -

        Why is that a big change?

        Jay Kreps added a comment -

        Also, not sure if I get it. If the I/O threads are able to make progress and the response queue is unlimited in size then shouldn't requestQueue.put always succeed eventually? Even if the queue is currently full some requests will get processed and free up some space...?

        Neha Narkhede added a comment -

        That is correct. Also, removing getShutdownReceive() is part of the refactoring in KAFKA-745. This patch includes the bug fix itself; I would prefer not to extend its scope.

        Neha Narkhede added a comment -

        Thanks for the reviews, checked in patch v2


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 3


              Time Tracking

              • Estimated: 24h
              • Remaining: 24h
              • Logged: Not Specified
