Qpid / QPID-2935

Support "best effort" producer flow control within the AMQP 0.10 implementation.

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9
    • Fix Version/s: 0.9
    • Component/s: C++ Broker, C++ Client
    • Labels:
      None
    • Environment:

      any

      Description

      To what extent, if any, could producer flow control be supported on the existing (pre-1.0) protocol?

      In the current C++ broker/client implementation, when a queue on the broker fills to the point where it cannot accept any more messages (--default-queue-limit hit), the broker will forcibly disconnect any client that attempts to route a message to that queue. This is an abrupt failure - the producing client is not privy to the queue's remaining capacity. The broker provides no feedback to the producing client, which could be used to throttle the client's message production rate.

      The purpose of this JIRA is to explore the possible methods for implementing producer throttling on the current 0.10 C++ codebase.

      1. QPID-2935.tgz
        6 kB
        Ken Giusti
      2. user_doc.txt
        8 kB
        Ken Giusti

        Issue Links

          Activity

          Ken Giusti added a comment -

          The 0.10 AMQP spec defines the use of "credit" as a mechanism to control the amount of data that is transferred between a client and a server (see section 2.6.1, Flow Control, in the AMQP 0.10 spec). In theory, the broker should be able to throttle a client's production rate by intelligently managing the rate at which it replenishes credit to that client. For example, the broker could delay the completion of a message transfer should it detect that one or more of the transfer's destination queues are nearing their limits.

          The current broker implementation supports the notion of "asynchronous enqueueing": there is a potential delay between the receipt of a message and the point at which the message completes enqueueing to all destination queues. The broker delays the completion of the message transfer (from the producer's point of view) until the enqueueing has completed. This concept could be extended to consider the capacity of the destination queues: do not complete a message transfer until all destination queues have a "reasonable" amount of capacity available.

          Ken Giusti added a comment -

          The solution to QPID-2921 could be leveraged to implement a solution to this problem.

          Ken Giusti added a comment -

          A very hacky proof-of-concept of the "delayed completion" approach. Includes sample sender and receiver clients (including a spout derivation). Loosely based on the IncompleteMessageList object in the broker's SessionState object.

          Associates a list of overflow messages and a timer thread with the broker's SessionState for a (producer) client. Should a message fail to enqueue to a queue due to limit overflow, this hack stores the message on the overflow list. All further messages that arrive from that client are also queued to the overflow list (preserving order). Periodically, the timer thread attempts to route the message at the head of the overflow list. Should the routing succeed (there was room on the destination queues), the head message is removed from the list, and the next message is routed.

          I've modified the client side to wait for capacity to become available when it is exhausted.

          When run against the example slow consumer (receiver.cpp), the sender client will occasionally block in the send() API call until space becomes available on the queue.

          Ken Giusti added a comment -

          A high-level description of a proposed solution:

          *) Use credit to prevent queue overflow events.

          *) Associate watermarks with each Queue instance. Each queue would maintain a high and low watermark corresponding to a capacity level within the queue - the number of queued messages, for example, or the total number of message bytes enqueued. The watermarks would be constrained such that high_watermark >= low_watermark: the high_watermark would indicate the level of capacity at and above which the queue is considered likely to overflow. The low_watermark would indicate the level at or below which the queue is no longer considered likely to overflow.

          *) Associate a state with each Queue instance that reflects the current level of data in the queue with respect to the watermarks. When the current level of data in the queue crosses the high_watermark, the state is set to "blocking". When the current level falls below the low_watermark, the state transitions from "blocking" back to "normal".

          *) Each message transferred to the broker will maintain a boolean "blocked" flag. After a message has been enqueued to all of its destination queues, the blocked flag will be set if one or more of those queues are in the blocking state.

          *) The transfer of any message which has a set "blocked" flag will not be completed from the point of view of the client until the flag is reset.

          *) A message's "blocked" flag will be reset when: 1) the state of all destination queues becomes "normal", or 2) the message is consumed from all queues.

          *) The message is completed once the "blocked" flag is reset.

          Issues with this approach:

          1) The capacity level configured for a given producer must take into account the high_watermark setting of the potential destination queues. If the producer's capacity is too high for a given queue (or the sum of all potential producers' capacities is), the queue will overflow regardless of this solution.

          2) A producer will be blocked based on the destination of the current set of outbound messages. A pending transfer of a message to a different - possibly unblocked - destination would be blocked by the current outstanding messages. This appears to be unavoidable given the 0.10 model.

          Marnie McCormack added a comment -

          Hi Ken,

          FYI - In case you haven't already seen the docs on the flow control implementation on the Java broker:

          https://cwiki.apache.org/qpid/use-producer-flow-control.html
          https://cwiki.apache.org/qpid/producer-flow-control.html

          Implemented by Rob Godfrey.

          Regards,
          Marnie

          Alan Conway added a comment - - edited

          This will affect the cluster.

          • All new queue state (watermarks and blocking flags) will need to be passed in update to new brokers joining
          • Introduces a new timer task; in a cluster this will have to use the cluster timer, or some other mechanism that ensures actions are synchronized across the cluster

          The appropriate changes to the cluster need to be committed along with the flow control solution; otherwise cluster brokers will become inconsistent and crash sporadically with invalid-arg errors.

          Ken Giusti added a comment -

          Marnie - thanks for those links. We'll want the C++ broker's implementation to have the same look and feel as the existing Java functionality. I'll follow up with Rob with some questions.

          Alan - extra work necessary for clustering - duly noted. I'll be sure to develop some flow control tests against clustered brokers, and have you review them.

          Ken Giusti added a comment -

          Created an svn branch for development of this feature:

          http://svn.apache.org/viewvc/qpid/branches/qpid-2935/

          Ken Giusti added a comment -

          As of 2/7/2011: Code is functionally complete and passing simple unit tests. The following TODO items are still pending:

          o) Performance regression testing & tuning.
          o) Cleanup synchronous completion case in SessionState ~line 267
          o) Sends completion on every execution.sync, else hangs. Can this be fixed?
          o) Hang in clustered + async store tests. Need debug & fix.
          o) Creates a request for IOProcessing on each completion - need to aggregate these.
          o) Reviews pending for management changes.
          o) Reviews pending for queue flow limit changes.

          Ken Giusti added a comment -

          Status update 2/7/2011:

          Resync'ed to trunk.
          Fixed:
          -) Cleanup synchronous completion case in SessionState ~line 267
          -) Creates a request for IOProcessing on each completion - need to aggregate these.

          Additional work items (forgot to add earlier):
          o) Replication of new queue thresholds for clustering.
          o) Documentation updates.

          And still TBD:
          o) Performance regression testing & tuning.
          o) Sends completion on every execution.sync, else hangs. Can this be fixed?
          o) Hang in clustered + async store tests. Need debug & fix.
          o) Reviews pending for management changes.
          o) Reviews pending for queue flow limit changes.

          Ken Giusti added a comment -

          Functionally complete - work merged to trunk:
          http://svn.apache.org/viewvc?view=revision&revision=1072356

          Ken Giusti added a comment -

          Documents the operation and user interface.

          Alan Conway added a comment -

          Flow control is disabled in a cluster; enabling it is tracked as QPID-3076.


            People

            • Assignee:
              Ken Giusti
            • Reporter:
              Ken Giusti
            • Votes:
              0
            • Watchers:
              1
