ActiveMQ / AMQ-1807

ActiveMQ stops dispatching messages after aborting a transaction (STOMP)

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.1.0
    • Fix Version/s: 5.3.0
    • Component/s: Transport
    • Labels:
      None
    • Environment:

      Linux, JDK 1.6_06b2

      Description

      As requested by Dejan Bosanac, I'm adding this ticket. I'm willing to help fix it, i.e. I can get my hands dirty, but I need some pointers on where to look because (unfortunately) I don't have much time to learn ActiveMQ's internals and architecture.

      A copy of the email I sent to the users mailing-list:
      =============================================

      I'm currently struggling to understand what is causing the behaviour described in the subject. I'm connecting to ActiveMQ via STOMP from a Python app. Because I need messages to be rolled back if processing fails, I wrap the message processing in the following way:

      message received -> start transaction -> ack message in transaction ->
      process message -> if no exception commit tx, else rollback transaction

      AFAIK, this is the only way of making message unacknowledgement possible with STOMP. Also, this is a single client connection, i.e. I'm using a single client connection to create a message processing daemon; all messages are sent and received via this single connection to the MQ server.
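
The flow described above (receive, BEGIN, ACK inside the transaction, process, then COMMIT or ABORT) could be sketched roughly as follows. This is a minimal illustration, not the reporter's actual client: `frame`, `process_in_transaction`, `send` and `handler` are hypothetical names, and the frame layout assumes plain STOMP 1.0 framing (command, headers, blank line, body, NUL byte).

```python
def frame(command, headers=None, body=""):
    """Serialize a STOMP 1.0 style frame (assumed layout):
    command, one header per line, blank line, body, NUL terminator."""
    lines = [command]
    for key, value in (headers or {}).items():
        lines.append(f"{key}:{value}")
    return "\n".join(lines) + "\n\n" + body + "\x00"


def process_in_transaction(send, message_id, tx_id, handler, body):
    """Wrap the processing of one received message in a STOMP transaction.

    `send` is any callable that writes a serialized frame to the broker
    connection (hypothetical); `handler` is the application's processing code.
    Returns True if the transaction was committed, False if it was aborted.
    """
    send(frame("BEGIN", {"transaction": tx_id}))
    # ACK the message inside the transaction, before processing it.
    send(frame("ACK", {"message-id": message_id, "transaction": tx_id}))
    try:
        handler(body)
    except Exception:
        # Processing failed: abort so the ACK is not applied.
        send(frame("ABORT", {"transaction": tx_id}))
        return False
    send(frame("COMMIT", {"transaction": tx_id}))
    return True
```

Passing `list.append` as `send` makes it easy to inspect which frames would go out for a succeeding or failing handler without a broker.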

      Here's a telnet session that can be used to reproduce the problem (open jconsole and send 5 text messages to the queue):

      % telnet localhost 61613
      Trying 127.0.0.1...
      Connected to localhost.
      Escape character is '^]'.
      CONNECT

      ^@
      CONNECTED
      session:ID:starfish-53281-1213736462979-2:2

      SUBSCRIBE
      destination: /queue/testq
      ack: client
      activemq.prefetchSize: 1

      ^@
      MESSAGE
      message-id:ID:starfish-53281-1213736462979-3:3:1:1:1
      destination:/queue/testq
      timestamp:1213736837743
      expires:0
      priority:0

      1
      BEGIN
      transaction: 1

      ^@
      ACK
      message-id:ID:starfish-53281-1213736462979-3:3:1:1:1
      transaction: 1

      ^@
      MESSAGE
      message-id:ID:starfish-53281-1213736462979-3:4:1:1:1
      destination:/queue/testq
      timestamp:1213736840224
      expires:0
      priority:0

      2
      MESSAGE
      message-id:ID:starfish-53281-1213736462979-3:5:1:1:1
      destination:/queue/testq
      timestamp:1213736842611
      expires:0
      priority:0

      3
      ABORT
      transaction: 1

      ^@
      BEGIN
      transaction:2

      ^@
      ACK
      message-id:ID:starfish-53281-1213736462979-3:4:1:1:1
      transaction:2

      ^@
      ABORT
      transaction:2

      ^@
      ACK
      message-id:ID:starfish-53281-1213736462979-3:5:1:1:1

      ^@

      I see a couple of issues here:

      #1) even though I set activemq.prefetchSize to 1 in the subscription command, the connector dispatches two messages in a row

      #2) no more messages are dispatched after aborting the transaction/acknowledging the last received message. Even if the second message isn't wrapped in a transaction, message dispatch stops there.

      To add to the confusion, if I don't use transactions at all, my client keeps getting messages one by one, i.e. no two messages are sent together and I only get a new message after ACK'ing the previous one.

      I think I may be stepping into the realms of a buggy STOMP connector. Please tell me if I'm missing something obvious that fixes this issue (hence making it a non-issue) or if the STOMP connector does indeed have problems.

        Activity

        Dejan Bosanac added a comment -

        Hi Celso,

        I think you're confusing transaction rollback with the death of a consumer.

        When a consumer dies, the broker detects it and appends all messages that were dispatched to that consumer but not acknowledged (all the messages you have acked in a transaction that hasn't been committed) to the end of the dispatch list. These messages will eventually be redelivered to another consumer. This now works the same way for JMS and Stomp clients.

        But when we roll back the transaction, we tell the broker that we haven't yet consumed the messages that were delivered to us. So the broker will wait for those messages to be acked before sending more messages. This also works the same way for Java and Stomp clients; it's just that the JMS Java client has built-in logic that will try to redeliver those messages first (locally) when you start a new transaction. You can implement the same logic in your Stomp client (or application).

        The whole point is that redelivery of messages after a rollback is the client's task, since there's no need for the broker to resend those messages over the network.

        Hope this helps,
        Dejan
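
The client-side redelivery described in this comment could look something like the sketch below. It is only an illustration of the idea (buffer messages acked inside a transaction and replay them locally after an abort); the class `LocalRedelivery` and its methods are hypothetical and do not correspond to any existing Stomp client library or to the JMS client's internals.

```python
from collections import deque


class LocalRedelivery:
    """Hypothetical client-side buffer that replays messages after an abort."""

    def __init__(self):
        self.pending = deque()  # messages waiting to be processed
        self.in_tx = {}         # transaction id -> messages acked in that tx

    def on_message(self, message):
        # Called when a MESSAGE frame arrives from the broker.
        self.pending.append(message)

    def next_message(self):
        # Locally redelivered messages sit at the front, so they come first.
        return self.pending.popleft() if self.pending else None

    def ack_in_tx(self, tx_id, message):
        # Remember which messages were acked inside the transaction.
        self.in_tx.setdefault(tx_id, []).append(message)

    def commit(self, tx_id):
        # Committed messages are consumed for good; forget them.
        self.in_tx.pop(tx_id, None)

    def abort(self, tx_id):
        # Put the messages back at the front, preserving their original order.
        rolled_back = self.in_tx.pop(tx_id, [])
        self.pending.extendleft(reversed(rolled_back))
```

With this in place the application would read from `next_message()` instead of directly from the socket, so a message rolled back by `abort()` is handed out again before anything newly dispatched by the broker.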

        Celso Pinto added a comment -

        I'm sorry, but that is just wrong: if the consumer dies from an unexpected/unhandled error (think OOM), then the message is gone for good. The STOMP protocol does define transactions; it just doesn't specify how a consumer transaction must work. I personally see that as an oversight in the specification. You should do The Right Thing and roll back the messages if a) the transaction is explicitly aborted or b) the client connection is dropped, regardless of the incompleteness of the STOMP specification. Anything else is unacceptable and unexpected behavior for a message broker.

        Dejan Bosanac added a comment -

        A final comment on this issue: redelivery of messages after abort is not supported by the Stomp protocol (thanks Hiram for the tip), so I reverted some of the delivery logic that was implemented for this issue. The test case StompTest.testPrefetchSize() has been modified to reflect these changes, and some fixes were implemented to make it work. Please take a look at http://activemq.apache.org/how-do-i-unack-the-message-with-stomp.html for more info on this topic.

        Dejan Bosanac added a comment -

        Just looked at the test case again and it seems that this is working fine now (modified test case proves it).

        As for the redelivery policy, it is not implemented for Stomp. I created another issue for this: https://issues.apache.org/activemq/browse/AMQ-2345

        Eric Van Dewoestine added a comment -

        I just wanted to offer a bit of feedback on the current state of this as of the 20090620 snapshot (which includes Dejan's 738904 revision). For message redelivery, when aborting the transaction, the redelivery policy is not currently honored. This results in an infinite number of immediate retries to deliver the unacknowledged messages.

        Gary Tully added a comment -

        pushing this out to 5.4.0 as more work is needed

        Dejan Bosanac added a comment -

        It seems that acking a message before an abort still makes the client hang on the next receive.

        Dejan Bosanac added a comment -

        The problem with dispatch stopping after a transaction abort was due to the fact that messages were acked at the moment they were dispatched, and we didn't have any logic to redeliver them after the rollback. I implemented some additional logic (committed revision 738904) that acks messages received in the transaction on commit and redelivers them after an abort. This helps in this use case, as you can see in StompTest.testPrefetchSize().

        I can still imagine certain scenarios where this would not work well, mostly because we dispatch messages as they arrive at the transport and don't check whether some already-dispatched messages should be redelivered first. Fixing that requires quite a big refactoring, so I'm leaving it for future enhancements.

        Dejan Bosanac added a comment -

        I've committed a change (along with a test case) for the prefetch size problem. I've replaced "index + 1" with "index", since it is already incremented earlier. All tests are passing. It would be great if someone else could take a look at this as well.

        Celso Pinto added a comment -

        I've attached a patch for the tests that demonstrates the error described in #1. Digging through the code, I found it is related to the following piece of code in activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java at line 230:

        prefetchExtension = Math.max(prefetchExtension, index + 1);

        This code is broken: when the first message is dispatched to the client, prefetchExtension isn't incremented, so when the client ACKs the first message, index is incremented to 1, Math.max() returns 2, and two messages are dispatched.
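
The arithmetic in the paragraph above can be checked with a toy model. This is not the PrefetchSubscription code itself, only the single max() expression in isolation; the function name and the `add_one` flag are hypothetical, and the starting values (extension 0, index already incremented to 1) are taken from the description in this comment rather than verified against the source.

```python
def extension_after_ack(prefetch_extension, index, add_one):
    """Toy model of the update applied when the client ACKs a message.

    add_one=True  mirrors max(prefetchExtension, index + 1)
    add_one=False mirrors max(prefetchExtension, index)
    """
    candidate = index + 1 if add_one else index
    return max(prefetch_extension, candidate)


# State assumed from the comment: nothing extended yet, and index has
# already been incremented to 1 for the first ACKed message.
with_plus_one = extension_after_ack(prefetch_extension=0, index=1, add_one=True)
without_plus_one = extension_after_ack(prefetch_extension=0, index=1, add_one=False)
print(with_plus_one, without_plus_one)
```

Under these assumptions the "index + 1" form yields 2 while the plain "index" form yields 1, which matches the two-messages-per-ACK symptom reported in #1.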

        Can anyone please explain what prefetchExtension is supposed to do and how changing it impacts the rest of the code? I've asked on IRC and on the activemq-devel mailing list and received no replies.

        Celso Pinto added a comment -

        This test uncovers two messages being dispatched on client ACK


          People

          • Assignee:
            Dejan Bosanac
            Reporter:
            Celso Pinto
          • Votes:
            3
            Watchers:
            6
