ActiveMQ
  1. ActiveMQ
  2. AMQ-1940

Negative queue size (reproducible)

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Critical Critical
    • Resolution: Fixed
    • Affects Version/s: 5.2.0
    • Fix Version/s: 5.3.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Found on Windows but reproduced under Linux

    • Regression:
      Regression

      Description

      When you "purge" queue from web admin console, it zeroes queue message
      counter. But if you have an active consumer at that time which
      pre-fetched messages than your consumer will keep sending ack as it
      process messages from its buffer. ActiveMQ will keep decrement counter
      upon receiving each ack. So when consumer is done queue will show
      MINUS<consumer buffer size>.

      1. Picture 6.png
        121 kB
        Brian Moran
      2. QueuePurgeTest.java.diff.txt
        4 kB
        Bruce Snyder
      3. Main.java
        4 kB
        Vadim Chekan

        Issue Links

          Activity

          Hide
          Vadim Chekan added a comment -

          Attached program reproduces a queue with -100 messages.

          Show
          Vadim Chekan added a comment - Attached program reproduces a queue with -100 messages.
          Hide
          Bruce Snyder added a comment -

          I've created the following test case out of the attached Main.java class:

          package org.apache.activemq.broker.region;
          
          import javax.jms.Connection;
          import javax.jms.ConnectionFactory;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageProducer;
          import javax.jms.Queue;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.management.MBeanServerInvocationHandler;
          import javax.management.MalformedObjectNameException;
          import javax.management.ObjectName;
          
          import junit.framework.TestCase;
          
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.jmx.QueueViewMBean;
          
          public class QueuePurgeTest extends TestCase {
              
              BrokerService broker; 
              ConnectionFactory factory; 
              Connection connection;
              Session session;
              Queue queue;
              MessageConsumer consumer;
          
              protected void setUp() throws Exception {
                  broker = new BrokerService();
                  broker.setUseJmx(true);
                  broker.setPersistent(false);
                  broker.addConnector("tcp://localhost:0");
                  broker.start();
                  
                  factory = new ActiveMQConnectionFactory("vm://localhost");
                  
                  connection = factory.createConnection();
                  connection.start();
              }
          
              protected void tearDown() throws Exception {
                  consumer.close();
                  session.close();
                  connection.stop();
                  connection.close();
                  broker.stop();
              }
              
              public void testPurgeQueueWithActiveConsumer() throws Exception {
                  createProducerAndSendMessages();        
                  QueueViewMBean proxy = getProxyToQueueViewMBean();
                  createConsumer();
                  proxy.purge();
                  assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, proxy.getQueueSize());
              }
          
              private QueueViewMBean getProxyToQueueViewMBean()
                      throws MalformedObjectNameException, JMSException {
                  ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + 
                          queue.getQueueName() + ",BrokerName=localhost");
                  QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(
                          broker.getManagementContext().getMBeanServer(), 
                          queueViewMBeanName, QueueViewMBean.class, true);
                   
                   return proxy;
              }
              
              private void createProducerAndSendMessages() throws Exception {
                  session =  connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                  queue = session.createQueue("test1");
                  MessageProducer producer = session.createProducer(queue);
                  for(int i=0; i<10000; i++) {
                      TextMessage message = session.createTextMessage("message "+i);
                      producer.send(message);
                  }
                  producer.close();
              }
              
              private void createConsumer() throws Exception {
                  consumer = session.createConsumer(queue);
                  // wait for buffer fill out
                  Thread.sleep(5*1000);
                  
                  for(int i = 0; i < 100; ++i) {
                      Message message = consumer.receive();
                      message.acknowledge();
                  }
              }
          
          }
          

          What I'm finding is that failure is intermittent, but I am able to see it once in a while:

          junit.framework.AssertionFailedError: Queue size is not zero expected:<0> but was:<-60>
          at junit.framework.Assert.fail(Assert.java:47)
          at junit.framework.Assert.failNotEquals(Assert.java:282)
          at junit.framework.Assert.assertEquals(Assert.java:64)
          at junit.framework.Assert.assertEquals(Assert.java:136)
          at org.apache.activemq.broker.region.QueuePurgeTest.testPurgeQueueWithActiveConsumer(QueuePurgeTest.java:56)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          at java.lang.reflect.Method.invoke(Method.java:585)
          at junit.framework.TestCase.runTest(TestCase.java:154)
          at junit.framework.TestCase.runBare(TestCase.java:127)
          at junit.framework.TestResult$1.protect(TestResult.java:106)
          at junit.framework.TestResult.runProtected(TestResult.java:124)
          at junit.framework.TestResult.run(TestResult.java:109)
          at junit.framework.TestCase.run(TestCase.java:118)
          at junit.framework.TestSuite.runTest(TestSuite.java:208)
          at junit.framework.TestSuite.run(TestSuite.java:203)
          at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130)
          at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)

          Show
          Bruce Snyder added a comment - I've created the following test case out of the attached Main.java class: package org.apache.activemq.broker.region; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.QueueViewMBean; public class QueuePurgeTest extends TestCase { BrokerService broker; ConnectionFactory factory; Connection connection; Session session; Queue queue; MessageConsumer consumer; protected void setUp() throws Exception { broker = new BrokerService(); broker.setUseJmx( true ); broker.setPersistent( false ); broker.addConnector( "tcp: //localhost:0" ); broker.start(); factory = new ActiveMQConnectionFactory( "vm: //localhost" ); connection = factory.createConnection(); connection.start(); } protected void tearDown() throws Exception { consumer.close(); session.close(); connection.stop(); connection.close(); broker.stop(); } public void testPurgeQueueWithActiveConsumer() throws Exception { createProducerAndSendMessages(); QueueViewMBean proxy = getProxyToQueueViewMBean(); createConsumer(); proxy.purge(); assertEquals( "Queue size is not zero, it's " + proxy.getQueueSize(), 0, proxy.getQueueSize()); } private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException { ObjectName queueViewMBeanName = new ObjectName( "org.apache.activemq" + ":Type=Queue,Destination=" + queue.getQueueName() + ",BrokerName=localhost" ); QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance( broker.getManagementContext().getMBeanServer(), queueViewMBeanName, QueueViewMBean.class, true ); return proxy; } private void createProducerAndSendMessages() throws Exception { session = connection.createSession( false , Session.CLIENT_ACKNOWLEDGE); queue = session.createQueue( "test1" ); MessageProducer producer = session.createProducer(queue); for ( int i=0; i<10000; i++) { TextMessage message = session.createTextMessage( "message " +i); producer.send(message); } producer.close(); } private void createConsumer() throws Exception { consumer = session.createConsumer(queue); // wait for buffer fill out Thread .sleep(5*1000); for ( int i = 0; i < 100; ++i) { Message message = consumer.receive(); message.acknowledge(); } } } What I'm finding is that failure is intermittent, but I am able to see it once in a while: junit.framework.AssertionFailedError: Queue size is not zero expected:<0> but was:<-60> at junit.framework.Assert.fail(Assert.java:47) at junit.framework.Assert.failNotEquals(Assert.java:282) at junit.framework.Assert.assertEquals(Assert.java:64) at junit.framework.Assert.assertEquals(Assert.java:136) at org.apache.activemq.broker.region.QueuePurgeTest.testPurgeQueueWithActiveConsumer(QueuePurgeTest.java:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:585) at junit.framework.TestCase.runTest(TestCase.java:154) at junit.framework.TestCase.runBare(TestCase.java:127) at junit.framework.TestResult$1.protect(TestResult.java:106) at junit.framework.TestResult.runProtected(TestResult.java:124) at junit.framework.TestResult.run(TestResult.java:109) at junit.framework.TestCase.run(TestCase.java:118) at junit.framework.TestSuite.runTest(TestSuite.java:208) at junit.framework.TestSuite.run(TestSuite.java:203) at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
          Hide
          Vadim Chekan added a comment -

          Perhaps it can be made more reproducible if purge is run in a thread parallel to the consumer. I'll try to rewrite it later tonight.

          Show
          Vadim Chekan added a comment - Perhaps it can be made more reproducible if purge is run in a thread parallel to the consumer. I'll try to rewrite it later tonight.
          Hide
          Rob Davies added a comment -

          Fixed by SVN revision 697921

          Show
          Rob Davies added a comment - Fixed by SVN revision 697921
          Hide
          Gordon Hopper added a comment -

          1693 displays the same symptom of negative message counts.

          Show
          Gordon Hopper added a comment - 1693 displays the same symptom of negative message counts.
          Hide
          Brian Moran added a comment -

          I can reproduce this in 5.2.0.
          I am going to attach a screenshot.

          Show
          Brian Moran added a comment - I can reproduce this in 5.2.0. I am going to attach a screenshot.
          Hide
          Brian Moran added a comment -

          Screenshot of negative messages, reproducible in 5.2.0 after purging the message queue, STOMP clients.

          Show
          Brian Moran added a comment - Screenshot of negative messages, reproducible in 5.2.0 after purging the message queue, STOMP clients.
          Hide
          Loc Truong added a comment -

          I can reproduct this in 5.2.0. This happens to me because there are too many receivers, each of them try to recevie 500 messages but the total messages in queue is smaller, hence negative number.

          Show
          Loc Truong added a comment - I can reproduct this in 5.2.0. This happens to me because there are too many receivers, each of them try to recevie 500 messages but the total messages in queue is smaller, hence negative number.
          Hide
          Yuri Ushakov added a comment -

          We have the same problem with 5.2.0. A queue, single consumer, single producer. We have -1 pending now, 1616 sent, 1617 received.

          Show
          Yuri Ushakov added a comment - We have the same problem with 5.2.0. A queue, single consumer, single producer. We have -1 pending now, 1616 sent, 1617 received.
          Hide
          Loc Truong added a comment -

          I can reproduce this on 5.2.0 too. I have 7 consumers getting 150 messages each. The negative number I receive is -123 at lowest.

          Show
          Loc Truong added a comment - I can reproduce this on 5.2.0 too. I have 7 consumers getting 150 messages each. The negative number I receive is -123 at lowest.
          Hide
          Ivan Dubrov added a comment -

          I had the same issue today.

          We are running two AMQ brokers in network of brokers, each one talking to the other one (the duplex is set to false). Under a high load after few hours I've noted that one of our queues started growing on both nodes, several thousand of pending messages in about 20 minutes (and the cursor memory usage was quite large). After I accidentally stopped the second node, the network bridge stopped on the first node and the queue size on the first node immediately became "-8".

          So in my case this probably somehow related to the instability of network of brokers implementation, because if I disable the network connector the system runs stable for several days under the load.

          Show
          Ivan Dubrov added a comment - I had the same issue today. We are running two AMQ brokers in network of brokers, each one talking to the other one (the duplex is set to false). Under a high load after few hours I've noted that one of our queues started growing on both nodes, several thousand of pending messages in about 20 minutes (and the cursor memory usage was quite large). After I accidentally stopped the second node, the network bridge stopped on the first node and the queue size on the first node immediately became "-8". So in my case this probably somehow related to the instability of network of brokers implementation, because if I disable the network connector the system runs stable for several days under the load.
          Hide
          Rob Davies added a comment -

          Fixed for the 5.3

          Show
          Rob Davies added a comment - Fixed for the 5.3
          Hide
          Pavel added a comment -

          We seen such issue on 5.6.0 ActiveMQ.

          Show
          Pavel added a comment - We seen such issue on 5.6.0 ActiveMQ.
          Hide
          Wolfram Schlich added a comment -

          We also experience this currently with ActiveMQ 5.8.0.

          Show
          Wolfram Schlich added a comment - We also experience this currently with ActiveMQ 5.8.0.
          Hide
          Sergey added a comment -

          Still have this issue in 5.10.0. Moreover, sometime I can see number of pending messages groving, but if I try to browse queue, I see nothing there. I.e. this numbers are all wrong.

          Show
          Sergey added a comment - Still have this issue in 5.10.0. Moreover, sometime I can see number of pending messages groving, but if I try to browse queue, I see nothing there. I.e. this numbers are all wrong.
          Hide
          Sergey added a comment -

          What I can see in JMX using Hawtio:
          Total dequeue count 2474137
          Total enqueue count 1795563
          Total message count -678584

          BTW, I have camel route within broker, where incoming messages are duplicated in two queues. And, at the moment, web console displays something like:

          queue pending enqueued dequeued
          algo250a 856725 906471 1552909
          algo250b -23451 906472 929923
          highwayQueue 0 0 0

          Last one is inbound queue, algoxxx are outbound. I expect that number of enqueued messages for inbound Q should be equal to number of dequeued. But its always zero.

          Show
          Sergey added a comment - What I can see in JMX using Hawtio: Total dequeue count 2474137 Total enqueue count 1795563 Total message count -678584 BTW, I have camel route within broker, where incoming messages are duplicated in two queues. And, at the moment, web console displays something like: queue pending enqueued dequeued algo250a 856725 906471 1552909 algo250b -23451 906472 929923 highwayQueue 0 0 0 Last one is inbound queue, algoxxx are outbound. I expect that number of enqueued messages for inbound Q should be equal to number of dequeued. But its always zero.
          Hide
          Gary Tully added a comment -

          @Sergey - can you make a junit test case that can reproduce - peek at the activemq-camel module if you need to use camel routes

          Show
          Gary Tully added a comment - @Sergey - can you make a junit test case that can reproduce - peek at the activemq-camel module if you need to use camel routes
          Hide
          Sergey added a comment -

          I've just created AMQ-5576, and where is a route test case.

          Just deploy this route, and send 1 message into inbound queue using web console, it should be enough to reproduce some of issues with counters/sizes.

          Show
          Sergey added a comment - I've just created AMQ-5576 , and where is a route test case. Just deploy this route, and send 1 message into inbound queue using web console, it should be enough to reproduce some of issues with counters/sizes.

            People

            • Assignee:
              Rob Davies
              Reporter:
              Vadim Chekan
            • Votes:
              5 Vote for this issue
              Watchers:
              11 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development