Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-5107

In-flight queue message redelivered to multiple listeners upon broker shutdown

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.9.0
    • Fix Version/s: 5.10.0
    • Component/s: Transport
    • Labels:
      None
    • Environment:

      Windows 7 64Bit - Java "1.6.0_20"
      CentOS 6.0 - Java "1.7.0_09-icedtea"

      Description

      To reproduce:

      1) Start 3 or more listener processes (see listener code below)

      2) Run producer to push one message on queue (see producer code below)

      3) One of the listeners will pick up the message and sleep for one minute before auto-acknowledging it

      4) Start shutting down the broker within the 60-second window (press Ctrl-C, or issue the Terminate jvm(int) command from the Hawtio console)

      5) Observed result: all other idle listeners get the same message redelivered simultaneously, each one with deliveryCount incremented

      Listener code:
      --------------

      package com.test;
      import javax.jms.Connection;
      import javax.jms.Destination;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import org.apache.activemq.ActiveMQConnectionFactory;

      /**
       * Queue listener used to reproduce the issue.
       *
       * Connects to the broker at tcp://localhost:61616, consumes from TEST.QUEUE on a
       * non-transacted {@code Session.AUTO_ACKNOWLEDGE} session, and holds every message
       * for 60 seconds inside {@code onMessage} before returning.
       */
      public class TestListener {

          public TestListener() {
              super();
          }

          /**
           * Registers the sleeping listener and starts the connection.
           * The connection is left open on purpose so the process keeps listening.
           */
          public static void main(String[] args) {
              try {
                  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                  Connection conn = factory.createConnection();
                  Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  Destination queue = jmsSession.createQueue("TEST.QUEUE");
                  MessageConsumer queueConsumer = jmsSession.createConsumer(queue);

                  queueConsumer.setMessageListener(new SleepingListener());

                  // Delivery begins only once the connection is started.
                  conn.start();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          /**
           * Prints the message text, its redelivery flag and its JMSXDeliveryCount,
           * then sleeps for 60 seconds before returning from the callback.
           */
          private static final class SleepingListener implements MessageListener {

              public void onMessage(Message message) {
                  try {
                      TextMessage received = (TextMessage) message;

                      // Each value is read immediately before it is printed so that a
                      // failure part-way through leaves the earlier output on the console.
                      String body = received.getText();
                      System.out.print("\nReceived " + body);

                      boolean redelivered = message.getJMSRedelivered();
                      System.out.print(", Redelivery: " + redelivered);

                      long deliveryCount = message.getLongProperty("JMSXDeliveryCount");
                      System.out.print(", Count: " + deliveryCount);

                      // Keep the message in flight long enough to shut the broker down.
                      Thread.sleep(60000);
                      System.out.print("... finished after sleep");
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      }


      Producer code:
      --------------

      package com.test;
      import java.util.Date;
      import javax.jms.Connection;
      import javax.jms.Destination;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;

      import org.apache.activemq.ActiveMQConnectionFactory;

      /**
       * Producer used to reproduce the issue: sends a single text message to
       * TEST.QUEUE on the broker at tcp://localhost:61616 and then disconnects.
       */
      public class TestProducer {

          public TestProducer() {
              super();
          }

          /** Runs one {@link HelloWorldProducer} on a non-daemon thread. */
          public static void main(String[] args) {
              try {
                  thread(new HelloWorldProducer(), false);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          /**
           * Starts {@code runnable} on a new thread.
           *
           * @param runnable the work to execute
           * @param daemon   whether the new thread is marked as a daemon thread
           */
          public static void thread(Runnable runnable, boolean daemon) {
              Thread worker = new Thread(runnable);
              worker.setDaemon(daemon);
              worker.start();
          }

          /** Sends one timestamped text message to TEST.QUEUE. */
          public static class HelloWorldProducer implements Runnable {

              public HelloWorldProducer() {}

              /**
               * Opens a connection, sends the message, and closes the connection.
               * Any failure is reported with a stack trace; nothing is rethrown.
               */
              public void run() {
                  try {
                      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                      Connection connection = connectionFactory.createConnection();
                      try {
                          connection.start();
                          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                          Destination destination = session.createQueue("TEST.QUEUE");
                          MessageProducer producer = session.createProducer(destination);

                          String text = "test message created on " + new Date();
                          TextMessage message = session.createTextMessage(text);
                          System.out.println("Sent " + text);
                          producer.send(message);

                          session.close();
                      } finally {
                          // Close the connection even when session setup or the send fails,
                          // so a failed run does not leave the broker connection open.
                          try {
                              connection.close();
                          } catch (Exception closeFailure) {
                              // Report it here so it cannot mask an earlier failure.
                              closeFailure.printStackTrace();
                          }
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      }

        Attachments

          Activity

            People

            • Assignee:
              artnaseef Arthur Naseef
              Reporter:
              gregg Greg Garlak
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: