Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-8551

[JMS AMQP 0-x] The reject per formatives are sent twice on rollback of asynchronous consumer transactions



    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • qpid-java-client-0-x-6.3.0, qpid-java-client-0-x-6.3.1, qpid-java-client-0-x-6.3.2, qpid-java-client-0-x-6.3.3, qpid-java-client-0-x-6.3.4, qpid-java-client-0-x-6.4.0
    • None
    • JMS AMQP 0-x
    • None


      When consumer transaction is rolled back, the dispatch queue is cleaned by invoking syncDispatchQueue(false). That results in invocation of dispatchMessage(UnprocessedMessage message). The latter sends reject commands for all tags below rollback threshold:

      if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get()))
                          if (_logger.isDebugEnabled())
                              _logger.debug("Rejecting message because delivery tag " + deliveryTag
                                      + " <= rollback mark " + _rollbackMark.get());
                          rejectMessage(message, true);

      Though, the code above does not remove rejected message from _deliveredMessageTags.

      As result, the rejects are sent again on invocation of releaseForRollback() which is called from AMQSession#rollback().

      The issue affects only consumer transactions when MessageListener is used to receive the message.

      The work around for the defect would be to use synchronous consumer for message receiving.

      It looks like the issue can be fixed by removal of the rejected message from _deliveredMessageTags in dispatchMessage(UnprocessedMessage message).

      diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
      index 125cba1dd..7f7a5a283 100644
      --- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
      +++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
      @@ -3621,6 +3621,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
                                       + " <= rollback mark " + _rollbackMark.get());
                           rejectMessage(message, true);
      +                    getDeliveredMessageTags().remove(deliveryTag);
                       else if (_usingDispatcherForCleanup)

      On Qpid Broker-J side the duplicate rejects cause WARN logs like below

       (o.a.q.s.p.v.AMQChannel) - Dropping reject request as message is null for tag:1102




            Unassigned Unassigned
            orudyy Alex Rudyy
            0 Vote for this issue
            1 Start watching this issue