  Cassandra / CASSANDRA-1358

Clogged RRS/RMS stages can hold up processing of gossip messages and request acks

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.6.5
    • Component/s: None
    • Labels: None
    • Environment: All

      Description

      The message deserialization process can become a bottleneck that prevents efficient resource utilization, because the executor that manages deserialization never grows beyond a single thread. The message deserializer executor is instantiated in the MessagingService constructor as a JMXEnabledThreadPoolExecutor, which extends java.util.concurrent.ThreadPoolExecutor. The thread pool is instantiated with a corePoolSize of 1 and a maximumPoolSize of Runtime.getRuntime().availableProcessors(). But according to the ThreadPoolExecutor documentation, "using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)"
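      To make the quoted ThreadPoolExecutor behavior concrete, here is a minimal standalone sketch (not the Cassandra code itself): with an unbounded LinkedBlockingQueue, the pool never grows past corePoolSize, so the maximumPoolSize argument is effectively ignored.

          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;

          public class UnboundedQueueDemo
          {
              public static void main(String[] args)
              {
                  int cores = Runtime.getRuntime().availableProcessors();
                  // Same shape as the deserializer pool: core size 1, max = #cores, unbounded queue.
                  ThreadPoolExecutor pool = new ThreadPoolExecutor(
                          1, cores, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

                  for (int i = 0; i < 1000; i++)
                  {
                      pool.execute(new Runnable()
                      {
                          public void run()
                          {
                              try { Thread.sleep(10); } catch (InterruptedException e) { }
                          }
                      });
                  }

                  // Always prints 1: the unbounded queue absorbs the backlog,
                  // so no threads beyond corePoolSize are ever started.
                  System.out.println("pool size: " + pool.getPoolSize());
                  pool.shutdown();
              }
          }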

      The message deserializer pool uses a LinkedBlockingQueue, so there will never be more than one deserialization thread. This issue became a problem in our production cluster when the MESSAGE-DESERIALIZER-POOL began to back up on a node that was only lightly loaded. We increased the core pool size to 4 and the situation improved, but the deserializer pool was still backing up while the machine was not fully utilized (less than 100% CPU utilization). This leads me to think that the deserializer thread is blocking on some sort of I/O, which seems like it shouldn't happen.

      Attachments

      1. 1358.txt
        9 kB
        Jonathan Ellis
      2. 1358-v2.txt
        12 kB
        Jonathan Ellis

        Activity

        jbellis Jonathan Ellis added a comment -

        I have never ever seen MDP be the bottleneck as opposed to the stage it is pushing the request into next (either mutation or read).

        mmalone Mike Malone added a comment - edited

        So are the message deserializer threads blocking on a response from (or pushing a task onto a queue for) a different thread pool then? The problem we saw / are seeing is that the deserializer pool backs up and it causes the server to flap because (apparently) it's unable to process gossip requests / responses in a timely manner. Since all inter-node RPC goes through the deserializer, this queue ends up causing all sorts of crazy havoc when it gets backed up.

        Even supposing the MDP isn't the bottleneck, the current code seems to be a mistake. If the intent is to have a single-threaded executor, the second argument should be 1. If the intent is to have a multi-threaded executor, the first argument should not be 1.
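        For illustration only (a sketch, not the actual patch; imports are the same java.util.concurrent classes as above), the two possible intents would read:

            // Intent 1: a genuinely single-threaded executor -- both pool sizes are 1.
            new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

            // Intent 2: one deserializer thread per core -- corePoolSize itself must be raised,
            // because the unbounded queue prevents the pool from ever growing past it on its own.
            int cores = Runtime.getRuntime().availableProcessors();
            new ThreadPoolExecutor(cores, cores, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());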

        jbellis Jonathan Ellis added a comment -

        So are the message deserializer threads blocking on a response from (or pushing a task onto a queue for) a different thread pool then?

        yes.

        If the intent is to have a single threaded executor, the second argument should be 1

        true, but possibly moot, because:

        it's unable to process gossip requests / responses in a timely manner

        good point. we have a separate connection for gossip and acks so they won't be blocked by a large request on the outbound path, but combining them both through the same MDP on the inbound side defeats that to a large degree.

        probably we should just get rid of MDP, and do the deserialization on the IncomingTcpConnection thread. (This would require moving the flow control from CASSANDRA-685 / CASSANDRA-1284 into RMVH / RVH.)
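        A rough standalone sketch of that direction (hypothetical class and wire format; only the idea of deserializing on the IncomingTcpConnection thread and feeding the stages directly is taken from the comment above):

            import java.io.DataInputStream;
            import java.io.IOException;
            import java.net.Socket;
            import java.util.concurrent.ExecutorService;

            // Hypothetical stand-in for IncomingTcpConnection once MDP is gone:
            // the socket-reading thread deserializes each message itself and hands
            // the decoded work directly to the destination stage's queue.
            class IncomingConnectionSketch implements Runnable
            {
                private final Socket socket;
                private final ExecutorService gossipStage;   // small, latency-sensitive
                private final ExecutorService requestStage;  // RRS / RMS equivalents

                IncomingConnectionSketch(Socket socket, ExecutorService gossipStage, ExecutorService requestStage)
                {
                    this.socket = socket;
                    this.gossipStage = gossipStage;
                    this.requestStage = requestStage;
                }

                public void run()
                {
                    try
                    {
                        DataInputStream in = new DataInputStream(socket.getInputStream());
                        while (true)
                        {
                            int verb = in.readInt();                // made-up wire format: verb, length, body
                            byte[] body = new byte[in.readInt()];
                            in.readFully(body);
                            Runnable delivery = decode(verb, body); // deserialization happens on this thread
                            // Gossip and ack traffic goes to its own stage, so a backed-up
                            // request stage can no longer starve failure-detector messages.
                            (verb == 0 ? gossipStage : requestStage).execute(delivery);
                        }
                    }
                    catch (IOException e)
                    {
                        // connection closed; let this thread exit
                    }
                }

                private Runnable decode(int verb, byte[] body)
                {
                    return new Runnable() { public void run() { /* act on the decoded verb */ } };
                }
            }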

        jbellis Jonathan Ellis added a comment -

        (getting rid of MDP would also slightly reduce the object allocations on the network path, since we're not wrapping the work in a separate Task object and involving the extra executor queue. So I would expect a minor performance benefit as well.)

        mmalone Mike Malone added a comment -

        I'm continuing to dig deeper into this code while simultaneously nursing one of our clusters back to health, so I apologize for the sort of stream-of-consciousness here...

        I noticed that several of the executor queues are bounded at 4096 tasks. Has there been much thought put into that choice, or is it an arbitrary round number that someone picked? It seems to me that bumping that number up a couple of orders of magnitude, or making it unbounded, might ameliorate the situation. Instead of having the stage executors fill up and push task execution back onto the calling thread (which is a single thread in the case of MDP), more messages will stack up in the callee queues. This should give the various queues a fair chance of processing the stuff they're interested in without being blocked by MDP (which is being blocked by some other stage). There may be some slight memory overhead because deserialized objects will be in memory instead of serialized ones, but that's a price I'd be willing to pay.

        I did find one possible reason to have an executor with a core pool size of 1, an unbounded queue, and a maximumPoolSize > 1. It looks like the default RejectedExecutionHandler is affected by maximumPoolSize. If it's > 1 then the default constructor assumes that tasks can safely be scheduled in parallel, and it defaults to a "caller runs" policy if the queue is full. But if the maximumPoolSize is 1, the rejected execution handler spins on offering the task to the queue with a 1 second timeout. So if your maximum pool size is greater than one you can basically use the calling threads for spare capacity.

        Still, if that's the goal it should be made more explicit. I'm guessing the intent was to give the MDP one thread per core under the assumption that it will be completely CPU bound. But the implementation is borked for a number of reasons. Plus, if the MDP can block on other stuff, the CPU bound assumption is wrong. If MDP can block, it should probably have a lot more threads.
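        An approximation of the rejection behavior described above (a sketch of the idea only, not the actual Cassandra handler):

            import java.util.concurrent.RejectedExecutionHandler;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            // Sketch: with more than one worker the submitting thread runs the task itself
            // ("caller runs" as spare capacity); with a single worker it blocks, re-offering
            // the task to the queue with a one-second timeout until it is accepted.
            class BlockOrCallerRunsPolicy implements RejectedExecutionHandler
            {
                public void rejectedExecution(Runnable task, ThreadPoolExecutor pool)
                {
                    if (pool.getMaximumPoolSize() > 1)
                    {
                        task.run();
                        return;
                    }
                    try
                    {
                        while (!pool.getQueue().offer(task, 1, TimeUnit.SECONDS))
                            ; // queue still full; keep retrying
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt();
                    }
                }
            }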

        mmalone Mike Malone added a comment -

        Er, or you could get rid of MDP. That sounds like an excellent solution! Still wonder about the bounded queue sizes though.

        jbellis Jonathan Ellis added a comment -

        Yes, unbounding those would also be required in the get-rid-of-MDP scenario. In short, the bound never makes things better and can make things worse.

        (But if you are running into that 4096 bound – as I predicted earlier – your node is already severely overwhelmed, and the only thing we are discussing is how to mitigate the effects; this is not going to stop it from timing out a ton of requests under those conditions.)

        jbellis Jonathan Ellis added a comment -

        Patch to remove MessageDeserializationTask; the drop-messages-that-can't-possibly-complete-within-timeout code moves to MessageDeliveryTask, and the RMS/RRS queues are uncapped.
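        A hedged sketch of that drop logic (illustrative names only, not the patch itself): a delivery task that discards itself if it has already waited in the stage queue longer than the RPC timeout.

            // Illustrative only: if the queue delay already exceeds the timeout, the coordinator
            // has (or soon will have) timed the request out, so doing the work now is wasted effort.
            class DeliveryTaskSketch implements Runnable
            {
                private final Runnable work;
                private final long constructionTime = System.currentTimeMillis();
                private final long rpcTimeoutMillis;

                DeliveryTaskSketch(Runnable work, long rpcTimeoutMillis)
                {
                    this.work = work;
                    this.rpcTimeoutMillis = rpcTimeoutMillis;
                }

                public void run()
                {
                    if (System.currentTimeMillis() - constructionTime > rpcTimeoutMillis)
                        return; // drop: a reply could no longer arrive within the timeout anyway
                    work.run();
                }
            }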

        stuhood Stu Hood added a comment -
        • defaultExecutor still claims the name MESSAGE-DESERIALIZER-POOL
        • Patch doesn't actually remove net/MessageDeserializationTask.java
        • MessagingService.getDeserializationExecutor() has no callers
        jbellis Jonathan Ellis added a comment -

        v2 fixes above, and import cleanup in MessagingService

        stuhood Stu Hood added a comment -

        +1

        jbellis Jonathan Ellis added a comment -

        committed


          People

          • Assignee:
            jbellis Jonathan Ellis
            Reporter:
            mmalone Mike Malone
          • Votes:
            0
            Watchers:
            5
