  1. ActiveMQ
  2. AMQ-2048

Messages are read from physical queue of virtual topic but not removed. "Messages Received" count remains zero.


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 5.1.0
    • Fix Version/s: 5.2.0
    • Component/s: Broker
    • Labels: None
    • Environment: Microsoft Windows Server 2003 - Standard x64 Edition - Service Pack 2
      Intel Xeon 1.60 GHz, 8 GB RAM
      ActiveMQ 5.1.0 JMS Message Broker

    Description

      Virtual Topic: VirtualTopic.AckTest
      Physical Queue of Virtual Topic: Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest
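
      The two names above follow ActiveMQ's default virtual-topic convention, in which a consumer reads from a queue named Consumer.<consumerName>.<virtualTopicName>. A minimal sketch of that mapping, assuming the broker's default naming pattern is in effect; VirtualTopicNames and its methods are hypothetical helpers written for illustration and are not part of the ActiveMQ API:

```java
/** Hypothetical helper for illustration only; not part of ActiveMQ. */
final class VirtualTopicNames {

    private static final String QUEUE_PREFIX = "Consumer.";
    private static final String TOPIC_PREFIX = "VirtualTopic.";

    private VirtualTopicNames() {
    }

    /** Builds the physical queue name a named consumer reads from. */
    static String consumerQueue(String consumerName, String virtualTopic) {
        // Assumption: the consumer name is a single path element, so it has no dots.
        if (consumerName == null || consumerName.isEmpty() || consumerName.contains(".")) {
            throw new IllegalArgumentException("consumer name must be one non-empty path element");
        }
        if (virtualTopic == null || !virtualTopic.startsWith(TOPIC_PREFIX)) {
            throw new IllegalArgumentException("topic name must start with " + TOPIC_PREFIX);
        }
        return QUEUE_PREFIX + consumerName + "." + virtualTopic;
    }

    /** Recovers the virtual topic name from a physical queue name. */
    static String topicOf(String queueName) {
        if (queueName == null || !queueName.startsWith(QUEUE_PREFIX)) {
            throw new IllegalArgumentException("queue name must start with " + QUEUE_PREFIX);
        }
        // The consumer name ends at the first dot after the "Consumer." prefix.
        int dot = queueName.indexOf('.', QUEUE_PREFIX.length());
        if (dot < 0 || dot == QUEUE_PREFIX.length()) {
            throw new IllegalArgumentException("queue name has no consumer name or topic part");
        }
        return queueName.substring(dot + 1);
    }
}
```

      With the names from this report, VirtualTopicNames.consumerQueue("TestQueueAcknowledgement", "VirtualTopic.AckTest") yields the physical queue name listed above, and topicOf applied to that queue name gives back VirtualTopic.AckTest.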

      The consumer is connected to the OpenWire TCP connector, and I am posting messages directly to the virtual topic.

      Even though message.acknowledge() is called, messages are read but not removed from the queue, and the "Messages Received" count remains at zero. Is this the expected behavior when using virtual topics?

      Test Consumer:

      import java.util.Properties;

      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.QueueConnection;
      import javax.jms.QueueSession;
      import javax.jms.TextMessage;
      import javax.naming.InitialContext;

      public class TestQueueAcknowledgement {

          private QueueConnection conn;
          private QueueSession session;

          public void listen() {
              try {
                  // JNDI settings: broker URL and the physical queue of the virtual topic
                  Properties props = new Properties();
                  props.put("java.naming.factory.initial",
                          "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
                  props.put("java.naming.provider.url",
                          "tcp://localhost:61616");
                  props.put("queue.queueName",
                          "Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest");

                  javax.naming.Context ctx = new InitialContext(props);
                  javax.jms.QueueConnectionFactory factory
                          = (javax.jms.QueueConnectionFactory)
                          ctx.lookup("ConnectionFactory");
                  conn = factory.createQueueConnection();
                  final javax.jms.Queue queue
                          = (javax.jms.Queue) ctx.lookup("queueName");
                  // Non-transacted session in AUTO_ACKNOWLEDGE mode
                  session = conn.createQueueSession(false,
                          QueueSession.AUTO_ACKNOWLEDGE);
                  javax.jms.QueueReceiver receiver = session.createReceiver(queue);

                  receiver.setMessageListener(new MessageListener() {
                      public void onMessage(Message message) {
                          try {
                              if (message instanceof TextMessage) {
                                  TextMessage txtMsg = (TextMessage) message;
                                  String msg = txtMsg.getText();
                                  System.out.println(msg);
                              }
                              message.acknowledge();
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                  });
                  // Start delivery of messages to the listener
                  conn.start();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          protected void finalize() throws Throwable {
              if (session != null) {
                  session.close();
              }
          }

          public static void main(String[] args) {
              TestQueueAcknowledgement ack = new TestQueueAcknowledgement();
              ack.listen();
          }
      }

      Attachments

        1. TestQueueAcknowledgement.java
          2 kB
          Jagath Vijayan Janakiraman

          People

            Assignee: Unassigned
            Reporter: Jagath Vijayan Janakiraman
            Votes: 0
            Watchers: 1
