  Cassandra / CASSANDRA-3005

OutboundTcpConnection's sending queue grows unboundedly without any backpressure logic

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 1.1.0
    • Component/s: None
    • Labels:
      None

      Description

      OutboundTcpConnection's sending queue unconditionally queues up requests and processes them in sequence. We are thinking about tagging each incoming message with a timestamp and dropping it before actually sending it if it has stayed in the queue longer than the message's own timeout value.

      1. 3005-v3.txt
        8 kB
        Jonathan Ellis
      2. 3005-v4.txt
        8 kB
        Jonathan Ellis
      3. c3005.patch
        8 kB
        Melvin Wang
      4. c3005-v2
        8 kB
        Melvin Wang
      5. c3005-v4
        9 kB
        Melvin Wang

        Activity

        mw Melvin Wang added a comment -

        The goal here is to remove as many expired messages as we can, so the queue does not grow unboundedly.

        There are two places to do this.
        1) When we try to send a message, we drop it if it is already expired. This alone doesn't help if the connection is slow for some reason, because the queue may already have piled up. Thus we also have
        2) When we enqueue a message, we try to remove some expired messages as well. This is tricky because we now have two threads removing from one queue. Although each individual operation on the queue is thread-safe, without synchronization enqueue() may end up removing a message that is not the one it peeked a moment ago (since that one may already have been removed by run()).

        To solve this without using a lock, I use two queues. Whenever run() finishes one queue, it (and only it) swaps the queue pointers and processes the other one. ConcurrentLinkedQueue is implemented with a non-blocking algorithm, so we don't suffer much here.
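
        A rough sketch of that design, for reference (Entry, producer/consumer, the timeout constant and method names here are illustrative stand-ins, not the attached patch): writers drop expired entries from the queue they fill, and only the sending thread swaps the two references once its queue runs dry.

        import java.util.Queue;
        import java.util.concurrent.ConcurrentLinkedQueue;

        // Sketch only: illustrates the two-queue swap idea, not the patch itself.
        class TwoQueueSketch
        {
            static class Entry
            {
                final Object message;  // stands in for the real Message type
                final long timestamp = System.currentTimeMillis();
                Entry(Object message) { this.message = message; }
            }

            private static final long RPC_TIMEOUT_MS = 10000; // assumed timeout value

            // 'producer' is the queue being filled; 'consumer' is the one being drained.
            private volatile Queue<Entry> producer = new ConcurrentLinkedQueue<Entry>();
            private volatile Queue<Entry> consumer = new ConcurrentLinkedQueue<Entry>();

            // Called by writer threads: enqueue, then opportunistically drop expired entries.
            void enqueue(Object message)
            {
                producer.add(new Entry(message));
                Entry head;
                while ((head = producer.peek()) != null && isExpired(head))
                    producer.poll(); // this peek/poll pair is where the race discussed below lives
            }

            // Called only by the sending thread: drain 'consumer', drop expired, then swap.
            void runOnce()
            {
                Entry entry;
                while ((entry = consumer.poll()) != null)
                    if (!isExpired(entry))
                        send(entry);
                Queue<Entry> tmp = consumer; // only the sending thread ever swaps the pointers
                consumer = producer;
                producer = tmp;
            }

            private static boolean isExpired(Entry e)
            {
                return e.timestamp + RPC_TIMEOUT_MS < System.currentTimeMillis();
            }

            private void send(Entry e) { /* write to the socket */ }
        }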

        mw Melvin Wang added a comment -

        The previous patch was not up to date. Sorry for the confusion.

        stuhood Stu Hood added a comment -

        Rather than nesting the Pair objects, you should probably create

        class Entry { Message message; String id; long timestamp }

        which will allow you to avoid boxing the long and the extra Pair.
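
        Spelled out slightly, the suggested holder might look like this (the constructor capturing the enqueue time is an assumption; Message is Cassandra's message type):

        // Sketch of the suggested Entry holder: a primitive long timestamp avoids
        // autoboxing and the extra Pair allocation per queued message.
        class Entry
        {
            final Message message;
            final String id;
            final long timestamp;

            Entry(Message message, String id)
            {
                this.message = message;
                this.id = id;
                this.timestamp = System.currentTimeMillis(); // assumed: captured at enqueue time
            }
        }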

        jbellis Jonathan Ellis added a comment -

        Melvin, are you working on a patch incorporating Stu's feedback?

        mw Melvin Wang added a comment -

        Sorry for the slackness; my hands have been really full these days. I've incorporated Stu's feedback.

        stuhood Stu Hood added a comment -

        We experienced a bit of cascading on a high-write-throughput cluster that likely would have been alleviated by this, so one of us will be getting back on it soon.

        mw Melvin Wang added a comment -

        The v2 version of the patch is ready for review now.

        jbellis Jonathan Ellis added a comment -

        v2 still has a race if the sending thread swaps the queues between the peek and the poll in expire.

        v3 attached that attempts to fix this by using deques instead of queues, and simply putting the polled entry "back" at the front of the queue if a swap happened.

        v3 also cleans out the condition (unnecessary if using a BlockingQueue/Deque) and the pending count (just uses sum of the two queues' size()).
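
        A rough sketch of that "put it back at the front" expire path (illustrative names and timeout, not the attached v3 itself):

        import java.util.Deque;

        // Sketch only: trims expired entries from the head of the producer-side deque;
        // if it races with a queue swap and pulls out a live entry, it restores it at the front.
        class ExpireSketch
        {
            static class Entry
            {
                final long timestamp = System.currentTimeMillis();
            }

            static final long RPC_TIMEOUT_MS = 10000; // assumed timeout value

            static boolean isExpired(Entry e)
            {
                return e.timestamp + RPC_TIMEOUT_MS < System.currentTimeMillis();
            }

            // Called on the enqueue path.
            static void expire(Deque<Entry> producer)
            {
                while (true)
                {
                    Entry peeked = producer.peekFirst();
                    if (peeked == null || !isExpired(peeked))
                        break;                     // nothing (more) to expire
                    Entry polled = producer.pollFirst();
                    if (polled == null)
                        break;                     // the sending thread swapped and drained this deque
                    if (!isExpired(polled))
                    {
                        producer.addFirst(polled); // we raced with a swap: restore the entry and stop
                        break;
                    }
                    // otherwise the polled entry really was expired: drop it and continue
                }
            }
        }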

        mw Melvin Wang added a comment -

        v2 still has a race if the sending thread swaps the queues between the peek and the poll in expire.

        I don't think it is a race per se. If the sending thread swaps the two queues, we stop expiring messages and break. If it doesn't, then 'producer' is still pointing to the instance we peeked from. If the message we peek is meant to be expired and the sending thread swaps the queues, it will be dropped in run() anyway. The queues are only swapped once 'consumer' is empty, i.e. there is nothing left to send to the network.

        The point of this design is that we don't need to lock the queues, whereas BlockingQueue/Deque does take a lock. The goal is to be lock-free.

        jbellis Jonathan Ellis added a comment -

        I don't think it is a race per se. If the sending thread swaps the two queues, we then stop expiring messages and break.

        Here's the problem:

        +            Entry entry = producer.peek();
        +            if (entry == null)
        +                break;
        +            if (entry.timestamp + DatabaseDescriptor.getRpcTimeout() < System.currentTimeMillis()) {
        +                if (producer.poll() == null)
        +                    break;   // consumer swaps the pointers so we end up having an empty queue here.
        +            }
        

        poll() may return an unexpired entry, not the one peeked, if the sending thread switched queues and also took the front entry off in between the peek and the poll. In other words: you still have the same problem you had to start with, just more subtle.

        The point of this design is that we don't need to lock the queues whereas BlockingQueue/Deque locks it

        Producer/consumer is what the concurrent.Blocking classes are designed for. The "Blocking" means you can call take() and it will block until an entry is ready, not that it generates a lot of contention.
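
        As a minimal illustration of that blocking producer/consumer shape (stand-in names and timeout, not OutboundTcpConnection itself):

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        // Sketch only: take() parks the sending thread until an entry is available;
        // entries that expired while queued are dropped instead of sent.
        class BlockingSketch
        {
            static class Entry
            {
                final long timestamp = System.currentTimeMillis();
            }

            static final long RPC_TIMEOUT_MS = 10000; // assumed timeout value

            private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();

            void enqueue(Entry e)
            {
                queue.add(e);
            }

            void runLoop() throws InterruptedException
            {
                while (true)
                {
                    Entry entry = queue.take();   // blocks until a producer enqueues something
                    if (entry.timestamp + RPC_TIMEOUT_MS < System.currentTimeMillis())
                        continue;                 // expired while waiting in line: drop it
                    send(entry);
                }
            }

            private void send(Entry e) { /* write to the socket */ }
        }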

        mw Melvin Wang added a comment -

        poll() may return an unexpired entry, not the one peeked, if the sending thread switched queues and also took the front entry off in between the peek and the poll. In other words: you still have the same problem you had to start with, just more subtle.

        The logic in run() guarantees that if the sending thread swaps, the queue pointed to by 'producer' will be an empty one, from which producer.poll() will yield null, and we will break there without doing anything.

        jbellis Jonathan Ellis added a comment -

        That may be safe if there is only ever a single producer, although it feels sketchy to me. It's clearly broken though with multiple producers, which is what we need to support. If a second producer adds a new Entry to the "empty" swapped producer queue, the first producer in the expire loop will happily discard it in poll().

        mw Melvin Wang added a comment -

        Agreed on the argument for the multiple-producer case (I believe it is correct when there is a single producer). I'd like to give it another try: I will put the polled element back into the consumer queue. This may result in an earlier message being put at the end of the queue if the sending thread swaps; however, all the messages in the consumer queue will be processed before we swap again. Since it is OK for us to drop messages, processing an earlier message later may not be that bad.

        jbellis Jonathan Ellis added a comment -

        True, but why risk dropping something you don't have to when you can just use a deque?

        mw Melvin Wang added a comment -

        By dropping, you mean 'putting an earlier message at the end of the queue', right? A deque takes a lock when multiple threads try to add messages to the end of the queue, while ConcurrentLinkedQueue is a lock-free implementation. Another point is that we want to drop messages (because they are timed out) at two places:
        1) where we send the message out to the network
        2) where we add messages.
        The reason for this has been discussed some time ago: it is simply to prevent a backed-up queue from hurting the system by making it process already timed-out messages.
        With a deque, there will be contention between sending messages out to the network and examining/removing timed-out messages when adding messages.

        Based on this consideration, I tried to use a lock-free implementation. I'll be boarding now; will talk more in a couple of hours.

        jbellis Jonathan Ellis added a comment -

        I suspect you're committing premature optimization. I've never seen LinkedBlocking[Queue|Deque] be a significant source of contention.

        stuhood Stu Hood added a comment -

        (sidenote: we should consider counting dropped messages on the sending side the same way we count them on the receiving side: in the dropped message counts in MessagingService. Alternatively, it might be good to count in both locations)

        mw Melvin Wang added a comment -

        OK, I can't say for sure that ConcurrentLinkedQueue performs better than LinkedBlockingQueue. v4 and v3 differ mainly in the underlying queue implementation, which is not hard to change if it turns out to be a bottleneck. And I agree that in run() a ConcurrentLinkedQueue plus a condition variable does look like a blocking queue, so we can settle this. There is only one thing: can we just append the element to 'active' rather than pushing it back onto the head of 'backlog' in expireMessage? The reason (as I understand it) is to save one round of contention on the head of 'backlog': to push at the head of the queue you have to take the lock, and multiple threads also race to remove elements from the head (to expire messages), whereas appending to the end of 'active' contends only once, and 'active' is drained by a single thread.

        jbellis Jonathan Ellis added a comment -

        The reason I want to put it at the head is that it minimizes the chance that this message gets timed out when it wouldn't have been otherwise, by having to "wait in line" twice.

        jbellis Jonathan Ellis added a comment -

        It's enough of a corner case that I don't think the lock contention matters much, compared to losing messages entirely (from being expired).

        mw Melvin Wang added a comment -

        The reason I want to put it at the head is that it minimizes the chance that this message gets timed out when it wouldn't have been otherwise, by having to "wait in line" twice.

        Well, the way the code is structured, all the messages in 'active' come before the ones in 'backlog', so putting it at the end of the 'active' queue doesn't make it wait twice, since it has to come after those messages anyway. I agree it is not a major issue here though.

        jbellis Jonathan Ellis added a comment -

        all the messages in 'active' come before the ones in 'backlog'

        True enough. v4 attached.
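
        For orientation, the arrangement the thread converged on can be pictured roughly like this (a sketch under assumed names and timeout, not the committed v4): producers append to 'backlog', the sending thread drains 'active' before falling back to 'backlog', and expireMessages appends any live entry it pulls out by mistake to the end of 'active', which always comes before 'backlog' anyway.

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        // Sketch only: mirrors the active/backlog shape discussed above, not the committed code.
        class ActiveBacklogSketch
        {
            static class Entry
            {
                final long timestamp = System.currentTimeMillis();
            }

            static final long RPC_TIMEOUT_MS = 10000; // assumed timeout value

            private final BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>(); // producers append here
            private final BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();  // the sender drains this first

            void enqueue(Entry e)
            {
                expireMessages();
                backlog.add(e);
            }

            // Drop expired entries from the head of 'backlog'; a live entry pulled out by
            // mistake goes to the end of 'active', which is always ahead of 'backlog'.
            private void expireMessages()
            {
                while (true)
                {
                    Entry entry = backlog.peek();
                    if (entry == null || !isExpired(entry))
                        break;
                    entry = backlog.poll();
                    if (entry == null)
                        break;
                    if (!isExpired(entry))
                    {
                        active.add(entry);
                        break;
                    }
                }
            }

            void runLoop() throws InterruptedException
            {
                while (true)
                {
                    Entry entry = active.poll();
                    if (entry == null)
                        entry = backlog.take();   // block until a producer enqueues something
                    if (isExpired(entry))
                        continue;                 // expired on the send side: drop it
                    send(entry);
                }
            }

            private static boolean isExpired(Entry e)
            {
                return e.timestamp + RPC_TIMEOUT_MS < System.currentTimeMillis();
            }

            private void send(Entry e) { /* write to the socket */ }
        }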

        mw Melvin Wang added a comment -

        +1

        // Thanks for the intelligent review comments.

        jbellis Jonathan Ellis added a comment -

        Committed, thanks Melvin!

        hudson Hudson added a comment -

        Integrated in Cassandra #1194 (See https://builds.apache.org/job/Cassandra/1194/)
        add message expiration logic to OutboundTcpConnection
        patch by Melvin Wang and jbellis for CASSANDRA-3005

        jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1199010
        Files :

        • /cassandra/trunk
        • /cassandra/trunk/CHANGES.txt
        • /cassandra/trunk/contrib
        • /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
        • /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
        • /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
        • /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
        • /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

          People

          • Assignee:
            mw Melvin Wang
            Reporter:
            mw Melvin Wang
            Reviewer:
            Jonathan Ellis
          • Votes:
            0
            Watchers:
            4
