ActiveMQ
  1. ActiveMQ
  2. AMQ-1600

Number of items in Topic queue never empties out (Queue Size in JMX for every topic seems to be > 0 all the time)

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Critical Critical
    • Resolution: Fixed
    • Affects Version/s: 5.1.0
    • Fix Version/s: 5.3.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Linux

      Description

      If I create a topic and then have a producer write to the topic (non-persistent message) the items in the queueSize in JMX appears to be the total messages that I sent. Even if I start the consumer the number of items on the queue never goes down. If I run the same code in 4.1.0 the queue size is always 0 which is the expected behavior – something changed in 5.1... Will try on 5.0 in a few minutes.

      1. untitled.JPG
        40 kB
        Denis Abramov

        Issue Links

          Activity

          Hide
          Denis Abramov added a comment -

          Not really doing anything fancy to have this issue happen... just calling send many times: persistent = false and looking at JConsole...

          /*********************************************************************

          • Create a connection to the messaging system
          • @param jms_url
          • @param intf
          • @param is_durable_subscriber
          • @throws Exception
            *********************************************************************/
            public void createConnection(String jms_url, MessageManagerIntf intf, boolean is_durable_subscriber) throws Exception {
            _jms_url = jms_url;
            _is_durable_subscriber = is_durable_subscriber;
            _msg_wrapper_intf = intf;

          log.info("");
          log.info("");
          log.info("Creating Connection to: " + jms_url);
          log.info("");
          String user = ActiveMQConnection.DEFAULT_USER;
          String password = ActiveMQConnection.DEFAULT_PASSWORD;

          //jms_url = addJMSOptions(jms_url);
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, jms_url);
          connectionFactory.setDispatchAsync(true);
          connectionFactory.setUseAsyncSend(true);

          _connection = connectionFactory.createConnection();
          _connection.setExceptionListener(this);

          if (is_durable_subscriber)
          _connection.setClientID(_msg_wrapper_intf.getClass().getSimpleName());
          else

          { SimpleDateFormat f = new SimpleDateFormat("M/d/yyyy h_mm_ss a"); Calendar cal = Calendar.getInstance(); _connection.setClientID(_msg_wrapper_intf.getClass().getSimpleName().toLowerCase() + "@" + InetAddress.getLocalHost().getHostName().toUpperCase() + "-" + f.format(cal.getTime()) + "--" + cal.getTimeInMillis()); }

          log.info("JMS VERSION: " + _connection.getMetaData().getJMSVersion());
          log.info("JMS PROVIDER: " + _connection.getMetaData().getJMSProviderName());
          log.info("JMS PROVIDER VERSION: " + _connection.getMetaData().getProviderVersion());
          log.info("CLIENT ID: " + _connection.getClientID());

          _connection.start();
          _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          }

          public void send(String name, DESTINATION_TYPE destination_type, Destination reply_to, MessageIntf wrapper, boolean persistent) throws Exception

          { Destination dest = createDestination(name, destination_type); MessageProducer publisher = getProducer(dest, persistent); //wrapper.generateXMLMessage() just generates some XML message. TextMessage msg = _session.createTextMessage(wrapper.generateXMLMessage()); msg.setJMSReplyTo(reply_to); log.debug(this.getClass().getName() + ">>>>>> SENDING MESSAGE TO '" + dest + "': " + wrapper.generateXMLMessage()); publisher.send(msg); }

          public MessageProducer getProducer(Destination dest, boolean persistent) throws Exception

          { //just a little optimization so I don't have too many producing running amock. Have 1 producer per destination if (_producer_cache.containsKey(dest.toString())) return _producer_cache.get(dest.toString()); MessageProducer publisher = _session.createProducer(dest); if(persistent) publisher.setDeliveryMode(DeliveryMode.PERSISTENT); else publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); _producer_cache.put(dest.toString(), publisher); return publisher; }

          /********************************************************************

          • Create a topic
          • @param name
          • @return
          • @throws JMSException
          • *******************************************************************/
            public Destination createDestination(String name, DESTINATION_TYPE destination) throws JMSException

            Unknown macro: { switch (destination) { case TOPIC: return _session.createTopic(name); case QUEUE: return _session.createQueue(name); default: return null; } }

          Show
          Denis Abramov added a comment - Not really doing anything fancy to have this issue happen... just calling send many times: persistent = false and looking at JConsole... /********************************************************************* Create a connection to the messaging system @param jms_url @param intf @param is_durable_subscriber @throws Exception *********************************************************************/ public void createConnection(String jms_url, MessageManagerIntf intf, boolean is_durable_subscriber) throws Exception { _jms_url = jms_url; _is_durable_subscriber = is_durable_subscriber; _msg_wrapper_intf = intf; log.info(""); log.info(""); log.info("Creating Connection to: " + jms_url); log.info(""); String user = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; //jms_url = addJMSOptions(jms_url); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, jms_url); connectionFactory.setDispatchAsync(true); connectionFactory.setUseAsyncSend(true); _connection = connectionFactory.createConnection(); _connection.setExceptionListener(this); if (is_durable_subscriber) _connection.setClientID(_msg_wrapper_intf.getClass().getSimpleName()); else { SimpleDateFormat f = new SimpleDateFormat("M/d/yyyy h_mm_ss a"); Calendar cal = Calendar.getInstance(); _connection.setClientID(_msg_wrapper_intf.getClass().getSimpleName().toLowerCase() + "@" + InetAddress.getLocalHost().getHostName().toUpperCase() + "-" + f.format(cal.getTime()) + "--" + cal.getTimeInMillis()); } log.info("JMS VERSION: " + _connection.getMetaData().getJMSVersion()); log.info("JMS PROVIDER: " + _connection.getMetaData().getJMSProviderName()); log.info("JMS PROVIDER VERSION: " + _connection.getMetaData().getProviderVersion()); log.info("CLIENT ID: " + _connection.getClientID()); _connection.start(); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void send(String name, DESTINATION_TYPE destination_type, Destination reply_to, MessageIntf wrapper, boolean persistent) throws Exception { Destination dest = createDestination(name, destination_type); MessageProducer publisher = getProducer(dest, persistent); //wrapper.generateXMLMessage() just generates some XML message. TextMessage msg = _session.createTextMessage(wrapper.generateXMLMessage()); msg.setJMSReplyTo(reply_to); log.debug(this.getClass().getName() + ">>>>>> SENDING MESSAGE TO '" + dest + "': " + wrapper.generateXMLMessage()); publisher.send(msg); } public MessageProducer getProducer(Destination dest, boolean persistent) throws Exception { //just a little optimization so I don't have too many producing running amock. Have 1 producer per destination if (_producer_cache.containsKey(dest.toString())) return _producer_cache.get(dest.toString()); MessageProducer publisher = _session.createProducer(dest); if(persistent) publisher.setDeliveryMode(DeliveryMode.PERSISTENT); else publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); _producer_cache.put(dest.toString(), publisher); return publisher; } /******************************************************************** Create a topic @param name @return @throws JMSException *******************************************************************/ public Destination createDestination(String name, DESTINATION_TYPE destination) throws JMSException Unknown macro: { switch (destination) { case TOPIC: return _session.createTopic(name); case QUEUE: return _session.createQueue(name); default: return null; } }
          Hide
          Denis Abramov added a comment -

          running ActiveMQ 5.1.0:

          How can Dequeuecount == Enqueuecount == Dispatchcount and QueueSize != 0?

          Show
          Denis Abramov added a comment - running ActiveMQ 5.1.0: How can Dequeuecount == Enqueuecount == Dispatchcount and QueueSize != 0?
          Hide
          Denis Abramov added a comment -

          Doesn't seem this problem exists in 5.0.0

          Show
          Denis Abramov added a comment - Doesn't seem this problem exists in 5.0.0

            People

            • Assignee:
              Rob Davies
              Reporter:
              Denis Abramov
            • Votes:
              4 Vote for this issue
              Watchers:
              8 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development