Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Fix Version/s: 0.6.4
    • Component/s: Core
    • Labels: None

      Description

      Now that we have CASSANDRA-401 and CASSANDRA-488 there is one last piece: we need to stop the target node from pulling mutations out of MessagingService as fast as it can, only to have them take up space in the mutation queue and eventually fill up memory.

        Activity

        Jonathan Ellis added a comment -

        The first patch makes it so the target node won't OOM and will instead backpressure the control node.

        The second makes it so the control node will notice the backpressure and pass it on (via TimeoutException) to the Thrift client, rather than OOMing itself by continuing to enqueue messages to an unresponsive target.
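        For illustration only: roughly the shape of that mechanism, with hypothetical class and method names that are not taken from the actual patches. A bounded per-target queue stops accepting messages when the target falls behind, and a failed offer is surfaced to the client as a TimeoutException instead of buffering without bound.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a bounded outbound queue per target node. When the
// target stops draining (backpressure), offers start timing out and the
// coordinator reports that to the client rather than OOMing.
public class BoundedOutboundQueue
{
    private final BlockingQueue<byte[]> queue;
    private final long offerTimeoutMillis;

    public BoundedOutboundQueue(int capacity, long offerTimeoutMillis)
    {
        this.queue = new ArrayBlockingQueue<byte[]>(capacity);
        this.offerTimeoutMillis = offerTimeoutMillis;
    }

    public void enqueueOrTimeout(byte[] message) throws TimeoutException, InterruptedException
    {
        // If the queue is full the target is not keeping up; give up after a
        // bounded wait instead of growing memory indefinitely.
        if (!queue.offer(message, offerTimeoutMillis, TimeUnit.MILLISECONDS))
            throw new TimeoutException("target node is not accepting messages fast enough");
    }
}
{code}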

        Jonathan Ellis added a comment -

        Explained backpressure motivation on IRC:

        Under heavy load, each node A will have two kinds of traffic to each other node B: new commands it needs to send to B, and replies to commands that B sent to it. If B is overloaded, you need to be able to backpressure new commands to it while allowing replies to go through. Replies create virtually no extra load, and letting them through makes the clients much happier.
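        A minimal sketch of that command/reply split, with illustrative names and capacities that are not from the ticket: commands headed for a peer are bounded (so a slow peer pushes back on us), while replies to the peer's own commands are never blocked.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical two-queue scheme per peer: new commands are subject to
// backpressure, replies always get through since they add essentially no
// load on the overloaded node.
public class PerNodeTraffic
{
    // Commands originated by this node: bounded so a slow peer pushes back.
    private final BlockingQueue<Object> commands = new ArrayBlockingQueue<Object>(4096);

    // Replies to the peer's commands: unbounded, never subject to backpressure.
    private final BlockingQueue<Object> replies = new LinkedBlockingQueue<Object>();

    public boolean offerCommand(Object command)
    {
        return commands.offer(command); // may fail under backpressure
    }

    public void offerReply(Object reply)
    {
        replies.offer(reply); // always succeeds
    }
}
{code}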

        Jonathan Ellis added a comment -

        We either need to (a) have a separate deserialization queue for "reply" traffic (we could use one of the "header" bits that isn't part of the Message proper to control this), or (b) drop messages for overloaded stages on the floor so the deserializer doesn't overload, or (c) give up the command/reply division entirely.

        Alternatively, option (b) reminds me that instead of "backpressure" we could just do "timeoutpressure": instead of overloaded stages backpressuring the message deserializer, which in turn backpressures socket reads, the deserializer can just discard messages the system is too busy to handle. The downside is that it will take an extra rpc_timeout of latency before the clients start to get timeouts. The upside is that as things unclog, the messages that get processed will be fresh ones, so we are less likely to waste work processing messages that the client isn't even waiting for anymore.

        Also, I'd like to dynamically adjust stage capacity based on the amount of work that gets processed, rather than have a fixed value that has to be manually tuned. Not sure what that would look like – none of the Java BlockingQueue classes have adjustable capacity post-construction. But, stage enqueueing is only done in one place (by the deserializer executor) so we can one-off something if we have to.
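        One possible shape for adjustable capacity, sketched with hypothetical names using only plain java.util.concurrent (nothing here is from the codebase): bound the queue with a Semaphore instead of relying on BlockingQueue's fixed capacity, and resize by adding or reclaiming permits.

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a queue whose effective capacity can be tuned at
// runtime: permits bound the queue size, and capacity changes are applied by
// releasing or reclaiming permits.
public class AdjustableCapacityQueue<T>
{
    // Semaphore subclass that exposes the protected reducePermits().
    private static class ResizableSemaphore extends Semaphore
    {
        ResizableSemaphore(int permits) { super(permits); }
        void reduce(int delta) { reducePermits(delta); }
    }

    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final ResizableSemaphore permits;

    public AdjustableCapacityQueue(int initialCapacity)
    {
        permits = new ResizableSemaphore(initialCapacity);
    }

    public boolean offer(T item)
    {
        if (!permits.tryAcquire())
            return false; // over capacity: caller can drop or apply backpressure
        queue.add(item);
        return true;
    }

    public T poll()
    {
        T item = queue.poll();
        if (item != null)
            permits.release(); // free a slot for producers
        return item;
    }

    public void growCapacity(int delta)
    {
        permits.release(delta);
    }

    public void shrinkCapacity(int delta)
    {
        permits.reduce(delta); // takes effect as in-flight items drain
    }
}
{code}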

        Jonathan Ellis added a comment -

        Another thought: there is tension between "I want to make the client slow down, so it stops making things worse by attempting more operations against an almost-overloaded node" and "if only one node is overloaded for whatever reason (maybe it is doing compactions and handling bootstrap simultaneously for instance), I want to be able to continue if my ConsistencyLevel and ReplicationFactor allow it."

        Also: reads are different from writes; a read against an overloaded node may just be dropped, while a write should be handled via hinted handoff (HH'd).

        Jonathan Ellis added a comment -

        note to self: cap CFS.flushWriter queue

        Ryan King added a comment -

        Would this back-pressure apply to commit log replay? We recently ran into a situation where a node with very large commit logs managed to OOM itself by backing up the row mutation stage queue.

        Stu Hood added a comment -

        > Would this back-pressure apply to commit log replay?
        It seems that one of the reasons commit logs get large in the first place is that we don't have backpressure, so I think this should fix the problem indirectly by preventing huge commit logs.

        Jonathan Ellis added a comment -

        Yes, capping the RM stage is part of this.

        Jonathan Ellis added a comment -

        Following the line of reasoning from my comment on Jan 22, I think the best thing to do is to take what we're doing now – allowing TimedOutExceptions to serve as flow control – but handle overload situations better, so we don't have the current potential for a vicious cycle of getting farther and farther behind while RMS/RRS executors waste time processing requests the coordinator node long since stopped waiting for:

        • Uncap the RMS and RRS executors. Instead,
        • MessageDeserializer will check recent RMS/RRS throughput and simply discard requests that won't make it through the task queue within RPCTimeout (preventing memory pressure from a huge task queue backlog; I have seen upwards of 1.5M pending tasks on MD)
        • MD will tag requests with a timestamp as they arrive, and RMS/RRS will again discard requests that have spent longer than RPCTimeout in the task queue (see the sketch after this list)
        • log replay will have to self-throttle, since the RMS queue won't be doing it for it (it would be nice to deal with this by adjusting the queue size, but concurrent queue sizes are fixed once created)
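        A rough sketch of the tag-and-discard idea from the list above, using hypothetical names and an assumed RPC timeout value rather than the real Cassandra classes: the deserializer stamps each request on arrival, and the stage that finally runs it drops it if it has already sat in the queue longer than the timeout.

{code:java}
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wrap each deserialized request with its arrival time
// and skip the work if the coordinator has certainly timed out by now.
public class TimestampedRequest implements Runnable
{
    private static final long RPC_TIMEOUT_MILLIS = 10000; // assumed value

    private final Runnable task;
    private final long enqueuedAtNanos = System.nanoTime();

    public TimestampedRequest(Runnable task)
    {
        this.task = task;
    }

    public void run()
    {
        long queuedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueuedAtNanos);
        if (queuedMillis > RPC_TIMEOUT_MILLIS)
            return; // the coordinator has already timed out; don't waste the work
        task.run();
    }
}
{code}
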
        Matthew F. Dennis added a comment -

        trunk-685.txt tags incoming requests with a timestamp. When a request is pulled from the queue to be processed, its timestamp is checked against RPC_TIMEOUT and the request is discarded if it has expired.

        Recovery is throttled by tracking the futures for the submitted tasks and waiting until they're finished in blocks of MAX_OUTSTANDING_REPLAY_COUNT (currently 1024, could probably be much larger).
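        A sketch of that replay-throttle pattern, with illustrative names and the block size from the comment above (the actual change is in trunk-685.txt): submit mutations to the stage in blocks and wait for each block's futures before submitting more, so replay can never run arbitrarily far ahead of the mutation stage.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch of throttled commit log replay.
public class ThrottledReplay
{
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;

    public static void replay(ExecutorService mutationStage, List<Runnable> mutations)
            throws InterruptedException, ExecutionException
    {
        List<Future<?>> outstanding = new ArrayList<Future<?>>();
        for (Runnable mutation : mutations)
        {
            outstanding.add(mutationStage.submit(mutation));
            if (outstanding.size() >= MAX_OUTSTANDING_REPLAY_COUNT)
            {
                for (Future<?> f : outstanding)
                    f.get(); // block until this batch is applied
                outstanding.clear();
            }
        }
        for (Future<?> f : outstanding)
            f.get(); // drain the final partial batch
    }
}
{code}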

        Jonathan Ellis added a comment -

        committed w/ minor cleanup of MDT

        Hudson added a comment -

        Integrated in Cassandra #491 (See http://hudson.zones.apache.org/hudson/job/Cassandra/491/)
        pre-emptively drop requests that cannot be processed within RPCTimeout. Patch by mdennis; reviewed by jbellis for CASSANDRA-685

        Jonathan Ellis added a comment -

        re-opening for backport to 0.6.4

        Jonathan Ellis added a comment -

        backported to 0.6.4 in r978791


          People

          • Assignee: Jonathan Ellis
          • Reporter: Jonathan Ellis
          • Votes: 0
          • Watchers: 12
