ActiveMQ / AMQ-1940

Negative queue size (reproducible)

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.2.0
    • Fix Version/s: 5.3.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Found on Windows but reproduced under Linux

    • Regression:
      Regression

      Description

      When you purge a queue from the web admin console, the queue's message
      counter is reset to zero. But if an active consumer has already
      prefetched messages at that time, it keeps sending acks as it
      processes messages from its buffer, and ActiveMQ keeps decrementing
      the counter on each ack it receives. So when the consumer is done, the
      queue shows MINUS<consumer buffer size>.
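
The arithmetic in the description can be sketched in plain Java. This is a toy model, not ActiveMQ code: the class `PurgeCounterSketch` and its methods are hypothetical names, and it assumes what the description reports, namely that a purge zeroes the counter and that each later ack decrements it without any guard.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the broker-side queue counter; hypothetical, not ActiveMQ code. */
final class PurgeCounterSketch {

    /** Stand-in for the queue's message counter. */
    private final AtomicLong queueSize = new AtomicLong();

    /** Producer sends n messages. */
    void enqueue(int n) { queueSize.addAndGet(n); }

    /** Assumed purge behaviour: the counter is simply reset to zero. */
    void purge() { queueSize.set(0); }

    /** Assumed ack behaviour: an unguarded decrement. */
    void ack() { queueSize.decrementAndGet(); }

    long size() { return queueSize.get(); }

    /** Sends `sent` messages, purges, then acks `prefetched` buffered messages. */
    static long sizeAfterPurgeAndAcks(int sent, int prefetched) {
        PurgeCounterSketch queue = new PurgeCounterSketch();
        queue.enqueue(sent);
        queue.purge();
        for (int i = 0; i < prefetched; i++) {
            queue.ack(); // the consumer is still working through its buffer
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // 10000 sent, 100 already prefetched when the purge happens
        System.out.println(sizeAfterPurgeAndAcks(10000, 100)); // prints -100
    }
}
```

With 100 messages in the consumer's buffer at purge time, the model ends at -100, which matches the "MINUS<consumer buffer size>" symptom.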

      1. Main.java
        4 kB
        Vadim Chekan
      2. QueuePurgeTest.java.diff.txt
        4 kB
        Bruce Snyder
      3. Picture 6.png
        121 kB
        Brian Moran

        Issue Links

          Activity

          Vadim Chekan created issue -
          Vadim Chekan added a comment -

          Attached program reproduces a queue with -100 messages.

          Vadim Chekan made changes -
          Field Original Value New Value
          Attachment Main.java [ 16997 ]
          Bruce Snyder added a comment -

          I've created the following test case out of the attached Main.java class:

          package org.apache.activemq.broker.region;
          
          import javax.jms.Connection;
          import javax.jms.ConnectionFactory;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageProducer;
          import javax.jms.Queue;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.management.MBeanServerInvocationHandler;
          import javax.management.MalformedObjectNameException;
          import javax.management.ObjectName;
          
          import junit.framework.TestCase;
          
          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.apache.activemq.broker.BrokerService;
          import org.apache.activemq.broker.jmx.QueueViewMBean;
          
          public class QueuePurgeTest extends TestCase {
              
              BrokerService broker; 
              ConnectionFactory factory; 
              Connection connection;
              Session session;
              Queue queue;
              MessageConsumer consumer;
          
              protected void setUp() throws Exception {
                  broker = new BrokerService();
                  broker.setUseJmx(true);
                  broker.setPersistent(false);
                  broker.addConnector("tcp://localhost:0");
                  broker.start();
                  
                  factory = new ActiveMQConnectionFactory("vm://localhost");
                  
                  connection = factory.createConnection();
                  connection.start();
              }
          
              protected void tearDown() throws Exception {
                  consumer.close();
                  session.close();
                  connection.stop();
                  connection.close();
                  broker.stop();
              }
              
              public void testPurgeQueueWithActiveConsumer() throws Exception {
                  createProducerAndSendMessages();        
                  QueueViewMBean proxy = getProxyToQueueViewMBean();
                  createConsumer();
                  proxy.purge();
                  assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, proxy.getQueueSize());
              }
          
              private QueueViewMBean getProxyToQueueViewMBean()
                      throws MalformedObjectNameException, JMSException {
                  ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + 
                          queue.getQueueName() + ",BrokerName=localhost");
                  QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(
                          broker.getManagementContext().getMBeanServer(), 
                          queueViewMBeanName, QueueViewMBean.class, true);
                   
                   return proxy;
              }
              
              private void createProducerAndSendMessages() throws Exception {
                  session =  connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                  queue = session.createQueue("test1");
                  MessageProducer producer = session.createProducer(queue);
                  for(int i=0; i<10000; i++) {
                      TextMessage message = session.createTextMessage("message "+i);
                      producer.send(message);
                  }
                  producer.close();
              }
              
              private void createConsumer() throws Exception {
                  consumer = session.createConsumer(queue);
                  // wait for the consumer's prefetch buffer to fill
                  Thread.sleep(5*1000);
                  
                  for(int i = 0; i < 100; ++i) {
                      Message message = consumer.receive();
                      message.acknowledge();
                  }
              }
          
          }
          

          What I'm finding is that the failure is intermittent, but I am able to see it once in a while:

          junit.framework.AssertionFailedError: Queue size is not zero expected:<0> but was:<-60>
          at junit.framework.Assert.fail(Assert.java:47)
          at junit.framework.Assert.failNotEquals(Assert.java:282)
          at junit.framework.Assert.assertEquals(Assert.java:64)
          at junit.framework.Assert.assertEquals(Assert.java:136)
          at org.apache.activemq.broker.region.QueuePurgeTest.testPurgeQueueWithActiveConsumer(QueuePurgeTest.java:56)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          at java.lang.reflect.Method.invoke(Method.java:585)
          at junit.framework.TestCase.runTest(TestCase.java:154)
          at junit.framework.TestCase.runBare(TestCase.java:127)
          at junit.framework.TestResult$1.protect(TestResult.java:106)
          at junit.framework.TestResult.runProtected(TestResult.java:124)
          at junit.framework.TestResult.run(TestResult.java:109)
          at junit.framework.TestCase.run(TestCase.java:118)
          at junit.framework.TestSuite.runTest(TestSuite.java:208)
          at junit.framework.TestSuite.run(TestSuite.java:203)
          at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130)
          at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
          at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)

          Rob Davies made changes -
          Assignee Rob Davies [ rajdavies ]
          Bruce Snyder made changes -
          Attachment QueuePurgeTest.java.diff.txt [ 16998 ]
          Vadim Chekan added a comment -

          Perhaps it can be made more reproducible if purge is run in a thread parallel to the consumer. I'll try to rewrite it later tonight.
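
The idea of running the purge in parallel with the consumer can be illustrated with a small thread-based sketch. It is only a toy model under stated assumptions: `ParallelPurgeSketch` is a hypothetical name, an `AtomicLong` stands in for the broker's counter, and no ActiveMQ API is involved. It is not the rewritten test mentioned in the comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of a purge racing with consumer acks; hypothetical, not ActiveMQ code. */
final class ParallelPurgeSketch {

    /** Returns the counter value after `acks` acks race with one purge. */
    static long run(int sent, int acks) {
        AtomicLong queueSize = new AtomicLong(sent);
        CountDownLatch start = new CountDownLatch(1);

        // Consumer thread: acks each buffered message (unguarded decrement).
        Thread consumer = new Thread(() -> {
            awaitQuietly(start);
            for (int i = 0; i < acks; i++) {
                queueSize.decrementAndGet();
            }
        });

        // Purge thread: resets the counter at some point during the acks.
        Thread purger = new Thread(() -> {
            awaitQuietly(start);
            queueSize.set(0);
        });

        consumer.start();
        purger.start();
        start.countDown(); // release both threads together
        joinQuietly(consumer);
        joinQuietly(purger);
        return queueSize.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The result varies from run to run, between -100 and 0.
        System.out.println(run(10000, 100));
    }
}
```

Every ack that lands after the reset pushes the counter below zero, so the final value depends on where the purge falls among the acks. That timing dependence would be consistent with the intermittent failures reported above.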

          Rob Davies added a comment -

          Fixed by SVN revision 697921

          Rob Davies made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Fix Version/s 5.3.0 [ 11914 ]
          Gary Tully made changes -
          Fix Version/s 5.2.0 [ 11841 ]
          Fix Version/s 5.3.0 [ 11914 ]
          Gordon Hopper made changes -
          Link This issue is related to AMQ-1693 [ AMQ-1693 ]
          Gordon Hopper added a comment -

          AMQ-1693 displays the same symptom of negative message counts.

          Brian Moran added a comment -

          I can reproduce this in 5.2.0.
          I am going to attach a screenshot.

          Brian Moran made changes -
          Regression [Regression]
          Resolution Fixed [ 1 ]
          Status Resolved [ 5 ] Reopened [ 4 ]
          Brian Moran added a comment -

          Screenshot of negative message counts, reproducible in 5.2.0 after purging the message queue with STOMP clients.

          Brian Moran made changes -
          Attachment Picture 6.png [ 17757 ]
          Brian Moran made changes -
          Link This issue is duplicated by AMQ-1693 [ AMQ-1693 ]
          Loc Truong added a comment -

          I can reproduce this in 5.2.0. This happens to me because there are too many receivers: each of them tries to receive 500 messages, but the total number of messages in the queue is smaller, hence the negative number.

          Yuri Ushakov added a comment -

          We have the same problem with 5.2.0. A queue, single consumer, single producer. We have -1 pending now, 1616 sent, 1617 received.

          Loc Truong added a comment -

          I can reproduce this on 5.2.0 too. I have 7 consumers getting 150 messages each. The lowest negative number I have seen is -123.

          Ivan Dubrov added a comment -

          I had the same issue today.

          We are running two AMQ brokers in a network of brokers, each one talking to the other (duplex is set to false). Under high load, after a few hours, I noticed that one of our queues started growing on both nodes: several thousand pending messages in about 20 minutes (and the cursor memory usage was quite large). After I accidentally stopped the second node, the network bridge stopped on the first node and the queue size on the first node immediately became "-8".

          So in my case this is probably somehow related to instability in the network of brokers implementation, because if I disable the network connector the system runs stably for several days under the load.

          Rob Davies added a comment -

          Fixed for 5.3.

          Rob Davies made changes -
          Fix Version/s 5.3.0 [ 11914 ]
          Status Reopened [ 4 ] Resolved [ 5 ]
          Fix Version/s 5.2.0 [ 11841 ]
          Resolution Fixed [ 1 ]
          Jeff Turner made changes -
          Project Import Fri Nov 26 22:32:02 EST 2010 [ 1290828722158 ]
          Pavel added a comment -

          We have seen this issue on ActiveMQ 5.6.0.


            People

            • Assignee:
              Rob Davies
              Reporter:
              Vadim Chekan
            • Votes:
              5
              Watchers:
              9
