ActiveMQ - AMQ-4533

Messages stuck in queue with redelivered=true

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.7.0
    • Fix Version/s: 5.10.0
    • Component/s: JMS client
    • Labels:
      None
    • Environment:

      Fuse Message Broker 5.7.0

      Description

      We're getting messages stuck in queues with the
      redelivery flag set to true.

      We used the following test model: put 50 messages to INPUT_QUEUE sequentially, one per second, and after that put the rest of the 1000 messages quickly,
      while 25 listeners consume from INPUT_QUEUE, each taking about 30 seconds to move a message to RECEIPT_QUEUE, and 10 other listeners consume from RECEIPT_QUEUE and count the messages.

      We tried making one of the consumers slow by setting its
      processing time to 100000 seconds (sleep) and, at the same time,
      putting a heavy load on some other queues from 500 threads sending every 1 ms.

      Our test case is attached, you might need to install some dependencies
      to the local maven repository manually:

      mvn install:install-file -DgroupId=org.apache.activemq
      -DartifactId=activemq-core -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar
      -Dfile=activemq-core-5.7.0.fuse-71-047.jar
      mvn install:install-file -DgroupId=org.apache.kahadb
      -DartifactId=kahadb -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar
      -Dfile=kahadb-5.7.0.fuse-71-047.jar
      mvn install:install-file
      -DgroupId=org.apache.geronimo.management.specs
      -DartifactId=geronimo-j2ee-management_1.1_spec -Dversion=1.0.1
      -Dpackaging=jar -Dfile=geronimo-j2ee-management_1.1_spec-1.0.1.jar
      mvn install:install-file -DgroupId=org.apache.activemq.pool
      -DartifactId=activemq-pool -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar
      -Dfile=activemq-pool-5.7.0.fuse-71-047.jar

      To run the test, simply use the Maven test target:

      mvn clean test

      If the problem occurs, you'll get a message like this in the test
      results (target/surefire-reports):

      java.lang.AssertionError: Still messages in InputQueue expected:<0>
      but was:<365>

      Attachments

      1. AMQ4533_logs.ZIP
        1.08 MB
        Wieslaw Dudek
      2. AMQ4533Test.java
        21 kB
        Gary Tully
      3. AMQ4533-test.patch
        43 kB
        Timothy Bish
      4. AMQ4533-Test.patch
        27 kB
        Timothy Bish
      5. AMQ4533-Test.patch
        39 kB
        Timothy Bish
      6. AMQ4533-Test.patch
        39 kB
        Timothy Bish
      7. AMQ4533-Test.patch
        38 kB
        Timothy Bish
      8. AMQ4533-Test.patch
        36 kB
        Timothy Bish
      9. AMQ4533TestPatch.txt
        27 kB
        Timothy Bish
      10. AMQ4533TestPatch.txt
        34 kB
        Timothy Bish
      11. AMQ4533TestPatch.txt
        35 kB
        Timothy Bish
      12. AMQFreeze_logs.zip
        26 kB
        Wieslaw Dudek
      13. AMQFreezeFailingTest.zip
        14 kB
        Wieslaw Dudek
      14. AMQFreezeTest.patch
        40 kB
        Dejan Bosanac
      15. AMQFreezeTest.zip
        11 kB
        Dejan Bosanac
      16. AMQFreezeTest.zip
        15 kB
        Wieslaw Dudek
      17. AMQFreezeTest-5.8.0.fuse-72-SNAPSHOT-log.zip
        111 kB
        Wieslaw Dudek
      18. AMQFreezeTest-5.9.0.redhat-610084-log.zip
        1.57 MB
        Wieslaw Dudek
      19. kahaPendingMessages.zip
        24 kB
        Jason Shepherd
      20. LOGS_FAILED_5.9.0.redhat-610-SNAPSHOT.zip
        10.00 MB
        Wieslaw Dudek

          Activity

          Christian Posta added a comment -

          You may have forgotten to attach the test case?

          Jason Shepherd added a comment -

          Thanks Christian, I've attached it now.

          Timothy Bish added a comment -

          Note: the test uses the deprecated and removed amqPersistenceAdapter, so it'll need an update if run against newer broker releases.

          Jason Shepherd added a comment -

          Test case updated to use KahaDB.

          Timothy Bish added a comment -

          Initial cut of a thinned-down version of this test as a JUnit test. The test is pretty complex, so we need to reduce it as much as we can to aid debugging.

          Timothy Bish added a comment -

          Some more cleanups to reduce the code size of this test.

          Timothy Bish added a comment -

          It's not clear to me that this really shows an issue. At the time the zero-size queue assertions run, there are still threads producing to and consuming from these queues, so I don't see how the test can work. Perhaps you can clean it up a bit and reduce the amount of things going on here; the test takes 30 minutes or more on my machine.

          Jason Shepherd added a comment - edited

          This reproducer is basically a reproduction of a production scenario, and it needs to be like this because it's the only way we've found to replicate the issue. If we break down what happens in the test case, hopefully you'll understand why we need so many things going on.

          Step 2 in the test case is critical for reproducing the problem. It simulates a slow consumer, with threads hung or a slow database operation. Without this slow consumer, the test completes very fast and we don't see the problem.

          The test simulates the production environment where the broker is under heavy load. In the test case there is heavy load on SOME_TEST_QUEUE, all of which uses separate connection pools, message listeners, and latches, whereas the critical messages for the test case are on INPUT_QUEUE and RECEIPT_QUEUE, which use OtherMessageListener and onOtherMessageLatch.

          This test case is the only evidence of a problem which has been plaguing our production environment for years.

          Here is how the TC works:

          1) Start the AMQ brokers.

          2) Start a Spring consumer that consumes from INPUT_QUEUE and puts receipts to RECEIPT_QUEUE.
          This consumer processes the first message forever:

          if (isStuckOld != isStuck) {
              log4jLogger.info("sleep for eternity");
              Thread.sleep(100000000);
          }

          and the next 49 messages take longer (30 seconds) than the rest of the 1000 messages:

          if (nrProcessed < 50) {
              Thread.sleep(30000);
          }

          The reason for this is to simulate the slow and stuck consumers which we see in production.

          3) Run heavy traffic to SOME.TEST.QUEUE:
          a) put 1,000,000 messages using 50 threads to broker 1:

          sendInThreads(QUEUE_NAME, 500000, 50, DeliveryMode.NON_PERSISTENT, getUrl1(), 1);
          sendInThreads(QUEUE_NAME, 500000, 50, DeliveryMode.PERSISTENT, getUrl1(), 1);

          b) get 900,000 messages in 200 listening threads from broker 2 using OtherMessageListener:

          listenInThreads(QUEUE_NAME, 900000, 200, getUrl2(), mListener, otherConnections);

          However, we do not wait on the latches here, as it is not important; this traffic exists only to simulate heavy load, and counting the messages is not necessary.

          4) Send traffic to INPUT_QUEUE in 10 threads on broker 1, which triggers our Spring consumer from step 2 to start consuming and to send output messages to RECEIPT_QUEUE (one receipt message is sent to RECEIPT_QUEUE for each message consumed from INPUT_QUEUE):

          CountDownLatch latch1 = sendInThreads(queueName, msgCount, threadCount, persistance, getUrl1(), 500);

          latch1 is checked later in step 6 to ensure all 1000 messages were sent there.

          5) Listen on RECEIPT_QUEUE in 10 threads on broker 2 using MainMessageListener with onMainMessageLatch set to 999 (one message should be stuck on a consumer forever, by TC design):

          CountDownLatch latch = listenInThreads(queueName, msgCount, threadCount, getUrl2(), msgListener, connections);

          6) Wait for the latches. Because of the heavy load, we had to set the timeout long enough (here 1000 seconds) to give the system a chance to process all messages from INPUT_QUEUE (processing might be very slow under heavy load):

          onMainMessageLatch.await(1000, TimeUnit.SECONDS);

          We also check that all messages were put into INPUT_QUEUE for processing:

          latch1.await(msgCount / 10, TimeUnit.SECONDS); // .MINUTES

          7) Check assertions for INPUT_QUEUE and RECEIPT_QUEUE; they should be empty:

          queueName = INPUT_QUEUE;

          queueSize1 = queueSize(brokerService1, queueName);
          queueSize2 = queueSize(brokerService2, queueName);

          Assert.assertEquals("Still messages in " + queueName, 0, queueSize1 + queueSize2);

          queueName = RECEIPT_QUEUE;

          queueSize1 = queueSize(brokerService1, queueName);
          queueSize2 = queueSize(brokerService2, queueName);

          Assert.assertEquals("Still messages in " + queueName, 0, queueSize1 + queueSize2);

          8) Close the AMQ brokers.

          Timothy Bish added a comment -

          More cleanups inside the test, less Spring dependence.

          Gary Tully added a comment -

          From the current test case (as updated by Tim), the queue depth matches the in-flight count, due to messages prefetched by the consumer that waits forever.
          This is expected. Until the waiting-forever consumer closes, prefetched messages will not be consumable. Using a prefetch of 0, making the consumers pull messages on demand, will resolve this.

          In the production env, does the destination with the unexpected queue depth have an in-flight value that matches when viewed via JMX?
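
          For illustration, a minimal pull-style consumer with prefetch 0 might look like the sketch below; the broker URL and queue name are assumptions, not taken from the attached test:

          import javax.jms.Connection;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.Session;
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.command.ActiveMQQueue;

          public class PullConsumer {
              public static void main(String[] args) throws Exception {
                  ActiveMQConnectionFactory factory =
                          new ActiveMQConnectionFactory("tcp://localhost:61616");
                  Connection connection = factory.createConnection();
                  connection.start();
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  // prefetchSize=0 disables prefetch: each receive() pulls one
                  // message from the broker on demand, so no messages sit in the
                  // prefetch buffer of a stalled consumer.
                  ActiveMQQueue queue = new ActiveMQQueue("INPUT_QUEUE?consumer.prefetchSize=0");
                  MessageConsumer consumer = session.createConsumer(queue);

                  Message message;
                  while ((message = consumer.receive(5000)) != null) {
                      // process the message here
                  }
                  connection.close();
              }
          }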

          Wieslaw Dudek added a comment -

          It might not be acceptable to customers if the app performance decreases. However, if that is the case, I wonder if there is any other possibility: to time out a poisoned consumer and redirect all its prefetched messages to the rest of the consumers, or at least to deliver the next messages to the fast, operational consumers so that messages don't get stuck on an invalid one. Also, could you create a modified version of this test case to prove that prefetchSize=0 will solve the issue at the cost of performance degradation in message processing?

          Jason Shepherd added a comment -

          In the sendAndListenToWithFailower method, you call listenInThreads with 'ReceiptQueue' as the queueName. If you only wanted to set the prefetch on ReceiptQueue, you would need to refactor the sendAndListenToWithFailower method to be able to specify the prefetch value to pass to the registerConsumerListeners methods.

          In that method, you could pass the prefetch. So instead of:

          MessageConsumer messageConsumer = session.createConsumer(
                  session.createQueue(queueName));

          You could use something like:

          queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0");
          MessageConsumer messageConsumer = session.createConsumer(queue);

          See this page for a reference:

          http://activemq.apache.org/what-is-the-prefetch-limit-for.html
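
          Alternatively, the prefetch can be set once on the connection factory instead of per destination; a minimal sketch (the broker URL is an assumption):

          import org.apache.activemq.ActiveMQConnectionFactory;

          public class PrefetchConfig {
              static ActiveMQConnectionFactory zeroPrefetchFactory() {
                  ActiveMQConnectionFactory factory =
                          new ActiveMQConnectionFactory("tcp://localhost:61616");
                  // Applies to every queue consumer created from this factory,
                  // instead of appending ?consumer.prefetchSize=0 per destination.
                  factory.getPrefetchPolicy().setQueuePrefetch(0);
                  return factory;
              }
          }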

          Wieslaw Dudek added a comment -

          Yes, I did, but it does not work: it is illegal to set the prefetch to 0 and also set a message listener. Could someone create a working TC to prove the theory? However, we will still need some mechanism to time out a poisoned consumer and move its prefetched messages to others. I would like to ensure that the test case works without setting the prefetch value to 0, and that even with 1000 messages blocked at one consumer for some limited or unlimited time, the next thousands of messages keep flowing to other consumers.

          Gary Tully added a comment -

          Prefetch 1 is the best you can do with a listener.
          Have a look at http://activemq.2283324.n4.nabble.com/Finding-a-slow-consumer-td4654153.html to see some detail on the abort slow consumer policy. That will ensure that stuck prefetched messages get redispatched.
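
          For reference, the same abort policy can be configured programmatically on an embedded broker; a minimal sketch (the duration and connector URL are assumptions, and the XML form appears in later comments):

          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
          import org.apache.activemq.broker.region.policy.PolicyEntry;
          import org.apache.activemq.broker.region.policy.PolicyMap;

          public class SlowConsumerBroker {
              public static void main(String[] args) throws Exception {
                  // Abort a consumer that stays flagged as slow for more than 30s;
                  // abort only the consumer, not its connection.
                  AbortSlowConsumerStrategy strategy = new AbortSlowConsumerStrategy();
                  strategy.setMaxSlowDuration(30000);
                  strategy.setAbortConnection(false);

                  PolicyEntry entry = new PolicyEntry();
                  entry.setQueue(">"); // apply to all queues
                  entry.setSlowConsumerStrategy(strategy);

                  PolicyMap policyMap = new PolicyMap();
                  policyMap.setDefaultEntry(entry);

                  BrokerService broker = new BrokerService();
                  broker.setDestinationPolicy(policyMap);
                  broker.addConnector("tcp://localhost:61616");
                  broker.start();
                  broker.waitUntilStopped();
              }
          }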

          Gary Tully added a comment -

          Some additional doc at https://access.redhat.com/site/documentation/en-US/JBoss_A-MQ/6.0/html/Tuning_Guide/files/TuningSlowConsumers.html

          Wieslaw Dudek added a comment -

          Here is what I tried to put into spring2.xml and spring3.xml, but we still have e.g. 400 messages stuck. I checked a few possible configurations for <abortSlowConsumerStrategy> and the test case is still failing.

          <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
                  <slowConsumerStrategy>
                    <abortSlowConsumerStrategy abortConnection="true" maxSlowDuration="30000" />
                  </slowConsumerStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
          </destinationPolicy>

          Gary Tully added a comment -

          Adding the abort slow consumer policy to the input queue and setting the prefetch to 10.

          The prefetch is relevant because there are multiple consumers, so messages get load-balanced across them.
          To be deemed slow, a consumer must have a dispatch size greater than the prefetch for more than 30s; with a prefetch of 1000 (the default) this was not happening, because all the other consumers were very fast.

          Gary Tully added a comment -

          Note also that the test uses auto-ack, so the message that "sleeps for eternity" is acked before the sleep, but there is no receipt.
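
          To illustrate the acknowledgement semantics in play, a sketch (the queue name and timeout are assumptions):

          import javax.jms.Connection;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.Session;

          public class AckModes {
              static void illustrate(Connection connection, String queueName) throws Exception {
                  // AUTO_ACKNOWLEDGE: the message is acked as soon as receive()
                  // returns (or onMessage() completes), so a consumer that then
                  // sleeps forever has already consumed the message for good.
                  // (Session created only to show the mode.)
                  Session autoSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  // CLIENT_ACKNOWLEDGE: nothing is acked until the application
                  // calls acknowledge(); if the consumer is aborted first, the
                  // message can be redelivered to another consumer.
                  Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                  MessageConsumer consumer =
                          clientSession.createConsumer(clientSession.createQueue(queueName));
                  Message message = consumer.receive(5000);
                  if (message != null) {
                      // ... do the slow work first ...
                      message.acknowledge(); // ack only after processing succeeds
                  }
              }
          }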

          Wieslaw Dudek added a comment -

          Thank you, Gary, for your input, but apart from the fact that it might be unacceptable to decrease performance by reducing the prefetchSize from 1000 to 10, I am not sure it is going to work. It is still failing on my local machine, and in any case we might have to wait a long time for a full prefetch queue (1000 messages assigned to a "stuck" consumer) to be released. We could tune the prefetch for every queue, but we might really need a well-working timeout for aborting such consumers (to release all the messages prefetched to them). For now the timeout applies to slow consumers only and depends on a tricky slowness-detection mechanism which does not work if not enough messages arrive on a queue for some time. So most likely a new mechanism is still needed for checking whether a consumer is still consuming messages or not.

          Gary Tully added a comment -

          However, if there are very few messages and lots of consumers, using a prefetch > 0 will not have much impact. Prefetch is really only beneficial when consumers are very fast and when there are lots of messages in the queue or many producers. When consumers take a long or variable amount of time, it is best to leave the messages in the queue (prefetch=1) for other consumers to access.

          OK, we may need to make the slowness determination some sort of strategy.
          Have a thought on how best to determine the "slowness" of a consumer, maybe time since last ack.
          It needs to be a fast calculation. The decision to dispatch based on prefetch was already firing an advisory for a slow consumer, so that was reused for the current abort policy.

          I would be interested to know if you can get the test to fail with prefetch=1. Also note that just the consumer is aborted, not the connection: abortConnection=false (the default).

          Wieslaw Dudek added a comment -

          The test case has passed for AMQ v5.8.0 after setting the prefetchSize to 10 and the abortSlowConsumerStrategy as follows:

          <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" queuePrefetch="10">
            <slowConsumerStrategy>
              <abortSlowConsumerStrategy/>
            </slowConsumerStrategy>
          </policyEntry>

          I agree that "time since last ack" would be good enough.

          Timothy Bish added a comment -

          It would be good to close this out as resolved and create an enhancement issue documenting the needed improvements to cover this use case, so it can get added to the roadmap.

          Gary Tully added a comment -

          Raised https://issues.apache.org/jira/browse/AMQ-4621 to track the enhancement to the abort policy to be more deterministic.

          Wieslaw Dudek added a comment -

          It seems that the issue has not been resolved by introducing the timeout in AMQ-4621. See the logs from the failed TC. We have another test case, AMQFreeze, which fails even when the timeout is set using SlowAckConsumerStrategy. I attached the new TC. We should reopen AMQ-4533.

          Wieslaw Dudek added a comment -

          DEBUG logs for failed TC AMQ4533

          Wieslaw Dudek added a comment -

          New AMQ "freeze" test case

          Wieslaw Dudek added a comment -

          logs from failed AMQFreezeTest

          Timothy Bish added a comment -

          After some additional work on AMQ-4621 yesterday, the problem seems to be back to not dispatching: broker 2 now has some messages left in the queue that aren't being dispatched.

          Timothy Bish added a comment -

          Freeze test cleaned up a bit and in the form of a patch for easier application.

          Timothy Bish added a comment -

          Updated to collect some more data in the logs.

          Wieslaw Dudek added a comment -

          During my testing today, 5.9.0.redhat-610067 resolved the AMQ4533 (kahaPendingMessages) test case using
          <abortSlowAckConsumerStrategy maxSlowCount="1"/>, when enough time is given to consume all messages by adding Thread.sleep(30); before checking the queue sizes,
          but after adding abortConnection="true", e.g. <abortSlowAckConsumerStrategy maxSlowCount="1" abortConnection="true"/>, the TC is failing again.

          So abortConnection="true" is not working: AMQ cannot force the consumer connections to close, it can only ask them gently to close.
          For the kahaPendingMessages TC this is enough, because the consumer thread is in a good state (only too busy to process other messages),
          but for the other test case (AMQFreezeTest) it is not enough, because after being interrupted the consumer connections are in a wrong state and do not react to the close request from the AMQ broker (I guess).
          For the second test case abortConnection="true" is necessary, and in my opinion that is why it is still failing.
          The logs prove the theory: as you can see at the end of them, during the broker shutdown procedure the connections are released and some consumption of the messages proceeds, but this is too late; AMQ should have aborted those connections earlier, during the slow consumer strategy procedure.

          Timothy Bish added a comment -

          Can you clarify which test case you are using and what its current configuration is? You've referenced a couple of things, so I'm a bit confused now about which one you are looking at. Perhaps you can attach your current test case with the configuration that's not working, so I know I'm looking at the right thing.

          Wieslaw Dudek added a comment -

          The test case is still failing.

          Timothy Bish added a comment -

          Updated the freeze test with additional testing logic and debug data.

          Timothy Bish added a comment -

          updated

          Timothy Bish added a comment -

          updated test case.

          Dejan Bosanac added a comment -

          Hi,

          I started investigating the AMQFreeze test in more detail. One thing that seems off to me is that all consumers on the InputQueue are using the same connection. My impression is that you're trying to simulate multiple applications (each using its own connection) misbehaving and being reconnected. As we have it here, all the misbehaving consumers are killing the same connection multiple times, very frequently. Can you please confirm what the intention of the test is and maybe modify it accordingly?
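
          A sketch of the suggested shape, with each consumer on its own connection so that aborting one does not tear down the others (the helper name and listener body are illustrative, not taken from the test):

          import javax.jms.Connection;
          import javax.jms.ConnectionFactory;
          import javax.jms.MessageConsumer;
          import javax.jms.Session;

          public class PerConsumerConnections {
              static void startConsumers(ConnectionFactory factory, String queueName,
                                         int consumerCount) throws Exception {
                  for (int i = 0; i < consumerCount; i++) {
                      // One connection per simulated application instance.
                      Connection connection = factory.createConnection();
                      connection.start();
                      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                      MessageConsumer consumer =
                              session.createConsumer(session.createQueue(queueName));
                      consumer.setMessageListener(message -> {
                          // process the message here
                      });
                  }
              }
          }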

          Dejan Bosanac added a comment -

          Attached is a slightly modified broker config for the test. We recommend using:

          • the conditionalNetworkBridgeFilterFactory policy and a larger TTL, as consumers can come and go between brokers and we need to be able to pass messages back and forth
          • ignoreIdleConsumers="true", as we don't want to kill the connections of idle consumers
          Wieslaw Dudek added a comment -

          The configuration for the ActiveMQ broker and the Spring consumers reflects exactly our production configuration.
          If we should change something here, just let me know.
          This models our 25 consumers which simultaneously process the messages on the InputQueue
          and send the receipts to the ReceiptQueue.

          I agree it is a very weird test case, as we tried to simulate an unsafe "kill" command, or
          another unexpected scenario, or possibly a bug in the application, which might result in an ActiveMQ or queue freeze.
          I am sure that well-designed software under normal processing works perfectly until something unusual happens
          or a bug shows up in the code. It is a very rare case when we have to remove all the messages and restart all applications
          to let AMQ keep working, but when it happens it is a very serious outage.

          In some of our testing after upgrading to AMQ 5.8 we experienced an even worse scenario,
          where even restarting AMQ and the consumers did not get processing going again.
          We will keep trying to recreate that scenario, as upgrading does not eliminate the "freezes"
          and it is a very serious issue.

          What we do here is put messages on INPUT_QUEUE, and the 25 consumers, this time configured via Spring,
          consume them, putting a receipt on RECEIPT_QUEUE.
          Next we have consumers which consume the messages from RECEIPT_QUEUE.
          So after the TC ends, both INPUT_QUEUE and RECEIPT_QUEUE should be empty.
          If not, it indicates some freeze, or that the TC ended too fast (we could add a sleep option to give AMQ
          a chance to consume the messages).

          Timothy Bish added a comment -

          Modified test case using embedded brokers, with changes to the Spring response processor. A broker restart at the end clears the wrong queue stats.

          Dejan Bosanac added a comment -

          It is still unusual to me to call all these interrupts, as I cannot imagine a real-life scenario that would do something similar to the application.

          Anyhow, I made some changes to the configuration, and I have a test that now passes all the time for me (please find it attached). Things that changed:

          • one of the most important things is to move the Spring consumer to client acknowledgement (see listener.xml). The Spring container synchronously receives messages and passes them to the listener, so in the case of a long-running listener the message would otherwise be acked immediately, which causes problems later.
          • the conditionalNetworkBridgeFilterFactory policy and a larger TTL, as consumers can come and go between brokers and we need to be able to pass messages back and forth
          • ignoreIdleConsumers="true", as we don't want to kill the connections of idle consumers
          • abortConnection="false", as we don't want to kill the connection, just a misbehaving consumer

          Can you guys run this version of the test and see how it works for you?

          Wieslaw Dudek added a comment -

          For me the last AMQFreezeTest.zip is still failing, both using the 5.8 snapshot containing the abort slow consumer strategy and the latest 5.9 binaries.

          Wieslaw Dudek added a comment -

          Please advise why you think the AUTO_ACKNOWLEDGE option should not be used.

          I managed to get both test cases to pass using ignoreIdleConsumers="false" and abortConnection="true",
          if we consider the traces only (the messages really received) and do not take into account the wrong statistics. I got all the messages consumed by the Spring consumers. The slow consumer strategy, after being triggered a few times, was able to close the connections, and they were recreated by the Spring consumers only, so I had to modify the test cases to use the Spring configuration for both INPUT_QUEUE and RECEIPT_QUEUE.
          I used the original test cases in the form I sent here, with the mentioned modifications.

          What might be the risk for us of using ignoreIdleConsumers="false" and abortConnection="true"?

          Timothy Bish added a comment -

          Idle consumers are consumers that have no outstanding dispatched messages. These consumers have nothing to do and therefore would always trip the slow condition and eventually be closed. With abortConnection=true this means you'd have consumer connections coming and going for no real reason, which could impact the performance of your application. I don't see why you wouldn't just ignore the idle ones and only focus on consumers that have not acked outstanding messages in the allocated time.

          Timothy Bish added a comment -

          Running with Dejan's configuration I see the test passing consistently now.

          Wieslaw Dudek added a comment -

          For me it has also passed 90-95% of the time, but I sent you the case where it failed. Also please notice the wrong statistics there: we had over 1000 messages, or even -2. Why?
          I do not want it to pass 99% of the time but 100%, because in production we have the rare case where we have to remove all the messages from all queues because of such a "freeze" (nothing else helps). This is the reason I am a bit pushy about getting it solved completely.
          As for the parameters, with ignoreIdleConsumers="false" and abortConnection="true" I managed to get 100% successful runs, ignoring the statistics however. With ignoreIdleConsumers="true", neither of the two test cases works for me every time. With abortConnection="false", only kahaPendingMessages.zip works; the AMQFreezeTest fails sometimes.
          We really need to get rid of the AMQ freezes, which are very bad and also cost a lot in maintenance.
          Could you advise why ignoreIdleConsumers="false" helps so much when you said it should not? In any case I would like to avoid setting it.

          Also, please use the original test case: we can modify any test case to pass, but I need to know the reasons for any change to our application, such as AUTO_ACKNOWLEDGE, before putting it into the late stage of our project. We need the code to reflect our production design.

          Dejan Bosanac added a comment -

          The problem is how the Spring message listener works. It doesn't use a message listener like you would in a JMS app. Instead it does a receive() on the session, gets the message, and manually executes onMessage() on the listener. In auto-acknowledge mode, as soon as receive() is done the message is acked and considered processed, so the message that sleeps forever will not be resent to other consumers when this consumer is detected as slow. Switching to client acknowledgement, Spring actually acks the message manually when onMessage() completes, so when we close the consumer, the current long-sleeping message will be replayed to other consumers.
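
          In Java configuration, the listener.xml change would look roughly like this sketch (the container setup and queue name are illustrative, not the attached config):

          import javax.jms.ConnectionFactory;
          import javax.jms.MessageListener;
          import javax.jms.Session;
          import org.springframework.jms.listener.DefaultMessageListenerContainer;

          public class ClientAckContainer {
              static DefaultMessageListenerContainer build(ConnectionFactory cf,
                                                           MessageListener listener) {
                  DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
                  container.setConnectionFactory(cf);
                  container.setDestinationName("INPUT_QUEUE");
                  container.setMessageListener(listener);
                  // With CLIENT_ACKNOWLEDGE, Spring acks only after onMessage()
                  // completes, so a message held by a stuck listener can be
                  // redelivered once that consumer is aborted.
                  container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
                  // (As a Spring bean, afterPropertiesSet() and start() are
                  // invoked by the application context.)
                  return container;
              }
          }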

          Wieslaw Dudek added a comment -

          Thank you for your comment!
          We do not care about the long-sleeping message itself, but about all the messages prefetched for the sleeping consumer, which need to be redispatched and delivered to other consumers. That is all that needs to be guaranteed here: no matter what option we use in the Spring listener, we do not want the "freeze", meaning that all pending/prefetched messages have to be redispatched from such "slow" (or frozen) consumers to other ones. However, I am still concerned about ignoreIdleConsumers="false" and abortConnection="true", which is what works for me. Why does ignoreIdleConsumers="false" help so much when you said it should not? Could this be checked, as we would like to avoid it?

          Wieslaw Dudek added a comment -

          I used the last test case from https://fusesource.com/issues/browse/ENTMQ-409
          and tested it on my laptop.
          Results: Tests run: 20, Failed: 8

          I attached the logs from the failed tests:
          LOGS_FAILED_5.9.0.redhat-610-SNAPSHOT.zip

          I have also checked it on a very slow Solaris system, where I was not able to get the test case to pass even once (see attached logs).

          After changing only the options to abortConnection="true" ignoreIdleConsumers="false", I managed to get the test case to pass on the Solaris environment and got better results on the Windows laptop: Tests run: 20, Failed: 2

          So I was able to get the test case to fail even with abortConnection="true" ignoreIdleConsumers="false", but it behaved much better with these options.

          For testing I used the original TC and 5.9.0.redhat-610-SNAPSHOT from:

          http://repo.fusesource.com/nexus/content/repositories/snapshots/

          As for activemq-all, we had only these:
          5.9-fuse-SNAPSHOT/ Thu Jun 27 11:57:53 UTC 2013
          5.9.0.redhat-610-SNAPSHOT/ Sat Sep 21 00:34:04 UTC 2013

          Gary Tully added a comment -

          Using abortConnection=false is viable with https://issues.apache.org/jira/browse/AMQ-5114 - the broker will always see a local remove when the sub is aborted, even if the remote sub is slow to respond or never responds.


            People

            • Assignee:
              Timothy Bish
            • Reporter:
              Jason Shepherd
            • Votes:
              1
            • Watchers:
              7
