Qpid / QPID-8551

[JMS AMQP 0-x] The reject performatives are sent twice on rollback of asynchronous consumer transactions


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: qpid-java-client-0-x-6.3.0, qpid-java-client-0-x-6.3.1, qpid-java-client-0-x-6.3.2, qpid-java-client-0-x-6.3.3, qpid-java-client-0-x-6.3.4, qpid-java-client-0-x-6.4.0
    • Fix Version/s: None
    • Component/s: JMS AMQP 0-x
    • Labels: None

    Description

      When a consumer transaction is rolled back, the dispatch queue is cleaned by invoking syncDispatchQueue(false), which in turn invokes dispatchMessage(UnprocessedMessage message). The latter sends reject commands for all delivery tags at or below the rollback mark:

      if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get()))
      {
          if (_logger.isDebugEnabled())
          {
              _logger.debug("Rejecting message because delivery tag " + deliveryTag
                      + " <= rollback mark " + _rollbackMark.get());
          }
          rejectMessage(message, true);
      }
      

      However, the code above does not remove the rejected message from _deliveredMessageTags.

      As a result, the rejects are sent again when releaseForRollback() is invoked from AMQSession#rollback().

      The issue affects only consumer transactions in which a MessageListener is used to receive messages.

      A workaround for the defect is to receive messages with a synchronous consumer.

      It looks like the issue can be fixed by removing the rejected message from _deliveredMessageTags in dispatchMessage(UnprocessedMessage message).

      diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
      index 125cba1dd..7f7a5a283 100644
      --- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
      +++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
      @@ -3621,6 +3621,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
                                       + " <= rollback mark " + _rollbackMark.get());
                           }
                           rejectMessage(message, true);
      +                    getDeliveredMessageTags().remove(deliveryTag);
                       }
                       else if (_usingDispatcherForCleanup)
                       {
      
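      The effect of the one-line change can be illustrated with a small stand-alone model. This is only a sketch and not the real AMQSession: the class RollbackRejectModel, its methods and the removeOnReject flag are hypothetical names, tagLE is simplified to a plain <= comparison, and it is assumed that releaseForRollback() drains _deliveredMessageTags and rejects every tag still present.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Simplified, hypothetical model of the rollback path; not the real AMQSession. */
public class RollbackRejectModel {
    // Stand-in for _deliveredMessageTags.
    private final Queue<Long> deliveredTags = new ConcurrentLinkedQueue<>();
    // Every reject that would be sent to the broker, in order.
    private final List<Long> sentRejects = new ArrayList<>();
    // true models the proposed fix, false models the current behaviour.
    private final boolean removeOnReject;
    private long rollbackMark = -1;

    RollbackRejectModel(boolean removeOnReject) {
        this.removeOnReject = removeOnReject;
    }

    // A message is recorded as delivered before the listener handles it.
    void deliver(long tag) {
        deliveredTags.add(tag);
    }

    // Stand-in for the rollback branch of dispatchMessage(...).
    void dispatchDuringRollback(long tag) {
        if (tag <= rollbackMark) { // assumption: tagLE reduced to <=
            sentRejects.add(tag);
            if (removeOnReject) {
                deliveredTags.remove(Long.valueOf(tag));
            }
        }
    }

    // Assumed behaviour: reject every tag still recorded as delivered.
    void releaseForRollback() {
        Long tag;
        while ((tag = deliveredTags.poll()) != null) {
            sentRejects.add(tag);
        }
    }

    // Runs one rollback over tags 1 and 2 and returns the rejects sent.
    static List<Long> simulate(boolean removeOnReject) {
        RollbackRejectModel session = new RollbackRejectModel(removeOnReject);
        session.deliver(1L);
        session.deliver(2L);
        session.rollbackMark = 2L;
        session.dispatchDuringRollback(1L);
        session.dispatchDuringRollback(2L);
        session.releaseForRollback();
        return session.sentRejects;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + simulate(false)); // [1, 2, 1, 2]
        System.out.println("with fix:    " + simulate(true));  // [1, 2]
    }
}
```

      In this model each tag is rejected twice without the removal and exactly once with it, which matches the duplicate rejects described above.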

      On the Qpid Broker-J side, the duplicate rejects cause WARN logs like the one below:

       (o.a.q.s.p.v.AMQChannel) - Dropping reject request as message is null for tag:1102
      

          People

            Assignee: Unassigned
            Reporter: Alex Rudyy (orudyy)