ActiveMQ
  1. ActiveMQ
  2. AMQ-3607

Setting OptimiseAcknowledge on a queue with a prefetch limit causes normal/fast consumers to miss messages when a slow consumer is blocking

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Not A Problem
    • Affects Version/s: 5.5.0
    • Fix Version/s: 5.6.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Java: 1.6.0_26-b03-383.jdk

      Description

      The attached test case tests slow consumer handling with a variety of topic policies and SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer will continue to receive messages whilst a slow consumer is blocking.

      Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge both true and false.

      If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer to miss messages whilst the slow consumer is blocking.

      Would be nice to be able to turn on OptimiseAcknowledge for performance reasons, however it is also necessary to set the prefetch limit in order to trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.

      testDefaultSettings
      Publisher: Send 0
      SlowConsumer: Receive 0
      FastConsumer: Receive 0
      testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testDefaultSettings: Whilst slow consumer blocked:
      		- SlowConsumer Received: 1 [0]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testDefaultSettings: After slow consumer unblocked:
      		- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      
      testDefaultSettingsWithOptimiseAcknowledge
      testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
      		- SlowConsumer Received: 1 [0]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
      		- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      
      testBounded
      testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testBounded: Whilst slow consumer blocked:
      		- SlowConsumer Received: 1 [0]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testBounded: After slow consumer unblocked:
      		- SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
      		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      
      testBoundedWithOptimiseAcknowledge
      testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
      testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
      		- SlowConsumer Received: 1 [0]
      		- FastConsumer Received: 5 [0, 1, 2, 3, 4]
      testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
      		- SlowConsumer Received: 5 [0, 1, 2, 3, 4]
      		- FastConsumer Received: 5 [0, 1, 2, 3, 4]
      
      java.lang.AssertionError: Fast consumer missed messages whilst slow consumer was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3, 4]>
      

        Issue Links

          Activity

          James Furness created issue -
          Hide
          Timothy Bish added a comment -

          Can you attach your test case as a file here and tick the Grant License to Apache box so that the test can be included in the source.

          Show
          Timothy Bish added a comment - Can you attach your test case as a file here and tick the Grant License to Apache box so that the test can be included in the source.
          James Furness made changes -
          Field Original Value New Value
          Description The below test case tests slow consumer handling with a variety of topic policies and SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer will continue to receive messages whilst a slow consumer is blocking.

          Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge both true and false.

          If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer to miss messages whilst the slow consumer is blocking.

          Would be nice to be able to turn on OptimiseAcknowledge for performance reasons, however it is also necessary to set the prefetch limit in order to trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.

          {code}
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
          import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
          import org.apache.activemq.broker.region.policy.PolicyEntry;
          import org.apache.activemq.broker.region.policy.PolicyMap;
          import org.apache.activemq.command.ActiveMQTopic;
          import org.junit.Assert;
          import org.junit.Ignore;
          import org.junit.Test;

          import javax.jms.Connection;
          import javax.jms.DeliveryMode;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageListener;
          import javax.jms.MessageProducer;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.atomic.AtomicInteger;

          /**
           * @author James Furness
           */
          public class ActiveMQSlowConsumerManualTest {
              private static final int PORT = 12345;
              private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
              private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";

              @Test(timeout = 60000)
              public void testDefaultSettings() throws Exception {
                  runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
                  runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
              }

              @Test(timeout = 60000)
              public void testBounded() throws Exception {
                  runTest("testBounded", 30, 5, 5, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testBoundedWithOptimiseAcknowledge() throws Exception {
                  runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, true, false);
              }

              public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception {
                  BrokerService broker = createBroker(persistent);
                  broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
                  broker.start();

                  // Slow consumer
                  Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
                  final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
                  final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      slowConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (slowConsumerReceived != null) slowConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count);
                                      blockSlowConsumer.await();
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Fast consumer
                  Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
                  final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      fastConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (fastConsumerReceived != null) fastConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count);
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Wait for consumers to connect
                  Thread.sleep(500);

                  // Publisher
                  AtomicInteger sentCount = new AtomicInteger();
                  List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
                  MessageProducer publisher = createPublisher(publisherSession);
                  for (int i = 0; i < sendMessageCount; i++) {
                      sentCount.incrementAndGet();
                      if (sent != null) sent.add(i);
                      if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
                      publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
                  }

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
                  System.out.println(name + ": Whilst slow consumer blocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);

                  // Unblock slow consumer
                  blockSlowConsumer.countDown();

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": After slow consumer unblocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
                  System.out.println();

                  publisher.close();
                  publisherSession.close();
                  slowConsumer.close();
                  slowConsumerSession.close();
                  fastConsumer.close();
                  fastConsumerSession.close();
                  broker.stop();

                  Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
                  Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
              }

              private static BrokerService createBroker(boolean persistent) throws Exception {
                  BrokerService broker = new BrokerService();
                  broker.setBrokerName("TestBroker");
                  broker.setPersistent(persistent);
                  broker.addConnector(URL);
                  return broker;
              }

              private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
                  MessageConsumer consumer = session.createConsumer(TOPIC);
                  consumer.setMessageListener(messageListener);
                  return consumer;
              }

              private static MessageProducer createPublisher(Session session) throws JMSException {
                  MessageProducer producer = session.createProducer(TOPIC);
                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                  return producer;
              }

              private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

                  connectionFactory.setCopyMessageOnSend(false);
                  connectionFactory.setDisableTimeStampsByDefault(true);
                  connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);

                  Connection connection = connectionFactory.createConnection();
                  connection.setClientID(clientId);

                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  connection.start();

                  return session;
              }

              private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
                  PolicyMap policyMap = new PolicyMap();

                  PolicyEntry policyEntry = new PolicyEntry();

                  if (evictOldestMessage) {
                      policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
                  }

                  if (disableFlowControl) {
                      policyEntry.setProducerFlowControl(false);
                  }

                  if (prefetchLimit > 0) {
                      policyEntry.setTopicPrefetch(prefetchLimit);
                  }

                  if (messageLimit > 0) {
                      ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
                      messageLimitStrategy.setLimit(messageLimit);
                      policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
                  }

                  policyMap.put(topic, policyEntry);

                  return policyMap;
              }
          }
          {code}
          The below test case tests slow consumer handling with a variety of topic policies and SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer will continue to receive messages whilst a slow consumer is blocking.

          Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge both true and false.

          If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer to miss messages whilst the slow consumer is blocking.

          Would be nice to be able to turn on OptimiseAcknowledge for performance reasons, however it is also necessary to set the prefetch limit in order to trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.

          {code:title=testDefaultSettings}
          Publisher: Send 0
          SlowConsumer: Receive 0
          FastConsumer: Receive 0
          testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testDefaultSettingsWithOptimiseAcknowledge}
          testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBounded}
          testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: After slow consumer unblocked:
          - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBoundedWithOptimiseAcknowledge}
          testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]
          testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 5 [0, 1, 2, 3, 4]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]

          java.lang.AssertionError: Fast consumer missed messages whilst slow consumer was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3, 4]>
          {code}

          {code:title=ActiveMQSlowConsumerManualTest.java}
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
          import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
          import org.apache.activemq.broker.region.policy.PolicyEntry;
          import org.apache.activemq.broker.region.policy.PolicyMap;
          import org.apache.activemq.command.ActiveMQTopic;
          import org.junit.Assert;
          import org.junit.Ignore;
          import org.junit.Test;

          import javax.jms.Connection;
          import javax.jms.DeliveryMode;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageListener;
          import javax.jms.MessageProducer;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.atomic.AtomicInteger;

          /**
           * @author James Furness
           */
          public class ActiveMQSlowConsumerManualTest {
              private static final int PORT = 12345;
              private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
              private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";

              @Test(timeout = 60000)
              public void testDefaultSettings() throws Exception {
                  runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
                  runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
              }

              @Test(timeout = 60000)
              public void testBounded() throws Exception {
                  runTest("testBounded", 30, 5, 5, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testBoundedWithOptimiseAcknowledge() throws Exception {
                  runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, true, false);
              }

              public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception {
                  BrokerService broker = createBroker(persistent);
                  broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
                  broker.start();

                  // Slow consumer
                  Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
                  final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
                  final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      slowConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (slowConsumerReceived != null) slowConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count);
                                      blockSlowConsumer.await();
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Fast consumer
                  Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
                  final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      fastConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (fastConsumerReceived != null) fastConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count);
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Wait for consumers to connect
                  Thread.sleep(500);

                  // Publisher
                  AtomicInteger sentCount = new AtomicInteger();
                  List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
                  MessageProducer publisher = createPublisher(publisherSession);
                  for (int i = 0; i < sendMessageCount; i++) {
                      sentCount.incrementAndGet();
                      if (sent != null) sent.add(i);
                      if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
                      publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
                  }

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
                  System.out.println(name + ": Whilst slow consumer blocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);

                  // Unblock slow consumer
                  blockSlowConsumer.countDown();

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": After slow consumer unblocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
                  System.out.println();

                  publisher.close();
                  publisherSession.close();
                  slowConsumer.close();
                  slowConsumerSession.close();
                  fastConsumer.close();
                  fastConsumerSession.close();
                  broker.stop();

                  Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
                  Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
              }

              private static BrokerService createBroker(boolean persistent) throws Exception {
                  BrokerService broker = new BrokerService();
                  broker.setBrokerName("TestBroker");
                  broker.setPersistent(persistent);
                  broker.addConnector(URL);
                  return broker;
              }

              private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
                  MessageConsumer consumer = session.createConsumer(TOPIC);
                  consumer.setMessageListener(messageListener);
                  return consumer;
              }

              private static MessageProducer createPublisher(Session session) throws JMSException {
                  MessageProducer producer = session.createProducer(TOPIC);
                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                  return producer;
              }

              private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

                  connectionFactory.setCopyMessageOnSend(false);
                  connectionFactory.setDisableTimeStampsByDefault(true);
                  connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);

                  Connection connection = connectionFactory.createConnection();
                  connection.setClientID(clientId);

                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  connection.start();

                  return session;
              }

              private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
                  PolicyMap policyMap = new PolicyMap();

                  PolicyEntry policyEntry = new PolicyEntry();

                  if (evictOldestMessage) {
                      policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
                  }

                  if (disableFlowControl) {
                      policyEntry.setProducerFlowControl(false);
                  }

                  if (prefetchLimit > 0) {
                      policyEntry.setTopicPrefetch(prefetchLimit);
                  }

                  if (messageLimit > 0) {
                      ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
                      messageLimitStrategy.setLimit(messageLimit);
                      policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
                  }

                  policyMap.put(topic, policyEntry);

                  return policyMap;
              }
          }
          {code}
          Hide
          James Furness added a comment -

          Test case attached as requested

          Show
          James Furness added a comment - Test case attached as requested
          James Furness made changes -
          Attachment ActiveMQSlowConsumerManualTest.java [ 12504903 ]
          James Furness made changes -
          Description The below test case tests slow consumer handling with a variety of topic policies and SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer will continue to receive messages whilst a slow consumer is blocking.

          Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge both true and false.

          If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer to miss messages whilst the slow consumer is blocking.

          Would be nice to be able to turn on OptimiseAcknowledge for performance reasons, however it is also necessary to set the prefetch limit in order to trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.

          {code:title=testDefaultSettings}
          Publisher: Send 0
          SlowConsumer: Receive 0
          FastConsumer: Receive 0
          testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testDefaultSettingsWithOptimiseAcknowledge}
          testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBounded}
          testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: After slow consumer unblocked:
          - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBoundedWithOptimiseAcknowledge}
          testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]
          testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 5 [0, 1, 2, 3, 4]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]

          java.lang.AssertionError: Fast consumer missed messages whilst slow consumer was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3, 4]>
          {code}

          {code:title=ActiveMQSlowConsumerManualTest.java}
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
          import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
          import org.apache.activemq.broker.region.policy.PolicyEntry;
          import org.apache.activemq.broker.region.policy.PolicyMap;
          import org.apache.activemq.command.ActiveMQTopic;
          import org.junit.Assert;
          import org.junit.Ignore;
          import org.junit.Test;

          import javax.jms.Connection;
          import javax.jms.DeliveryMode;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageListener;
          import javax.jms.MessageProducer;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.atomic.AtomicInteger;

          /**
           * @author James Furness
           */
          public class ActiveMQSlowConsumerManualTest {
              private static final int PORT = 12345;
              private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
              private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";

              @Test(timeout = 60000)
              public void testDefaultSettings() throws Exception {
                  runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
                  runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
              }

              @Test(timeout = 60000)
              public void testBounded() throws Exception {
                  runTest("testBounded", 30, 5, 5, false, false, false, false);
              }

              @Test(timeout = 60000)
              public void testBoundedWithOptimiseAcknowledge() throws Exception {
                  runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, true, false);
              }

              public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception {
                  BrokerService broker = createBroker(persistent);
                  broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
                  broker.start();

                  // Slow consumer
                  Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
                  final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
                  final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      slowConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (slowConsumerReceived != null) slowConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count);
                                      blockSlowConsumer.await();
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Fast consumer
                  Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
                  final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
                  final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
                          new MessageListener() {
                              @Override
                              public void onMessage(Message message) {
                                  try {
                                      fastConsumerReceiveCount.incrementAndGet();
                                      int count = Integer.parseInt(((TextMessage) message).getText());
                                      if (fastConsumerReceived != null) fastConsumerReceived.add(count);
                                      if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count);
                                  } catch (Exception ignored) {}
                              }
                          }
                  );

                  // Wait for consumers to connect
                  Thread.sleep(500);

                  // Publisher
                  AtomicInteger sentCount = new AtomicInteger();
                  List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
                  Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
                  MessageProducer publisher = createPublisher(publisherSession);
                  for (int i = 0; i < sendMessageCount; i++) {
                      sentCount.incrementAndGet();
                      if (sent != null) sent.add(i);
                      if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
                      publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
                  }

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
                  System.out.println(name + ": Whilst slow consumer blocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);

                  // Unblock slow consumer
                  blockSlowConsumer.countDown();

                  // Wait for messages to arrive
                  Thread.sleep(500);

                  System.out.println(name + ": After slow consumer unblocked:");
                  System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
                  System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
                  System.out.println();

                  publisher.close();
                  publisherSession.close();
                  slowConsumer.close();
                  slowConsumerSession.close();
                  fastConsumer.close();
                  fastConsumerSession.close();
                  broker.stop();

                  Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
                  Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
              }

              private static BrokerService createBroker(boolean persistent) throws Exception {
                  BrokerService broker = new BrokerService();
                  broker.setBrokerName("TestBroker");
                  broker.setPersistent(persistent);
                  broker.addConnector(URL);
                  return broker;
              }

              private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
                  MessageConsumer consumer = session.createConsumer(TOPIC);
                  consumer.setMessageListener(messageListener);
                  return consumer;
              }

              private static MessageProducer createPublisher(Session session) throws JMSException {
                  MessageProducer producer = session.createProducer(TOPIC);
                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                  return producer;
              }

              private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

                  connectionFactory.setCopyMessageOnSend(false);
                  connectionFactory.setDisableTimeStampsByDefault(true);
                  connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);

                  Connection connection = connectionFactory.createConnection();
                  connection.setClientID(clientId);

                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  connection.start();

                  return session;
              }

              private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
                  PolicyMap policyMap = new PolicyMap();

                  PolicyEntry policyEntry = new PolicyEntry();

                  if (evictOldestMessage) {
                      policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
                  }

                  if (disableFlowControl) {
                      policyEntry.setProducerFlowControl(false);
                  }

                  if (prefetchLimit > 0) {
                      policyEntry.setTopicPrefetch(prefetchLimit);
                  }

                  if (messageLimit > 0) {
                      ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
                      messageLimitStrategy.setLimit(messageLimit);
                      policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
                  }

                  policyMap.put(topic, policyEntry);

                  return policyMap;
              }
          }
          {code}
          The attached test case tests slow consumer handling with a variety of topic policies and SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer will continue to receive messages whilst a slow consumer is blocking.

          Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge both true and false.

          If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer to miss messages whilst the slow consumer is blocking.

          Would be nice to be able to turn on OptimiseAcknowledge for performance reasons, however it is also necessary to set the prefetch limit in order to trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.

          {code:title=testDefaultSettings}
          Publisher: Send 0
          SlowConsumer: Receive 0
          FastConsumer: Receive 0
          testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettings: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testDefaultSettingsWithOptimiseAcknowledge}
          testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBounded}
          testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBounded: After slow consumer unblocked:
          - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
          - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          {code}

          {code:title=testBoundedWithOptimiseAcknowledge}
          testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
          testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
          - SlowConsumer Received: 1 [0]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]
          testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
          - SlowConsumer Received: 5 [0, 1, 2, 3, 4]
          - FastConsumer Received: 5 [0, 1, 2, 3, 4]

          java.lang.AssertionError: Fast consumer missed messages whilst slow consumer was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3, 4]>
          {code}
          Hide
          Gary Tully added a comment -

          Great to see a nice junit test, I had to have a peek.
          Apart from the missing setter for connectionFactory.setOptimizeAcknowledgeTimeOut this test is working as expected.
          The failure results from the low prefetch and discard limit so more often than not messages are discarded and end up in the dlq.
          I made use of the setOptimizeAcknowledgeTimeOut feature, to allow an ack to be sent on a timer rather than on a percentage of prefetch such that there is more control in optimizeAck mode. Without that, once 5 messages are sent, the rest end up in the dlq as there is no ack till more are sent.
          If you enable debug logging for:
          org.apache.activemq.broker.region.TopicSubscription you will see the discards the result from the pending message limit strategy.

          Some mods in http://svn.apache.org/viewvc?rev=1208048&view=rev

          Show
          Gary Tully added a comment - Great to see a nice junit test, I had to have a peek. Apart from the missing setter for connectionFactory.setOptimizeAcknowledgeTimeOut this test is working as expected. The failure results from the low prefetch and discard limit so more often than not messages are discarded and end up in the dlq. I made use of the setOptimizeAcknowledgeTimeOut feature, to allow an ack to be sent on a timer rather than on a percentage of prefetch such that there is more control in optimizeAck mode. Without that, once 5 messages are sent, the rest end up in the dlq as there is no ack till more are sent. If you enable debug logging for: org.apache.activemq.broker.region.TopicSubscription you will see the discards the result from the pending message limit strategy. Some mods in http://svn.apache.org/viewvc?rev=1208048&view=rev
          Gary Tully made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Assignee Gary Tully [ gtully ]
          Fix Version/s 5.6.0 [ 12317974 ]
          Resolution Not A Problem [ 8 ]
          Hide
          James Furness added a comment -

          Hi Gary,

          Thanks for your response. I see your point regards the low prefetch size:

          ActiveMQMessageConsumer.java
          if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
          

          As expected, I can't get your modified test case passing (with 5.5.1-fuse-01-06), but will try again once your tweak to add the missing setter in ActiveMQConnectionFactory.configureConnection() is available.

          Cheers,
          James

          Show
          James Furness added a comment - Hi Gary, Thanks for your response. I see your point regards the low prefetch size: ActiveMQMessageConsumer.java if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System .currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { As expected, I can't get your modified test case passing (with 5.5.1-fuse-01-06), but will try again once your tweak to add the missing setter in ActiveMQConnectionFactory.configureConnection() is available. Cheers, James
          James Furness made changes -
          Link This issue relates to AMQ-3676 [ AMQ-3676 ]
          Transition Time In Source Status Execution Times Last Executer Last Execution Date
          Open Open Resolved Resolved
          6d 1h 41m 1 Gary Tully 29/Nov/11 19:58

            People

            • Assignee:
              Gary Tully
              Reporter:
              James Furness
            • Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development