Kafka / KAFKA-1043

Time-consuming FetchRequest could block other requests in the response queue

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1
    • Fix Version/s: 0.8.2
    • Component/s: None
    • Labels: None

      Description

      Since in SocketServer the processor that takes a request is also responsible for writing the response for that request, each processor owns its own response queue. If a FetchRequest takes an irregularly long time to write to the channel buffer, it will block all other responses in that queue.
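
      A minimal sketch (illustrative names only, not the actual SocketServer code) of the coupling described above, assuming a FIFO per-processor response queue: the processor drains its own queue in order, so a single slow fetch-response write delays every response queued behind it on that processor.

          import java.nio.channels.SocketChannel
          import java.util.concurrent.LinkedBlockingQueue

          trait Send {
            def complete: Boolean
            def writeTo(channel: SocketChannel): Int   // bytes written by one call
          }

          case class Response(channel: SocketChannel, send: Send)

          class Processor {
            private val responseQueue = new LinkedBlockingQueue[Response]()

            def enqueue(r: Response): Unit = responseQueue.put(r)

            // This one thread writes responses strictly in FIFO order, so a slow
            // write to one channel stalls everything behind it in the queue.
            def drainResponses(): Unit = {
              var r = responseQueue.poll()
              while (r != null) {
                while (!r.send.complete)
                  r.send.writeTo(r.channel)
                r = responseQueue.poll()
              }
            }
          }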

        Activity

        Jun Rao added a comment -

        Moving this to 0.8.1 since the issue happens rarely.

        Neha Narkhede added a comment -

        As Sriram said, we no longer block on a full socket buffer. The problem is really large fetch requests, like those coming from a lagging mirror maker, hogging the network thread by writing as much as possible while the socket buffer is not full. This basically increases the response send time for all other requests whose responses are queued up behind this large fetch request. This causes a downward spiral that takes quite some time to recover due to the filled up response queues.

        We could cap based on size, yielding the network thread after n MB have been written on the wire, so the rest of the smaller responses get a chance to be written to the socket. This would ensure that one or a few large fetch requests don't penalize several other smaller requests.
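
        A hedged sketch of the size-based cap proposed above (maxBytesPerTurn, Send, and writeTo here are illustrative names and values, not existing Kafka parameters): stop writing a response once n bytes have gone out in this turn and report it as unfinished, so the processor can re-queue it and give smaller responses a chance at the socket.

            import java.nio.channels.SocketChannel

            trait Send {
              def complete: Boolean
              def writeTo(channel: SocketChannel): Int     // bytes written by one call
            }

            object CappedWriter {
              // Returns true when the send finished; false means it hit the cap (or a
              // full socket buffer) and should go back to the tail of the response queue.
              def writeCapped(send: Send, channel: SocketChannel,
                              maxBytesPerTurn: Int = 4 * 1024 * 1024): Boolean = {
                var written = 0
                while (!send.complete && written < maxBytesPerTurn) {
                  val n = send.writeTo(channel)
                  if (n <= 0) return send.complete         // socket buffer full; try again later
                  written += n
                }
                send.complete
              }
            }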

        Guozhang Wang added a comment -

        IMHO the local time spent processing the fetch response is linear in the number of partitions in the request, while the network time spent writing to the socket buffer is not, since it depends on whether the data is still in the file cache. Hence, with either the 1) reset-socket-buffer-size or 2) subset-of-topic-partitions-at-a-time approach, we would need to either 1) set the buffer size so small that it is unfair to other requests that do not hit I/O and may cause unnecessary round trips, or 2) fetch so small a subset of topic-partitions that we end up in the same situation as 1).

        Capping based on time is better since it provides "fairness" but that seems a little hacky.

        My reasoning for decoupling the socket and the network processor is the following. As we scale up, the principle should be "clients are isolated from each other". For a fetch request that means "if you request old data from many topic-partitions, only your own request should take a long time; other requests should not be impacted". Today a request's lifetime on the server is

        socket -> network processor -> request handler -> (possible) disk I/O due to flush for produce request -> socket processor -> network I/O

        and one way to enable isolation is to ensure that no hop on this path is single-threaded. Today socket -> network processor goes through the acceptor, network processor -> request handler goes through the request queue, and request handler -> (possible) disk I/O due to flush for produce requests was fixed in KAFKA-615; but socket processor -> network I/O is still coupled, and fixes for issues caused by this coupling end up catering to the "worst case", which violates the "isolation" principle.

        I agree this is rather complex and would be a long term thing.

        Sriram Subramanian added a comment -

        We no longer block on the socket buffers if they get full. We do block on slow I/O. That can be fixed either by capping based on time or by only fetching a subset of topics. Even if there is no slow I/O and the socket buffers do not get full, we could spend too much time processing the fetch response; in that case, fetching only a subset of topics should help. Anything more is just over-engineering to me.

        Jay Kreps added a comment -

        Why not just configure the socket buffer size based on the maximum I/O size we want to block on? Say 512k.

        We had two issues. One was blocking on the full write, which I think/hope we fixed. The second is that even with that fix you still have to block for one socket buffer's worth of data. The simplest thing is just to set this buffer to a reasonable default.

        Decoupling the socket from the selectors is possible but probably pretty complex for a minor bug fix. I also think this only works as long as we maintain the current setup where we only allow a single in-flight request at a time per socket. If we allowed multiple requests then the socket must remain registered with some selector and you have to send the response back to that selector.
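
        A hedged sketch of the buffer-size idea above (the 512k figure is from the comment; the property value shown is an assumption, not a recommended default): with a non-blocking channel, capping the kernel send buffer caps how much data a single write call can move before the processor returns to its selector loop.

            import java.nio.channels.SocketChannel

            object SocketTuning {
              // Roughly what socket.send.buffer.bytes=524288 in server.properties would
              // ask the broker to do for each connection it accepts (assumed value).
              def configure(channel: SocketChannel, sendBufferBytes: Int = 512 * 1024): Unit = {
                channel.configureBlocking(false)                    // processors poll a selector
                channel.socket().setSendBufferSize(sendBufferBytes) // bounds per-write progress
              }
            }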

        Guozhang Wang added a comment -

        Giving it some more thought, I think a long-term fix would be to decouple the sockets from the network threads. With this decoupling, given a response and its corresponding target socket, any network thread can pick it up and write it to the socket. In this way we would have a network thread pool AND a socket pool, and any thread could work on any socket.

        Any comments, Jay Kreps?
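
        A hedged sketch of the decoupled design described above (SharedResponsePool and QueuedResponse are illustrative names, and the blocking-style drain is a simplification): responses carry their target channel and sit in one shared queue, so any writer thread in the pool can pick up any response and write it to its socket, rather than one queue being tied to one thread.

            import java.nio.ByteBuffer
            import java.nio.channels.SocketChannel
            import java.util.concurrent.{Executors, LinkedBlockingQueue}

            case class QueuedResponse(channel: SocketChannel, payload: ByteBuffer)

            class SharedResponsePool(numWriters: Int) {
              private val responses = new LinkedBlockingQueue[QueuedResponse]()
              private val writers   = Executors.newFixedThreadPool(numWriters)

              def enqueue(r: QueuedResponse): Unit = responses.put(r)

              def start(): Unit =
                for (_ <- 1 to numWriters) writers.submit(new Runnable {
                  override def run(): Unit =
                    while (!Thread.currentThread().isInterrupted) {
                      val r = responses.take()          // any idle writer takes any response
                      while (r.payload.hasRemaining)    // simplified: drain until fully written
                        r.channel.write(r.payload)
                    }
                })
            }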

        Guozhang Wang added a comment -

        One possible approach would be to add a timer to the Send trait and one more parameter to the writeTo function specifying the max time for a single write to the socket buffer. When the time threshold is reached, force the writeTo function to return.

        An alternative approach would be to make the SocketServer's Acceptor smarter than plain round-robin assignment, so that new incoming requests are assigned to the processor with the shortest response queue. I think we do not need to make SocketServer itself response-queue-aware; a pluggable acceptor interface would be fine. The con of this approach is that requests already assigned to the blocked processor would still suffer the delay, but future requests would be assigned to other processors.
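
        A hedged sketch of the time-capped writeTo described in the first approach above (TimeBoundedSend, writeChunk, and maxWriteTimeMs are illustrative names, not the existing Send trait): the write loop returns once the deadline passes, so a single large response cannot hold the network thread indefinitely.

            import java.nio.channels.GatheringByteChannel

            trait TimeBoundedSend {
              def complete: Boolean
              protected def writeChunk(channel: GatheringByteChannel): Int  // one bounded write

              // Returns the total bytes written this turn; stops early when the deadline
              // is hit or the socket buffer is full, forcing the caller to yield the thread.
              def writeTo(channel: GatheringByteChannel, maxWriteTimeMs: Long): Long = {
                val deadline = System.currentTimeMillis() + maxWriteTimeMs
                var total = 0L
                while (!complete && System.currentTimeMillis() < deadline) {
                  val n = writeChunk(channel)
                  if (n <= 0) return total
                  total += n
                }
                total
              }
            }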


          People

          • Assignee: Guozhang Wang
          • Reporter: Guozhang Wang
          • Votes: 0
          • Watchers: 5
