Qpid / QPID-2921

C++ broker: Improvements to asynchronous completion

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8
    • Fix Version/s: 0.9
    • Component/s: C++ Broker
    • Labels:
      None

      Description

        • Overview

      Asynchronous completion means that command execution is initiated in one thread
      (a client connection thread) and completed in a different thread.

      When the async store is loaded, message.transfer commands are
      completed by a store thread that does the async write.

        • Issues with asynchronous completion code as of revision r1029686
          • Not really asynchronous

      IncompleteMessageList::process blocks the connection thread till all
      outstanding async commands (messages) for the session are complete.

      With the new cluster, this could deadlock since it is blocking a Poller thread.

          • Race condition for message.transfer

      Sketch of the current code:

      // Called in connection thread
      void PersistableMessage::enqueueAsync() { ++counter; }

      // Called in store thread once message is written.
      void PersistableMessage::enqueueComplete() { if (--counter == 0) notifyCompleted(); }

      The intent is that notifyCompleted be called once per message, after the
      message has been written to each queue it was routed to.

      However, if a message is routed to N queues, it's possible for
      notifyCompleted to be called up to N times. The store thread could
      call notifyCompleted for the first queue before the connection thread
      has called enqueueAsync for the second queue, and so on.
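
The interleaving described above can be replayed in a single thread. The following is a minimal sketch, not broker code: SketchMessage and notificationsForWorstCase are hypothetical names, and the counter logic is copied from the sketch above under the assumption that each store write finishes before the next enqueueAsync call.

```cpp
#include <cassert>

// Hypothetical stand-in for the counter logic sketched in the description.
struct SketchMessage {
    int counter;
    int notifications;                  // how many times notifyCompleted ran

    SketchMessage() : counter(0), notifications(0) {}

    void enqueueAsync() { ++counter; }  // would run in the connection thread
    void enqueueComplete() {            // would run in the store thread
        if (--counter == 0) notifyCompleted();
    }
    void notifyCompleted() { ++notifications; }
};

// Replays the unlucky ordering for a message routed to `queues` queues:
// the store write for queue i completes before queue i+1 is enqueued.
int notificationsForWorstCase(int queues) {
    SketchMessage m;
    for (int i = 0; i < queues; ++i) {
        m.enqueueAsync();     // connection thread routes to queue i
        m.enqueueComplete();  // store thread finishes the write for queue i
    }
    return m.notifications;
}
```

With this ordering the counter drops to zero after every queue, so a message routed to three queues is reported complete three times rather than once.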

          • No asynchronous completion for message.accept

      We do not currently delay completion of message.accept until the
      message is deleted from the async store. This could cause duplicate
      delivery if the broker crashes after completing the message but
      before it is removed from store.

      There is code in PersistableMessage to maintain a counter for dequeues,
      analogous to the async enqueue code, but this is incorrect.

      Completion of transfer is triggered when all enqueues for a message are complete.
      Completion of accept is triggered for each dequeue from a queue independently.

      Furthermore a single accept can reference many messages, so it can't be associated with a message.

        • New requirements

      The new cluster design will need to participate in async completion, e.g.
      an accept cannot be completed until the message is

      • removed from store (if present) AND
      • replicated to the cluster (if present) as dequeued

      The new cluster also needs to asynchronously complete binding commands
      (declare, bind, delete) when they are replicated to the cluster.
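
One way to picture the accept requirement above is a per-command object that counts outstanding steps across every message the accept references. This is only a sketch under stated assumptions: AcceptSketch and its methods are hypothetical names, the code is single-threaded for clarity, and a real implementation would need an atomic counter or a lock.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical sketch: one object per message.accept command.
class AcceptSketch {
    std::size_t pending;   // steps still outstanding across all messages
    bool sealed;           // true once every referenced message is registered
    bool completed;

    void maybeComplete() {
        if (sealed && pending == 0) completed = true;
    }
public:
    AcceptSketch() : pending(0), sealed(false), completed(false) {}

    // Register one accepted message and the conditions it must meet.
    void addMessage(bool inStore, bool inCluster) {
        if (inStore) ++pending;     // wait for removal from the store
        if (inCluster) ++pending;   // wait for the dequeue to be replicated
    }
    // Called once all messages referenced by the accept have been added.
    void allMessagesAdded() { sealed = true; maybeComplete(); }
    // Called when one store removal or one cluster replication finishes.
    void stepDone() { assert(pending > 0); --pending; maybeComplete(); }
    bool isComplete() const { return completed; }
};
```

An accept that references a message with no store and no cluster completes as soon as allMessagesAdded is called; otherwise it completes on the last stepDone.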

      1. proposal1.diff
        14 kB
        Ken Giusti
      2. msg.patch
        25 kB
        Ken Giusti

          Activity

          Ken Giusti added a comment -

          Outstanding issues have been moved to JIRA https://issues.apache.org/jira/browse/QPID-3079

          Ken Giusti added a comment -

          I've submitted fixes for a sub-set of the issues described in this bug in time for the 0.10 release. However, some issues remain.

          Fixes have been submitted for:

          i1) Remove the blocking of the polling thread by SessionState while it waits for outstanding async commands to complete.
          i2) Fix the enqueueAsync/enqueueComplete race condition that could result in multiple completions per single message
          r1) allow clustering to hold off the completions of (various) commands
          r2) provide a completion model that will allow performing completions from a thread that may not be the same as the initiating thread.

          The following issues remain:

          i3) do not allow an outbound message to satisfy a message.accept until the message is deleted from the queue's async store
          i4) a message becomes eligible to satisfy a message.accept on a per-dequeue basis, not the last of all dequeues as it is currently done
          i5) a message.accept references multiple messages - all must enforce rules i3 & i4 in order for the message.accept to complete.

          Rather than leaving this issue unresolved for 0.10, I've cloned a new bug specifically for the remaining issues:
          https://issues.apache.org/jira/browse/QPID-3079

          Ken Giusti added a comment -

          This patch attempts to solve just the asynchronous message completion problem. It modifies the reference counter in a persistable message to provide a callback that indicates whether the completion is occurring in the context of the receive path.

          Additionally, it updates execution.sync to pend completion on outstanding asynchronous messages.

          Issues I have with this patch:

          1) there's no clean way to pass the completion status from the SessionAdapter layer back to the SessionState for a given command. Ideally, this could be done via the invoker result, but the SessionAdapter's handlers don't have access to the invoker result in order to set it.

          2) Message transfer is currently not represented by an "incomplete command" context. The reference counter in the persistable message is used instead, as all the async interfaces deal directly with the message itself.

          3) There are two callbacks that are invoked when a message completes: one to bind the message, which then calls the Session's callback. These should be collapsed to one for performance considerations.

          I'd like to get some review & feedback from Alan and others before I apply this.

          Ken Giusti added a comment -

          A rough hack showing the intended approach to async completion of message.transfer

          Ken Giusti added a comment -

          Here's a summary of the improvements that Alan has recommended as I understand them:

          i1) Remove the blocking of the polling thread by SessionState while it waits for outstanding async commands to complete.
          i2) Fix the enqueueAsync/enqueueComplete race condition that could result in multiple completions per single message
          i3) do not allow an outbound message to satisfy a message.accept until the message is deleted from the queue's async store
          i4) a message becomes eligible to satisfy a message.accept on a per-dequeue basis, not the last of all dequeues as it is currently done
          i5) a message.accept references multiple messages - all must enforce rules i3 & i4 in order for the message.accept to complete.

          And some new requirements:

          r1) allow clustering to hold off the completions of (various) commands
          r2) provide a completion model that will allow performing completions from a thread that may not be the same as the initiating thread.

          Alan - is this correct?

          As a starting point, I'm trying to implement asynchronous completion of message.transfer commands. In doing this, I'm attempting to
          address points i1, i2, and r2 above. Here's what I've discovered so far:

          First, I need to associate the transfer Completion with the inbound message, not with the transfer command itself. This is because the
          transfer command stays within the domain of the SessionState, whilst the message is handed off to various potential asynchronous services,
          such as store, queuing, etc. In other words, SessionState isn't aware of all the potential async operations that may apply to a given
          message as it is transferred, but the message itself is.

          Instead, I consider SessionState a "completer" - using Alan's definition. It adds itself to the message, passes the message off to
          semantic state, then completes the message on return. This prevents the i2 race condition.
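
A minimal sketch of why registering the session as a completer closes the race, assuming a plain atomic counter. TransferCompletionSketch and firedForWorstCase are hypothetical names and do not come from the attached patches; the callback is replaced by a counter so the effect can be observed.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical sketch of a completion counter attached to a message.
class TransferCompletionSketch {
    std::atomic<std::uint32_t> completers;
    std::atomic<int> fired;           // stands in for the real callback
public:
    TransferCompletionSketch() : completers(0), fired(0) {}
    void addCompleter() { ++completers; }
    void complete() { if (--completers == 0) ++fired; }
    int timesFired() const { return fired.load(); }
};

// Replays the worst case: every store write finishes immediately.
int firedForWorstCase(int queues) {
    TransferCompletionSketch c;
    c.addCompleter();                 // the session registers itself first
    for (int i = 0; i < queues; ++i) {
        c.addCompleter();             // routing to queue i starts an async write
        c.complete();                 // the store finishes it straight away
    }
    assert(c.timesFired() == 0);      // the session's entry keeps the count above zero
    c.complete();                     // the session completes on return from routing
    return c.timesFired();
}
```

Because the session holds its own entry until routing has finished, the count cannot reach zero early, and the callback fires exactly once however the store writes are interleaved.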

          In addition, I'm planning to:

          0) Augment Alan's proposed Completion class to enable it to identify the desired completion thread (via storing Thread::current()). When
          the Completion "completes", the action callback can check its current thread against this value to determine how to process the completion.
          1) replace the asyncEnqueue functionality in the broker::Message with a more-generic transferComplete Completer object (based on Alan's
          proposed class). This allows the message to be considered complete against a variety of operations beyond queueing (e.g. credit supply).
          2) remove the incomplete message list from SessionState - it is no longer needed.
          3) Augment the SessionState enqueue complete callback to be thread aware.

          I'll attach a set of code changes that roughly implements the above description for comment (won't compile yet - just as a reference).
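
Item 0 above could look roughly like the following. This sketch uses std::thread::id from the standard library in place of the broker's Thread::current(), which is an assumption made only so the example is self-contained; ThreadAwareCompletionSketch and isInitiating are hypothetical names.

```cpp
#include <cassert>
#include <thread>

// Hypothetical sketch: remembers which thread created the completion.
class ThreadAwareCompletionSketch {
    const std::thread::id initiator;
public:
    ThreadAwareCompletionSketch() : initiator(std::this_thread::get_id()) {}

    // True when `caller` is the thread that constructed this object.
    // The default argument is evaluated in the calling thread.
    bool isInitiating(std::thread::id caller = std::this_thread::get_id()) const {
        return caller == initiator;
    }
};
```

A completion callback could call isInitiating() and update session state directly when it returns true, or hand the update back to the connection thread when it returns false.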

          Alan Conway added a comment -

          Sketch of a solution:

          Async completion abstracted as a separate class (currently embedded in Message):

          /**
           * Class to implement asynchronous completion of AMQP commands.
           *
           * The initiating thread is the thread that initiates the action,
           * i.e. the connection read thread.
           *
           * A completing thread is any thread that contributes to completion,
           * e.g. a store thread that does an async write.
           * There may be more than one completing thread.
           */
          class Completion {
            private:
              AtomicValue<uint32_t> completionsNeeded;

            protected:
              /** Called when the action is completed.
               *@param initiating: true if the calling thread is the initiating thread.
               */
              virtual void doAction(bool initiating) = 0;

            public:
              /** Called in the initiating thread to increase the count of completions
               * expected. E.g. called when initiating an async store operation.
               */
              void addCompleter() { ++completionsNeeded; }

              /** Called in initiating thread to indicate all completers have been added.
               * E.g. would be called after routing a message to all queues.
               */
              void completersDone() { if (completionsNeeded == 0) doAction(true); }

              // Decrease completion count, e.g. called when async write complete.
              // Called in completing thread.
              void complete() { if (--completionsNeeded == 0) doAction(false); }
          };

          Replace SessionState::IncompleteMessageList with
          IncompleteCommandList: holds completions for all commands that can be
          async (transfer, accept, create/destroy/bind). Completions are
          associated with a command, not a message.

          class IncompleteCommand : public Completion {
              void doAction(bool initiating) {
                  if (initiating) // We're in the connection thread
                      update_session_state_directly();
                  else
                      connection.requestIOProcessing(update_session_state);
              }
          };
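
The dispatch in doAction can be exercised with a stand-in connection that simply queues callbacks. This is a sketch under assumptions: ConnectionSketch, IncompleteCommandSketch and runIOCallbacks are hypothetical, the queue is not thread-safe (a real one would need a lock), and requestIOProcessing here only mimics the idea of deferring work to the connection's IO thread.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for the connection: queued callbacks would be run
// later by the connection's own IO thread.
struct ConnectionSketch {
    std::queue<std::function<void()> > ioCallbacks;

    void requestIOProcessing(std::function<void()> f) {
        ioCallbacks.push(std::move(f));
    }
    // Simulates the IO thread draining its pending callbacks.
    void runIOCallbacks() {
        while (!ioCallbacks.empty()) {
            std::function<void()> f = std::move(ioCallbacks.front());
            ioCallbacks.pop();
            f();
        }
    }
};

// Hypothetical command whose completion updates session state.
struct IncompleteCommandSketch {
    ConnectionSketch& connection;
    int sessionUpdates;

    explicit IncompleteCommandSketch(ConnectionSketch& c)
        : connection(c), sessionUpdates(0) {}

    void updateSessionState() { ++sessionUpdates; }

    void doAction(bool initiating) {
        if (initiating)
            updateSessionState();   // already in the connection thread
        else                        // defer to the connection thread
            connection.requestIOProcessing([this] { updateSessionState(); });
    }
};
```

When doAction(false) is called, the session state is left untouched until the queued callback runs, which keeps all session updates on a single thread.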

          Remove blocking wait: sync bit commands

          • In IncompleteMessageList, store a note of sync bit commands and complete them when the preceding commands do.

          Point of concern: the IncompleteCommandList is on the critical path,
          so it needs to be cheap to process.
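
A rough sketch of the non-blocking treatment of sync bit commands: the list records a marker for each sync command and releases it only once nothing earlier is still pending. IncompleteCommandListSketch and its methods are hypothetical names, and the sketch assumes that ordinary async commands may be reported complete in any order while sync markers must wait for everything before them.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical sketch of a per-session list of incomplete commands.
class IncompleteCommandListSketch {
    struct Entry { std::uint32_t id; bool syncMarker; };
    std::deque<Entry> pending;               // in command order
    std::vector<std::uint32_t> completed;    // ids in the order they completed

    // A sync marker completes once no earlier command is still pending.
    void releaseSyncMarkers() {
        while (!pending.empty() && pending.front().syncMarker) {
            completed.push_back(pending.front().id);
            pending.pop_front();
        }
    }
public:
    void addAsync(std::uint32_t id) {
        Entry e = { id, false };
        pending.push_back(e);
    }
    void addSync(std::uint32_t id) {
        Entry e = { id, true };
        pending.push_back(e);
        releaseSyncMarkers();                // completes at once if nothing precedes it
    }
    // Called when the async work for command `id` has finished.
    void markDone(std::uint32_t id) {
        for (std::deque<Entry>::iterator it = pending.begin(); it != pending.end(); ++it) {
            if (it->id == id && !it->syncMarker) {
                completed.push_back(id);
                pending.erase(it);
                break;
            }
        }
        releaseSyncMarkers();
    }
    const std::vector<std::uint32_t>& completedIds() const { return completed; }
};
```

No thread blocks here: the sync command is simply reported later. The linear search in markDone is the simplest thing that works; given the critical-path concern above, a real list would likely want a cheaper lookup.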


            People

            • Assignee:
              Ken Giusti
              Reporter:
              Alan Conway