ActiveMQ
  1. ActiveMQ
  2. AMQ-3809

Simple ActiveMQ consumer dies on failover

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Duplicate
    • Affects Version/s: 5.5.1
    • Fix Version/s: 5.6.0
    • Component/s: Broker, Transport
    • Labels:
      None
    • Environment:

      solaris, linux

      Description

      I have a simple consumer application which connects to the broker using failover url. The consumer uses Message listener to asynchronously listen to messages. Whenever i kill my primary broker, I get an EOF exception(which is expected), the consumer tries to connect to failover broker but before it could finish the reconnect, it shuts down.
      My simple producer connecting with the same failover url never dies and successfully connects to failover broker.

      What is going on here?

      Posting the snippet with which the consumer dies:-

      2012-04-17 06:45:25,771 WARN org.apache.activemq.transport.failover.FailoverTransport - Transport (host1/10.240.170.28:61616) failed to tcp://host1:61616 , attempting to automatically reconnect due to: java.io.EOFException
      2012-04-17 06:45:25,772 DEBUG org.apache.activemq.transport.failover.FailoverTransport - Transport failed with the following exception:
      java.io.EOFException
      at java.io.DataInputStream.readInt(DataInputStream.java:392)
      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:227)
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:219)
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
      at java.lang.Thread.run(Thread.java:722)
      2012-04-17 06:45:25,774 DEBUG org.apache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://host2:61616, tcp://host1:61616], from: [tcp://host1:61616, tcp://host2:61616]
      2012-04-17 06:45:25,774 DEBUG org.apache.activemq.transport.failover.FailoverTransport - Attempting connect to: tcp://host2:61616
      20120417 06:45:25.776 EDT (Shutdown-ActiveMQConsumer) ActiveMQConsumer#stop INFO Stopping the consumer...
      2012-04-17 06:45:26,242 DEBUG org.apache.activemq.transport.failover.FailoverTransport - Stopped.

        Activity

        Hide
        Timothy Bish added a comment -

        This was resolved by the fix for AMQ-796

        Show
        Timothy Bish added a comment - This was resolved by the fix for AMQ-796
        Hide
        Timothy Bish added a comment -

        Rob resolved AMQ-796 so you may want to give the latest SNAPSHOT build a go and mark this as resolved if your code is working as expect now.

        Show
        Timothy Bish added a comment - Rob resolved AMQ-796 so you may want to give the latest SNAPSHOT build a go and mark this as resolved if your code is working as expect now.
        Hide
        Bhanu added a comment -

        Yeah, It seems quiet weird to explicitly maintain a non-daemon thread. From what I understood is in my app the only non-daemon thread was the ActiveMQ Transport thread. When the broker dies, the client Transport thread dies with an EOFException and the failover protocol tries to start a new Transport(non-daemon) thread to initiate connection with the failover broker. In this small window since no thread was non-daemon, the client application exits.

        One shot at AMQ-796 can be something like maintaining a non-daemon failover thread apart from the regular ActiveMQ Transport thread. This failover thread should perform the reconnection and subsequent failover ActivMQ Transport thread spawning.

        Thanks a lot guys !!

        Show
        Bhanu added a comment - Yeah, It seems quiet weird to explicitly maintain a non-daemon thread. From what I understood is in my app the only non-daemon thread was the ActiveMQ Transport thread. When the broker dies, the client Transport thread dies with an EOFException and the failover protocol tries to start a new Transport(non-daemon) thread to initiate connection with the failover broker. In this small window since no thread was non-daemon, the client application exits. One shot at AMQ-796 can be something like maintaining a non-daemon failover thread apart from the regular ActiveMQ Transport thread. This failover thread should perform the reconnection and subsequent failover ActivMQ Transport thread spawning. Thanks a lot guys !!
        Hide
        Rob Davies added a comment -

        I think we should fix AMQ-796

        Show
        Rob Davies added a comment - I think we should fix AMQ-796
        Hide
        Timothy Bish added a comment -

        There doesn't appear to be anything in main preventing the Process from going down, so you are seeing a shutdown related to: AMQ-796

        Show
        Timothy Bish added a comment - There doesn't appear to be anything in main preventing the Process from going down, so you are seeing a shutdown related to: AMQ-796
        Hide
        Bhanu added a comment -

        Woah ! Something messed up the indentation quiet badly !!

        Show
        Bhanu added a comment - Woah ! Something messed up the indentation quiet badly !!
        Hide
        Bhanu added a comment -

        Here is a bare-bone implementation of my app:-

        public class ActiveMQConsumer implements MessageListener
        {
        private Connection myConnection;

        private Session mySession;

        private void start()
        {
        try

        { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ourBrokerUrl); myConnection = connectionFactory.createConnection(); myConnection.start(); mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = mySession.createConsumer(mySession.createTopic("test_topic")); consumer.setMessageListener(this); }

        catch (JMSException e)

        { throw new RuntimeException(e); }


        }

        private void stop()

        { myConnection.close(); }

        public void onMessage(Message message)

        { // Do Something }

        public static void main(final String args[])
        {
        TerminatingThreadGroup.runInTerminatingThreadGroup(new Runnable() {
        public void run()
        {
        final ActiveMQConsumer consumer = new ActiveMQConsumer();

        ShutdownTasks.addShutdownHook(new Runnable() {
        @Override
        public void run()

        { consumer.stop(); }

        }, "Shutdown-ActiveMQConsumer");

        consumer.start();
        }
        });
        }
        }

        What I might be doing wrong here ??

        Show
        Bhanu added a comment - Here is a bare-bone implementation of my app:- public class ActiveMQConsumer implements MessageListener { private Connection myConnection; private Session mySession; private void start() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ourBrokerUrl); myConnection = connectionFactory.createConnection(); myConnection.start(); mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = mySession.createConsumer(mySession.createTopic("test_topic")); consumer.setMessageListener(this); } catch (JMSException e) { throw new RuntimeException(e); } } private void stop() { myConnection.close(); } public void onMessage(Message message) { // Do Something } public static void main(final String args[]) { TerminatingThreadGroup.runInTerminatingThreadGroup(new Runnable() { public void run() { final ActiveMQConsumer consumer = new ActiveMQConsumer(); ShutdownTasks.addShutdownHook(new Runnable() { @Override public void run() { consumer.stop(); } }, "Shutdown-ActiveMQConsumer"); consumer.start(); } }); } } What I might be doing wrong here ??
        Hide
        Timothy Bish added a comment -

        Can you post a complete sample app that demonstrates your consumer code, can't tell from this snippet what your client is doing.

        Show
        Timothy Bish added a comment - Can you post a complete sample app that demonstrates your consumer code, can't tell from this snippet what your client is doing.

          People

          • Assignee:
            Unassigned
            Reporter:
            Bhanu
          • Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development