Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-2048

Messages are read from physical queue of virtual topic but not removed. "Messages Received" count remains zero.

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 5.1.0
    • Fix Version/s: 5.2.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Microsoft Windows Server 2003 - Standard x64 Edition - Service Pack 2
      Intel Xeon 1.60 GHz, 8 GB RAM
      ActiveMQ 5.1.0 JMS Message Broker

      Description

      Virtual Topic: VirtualTopic.AckTest
      Physical Queue of Virtual Topic: Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest

      Consumer is connected to Openwire TCP connector and I am posting messages directly to the virtual topic.

      Even if message.acknowledge() is called, messages are read but not removed from queue. "Messages Received" count remains at zero. Is this the expected behavior while using Virtual Topics?

      Test Consumer:

      import java.util.Properties;

      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.QueueConnection;
      import javax.jms.QueueSession;
      import javax.jms.TextMessage;
      import javax.naming.InitialContext;

      public class TestQueueAcknowledgement {

      private QueueConnection conn;
      private QueueSession session;

      public void listen() {
      try {
      Properties props = new Properties();
      props.put("java.naming.factory.initial",
      "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
      props.put("java.naming.provider.url",
      "tcp://localhost:61616");
      props.put("queue.queueName",
      "Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest");

      javax.naming.Context ctx = new InitialContext(props);
      javax.jms.QueueConnectionFactory factory
      = (javax.jms.QueueConnectionFactory)
      ctx.lookup("ConnectionFactory");
      conn = factory.createQueueConnection();
      final javax.jms.Queue queue
      = (javax.jms.Queue) ctx.lookup("queueName");
      session = conn.createQueueSession(false,
      QueueSession.AUTO_ACKNOWLEDGE);
      javax.jms.QueueReceiver receiver = session.createReceiver(queue);

      receiver.setMessageListener(new MessageListener() {
      public void onMessage(Message message) {
      try {
      if (message instanceof TextMessage)

      { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println(msg); }

      message.acknowledge();
      } catch (Exception e)

      { e.printStackTrace(); }

      }
      });
      conn.start();
      } catch (Exception e)

      { e.printStackTrace(); }

      }

      protected void finalize() throws Throwable {
      if (session != null)

      { session.close(); }

      }

      public static void main(String[] args)

      { TestQueueAcknowledgement ack = new TestQueueAcknowledgement(); ack.listen(); }

      }

        Attachments

        1. TestQueueAcknowledgement.java
          2 kB
          Jagath Vijayan Janakiraman

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              jagathvijayan Jagath Vijayan Janakiraman
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: