Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-5150

ActiveMQ failover seems not to work in 5.9.1 on MacOSX

    XML | Word | Printable | JSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 5.9.1
    • Fix Version/s: None
    • Component/s: Connector
    • Labels:
      None
    • Environment:

      MacOSx

      Description

      I have a super simple scenario: one broker and one consumer with a durable subscription.
      This is the code of my consumer app:

      package test;

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;

      import org.apache.activemq.ActiveMQConnectionFactory;

      import pojo.Event;
      import pojo.StockUpdate;

      public class Consumer
      {

      private static transient ConnectionFactory factory;
      private transient Connection connection;
      private transient Session session;
      public static int counter = 0;

      public Consumer(String brokerURL) throws JMSException

      { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.setClientID("CLUSTER_CLIENT_1"); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); }

      public void close() throws JMSException
      {
      if (connection != null)

      { connection.close(); }

      }

      public static void main(String[] args) throws JMSException
      {

      try
      {
      // extract topics from the rest of arguments
      String[] topics = new String[2];
      topics[0] = "CSCO";
      topics[1] = "ORCL";

      // define connection URI
      Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true");

      for (String stock : topics)
      {
      try

      { Destination destination = consumer.getSession().createTopic("STOCKS." + stock); // consumer.getSession(). MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic) destination, "STOCKS_DURABLE_CONSUMER_" + stock); messageConsumer.setMessageListener(new Listener()); }

      catch (JMSException e)

      { e.printStackTrace(); }

      }
      }
      catch (Throwable t)

      { t.printStackTrace(); }

      }

      public Session getSession()

      { return session; }

      }

      class Listener implements MessageListener
      {

      public void onMessage(Message message)
      {
      try

      { TextMessage textMessage = (TextMessage) message; String json = textMessage.getText(); Event event = StockUpdate.fromJSON(json, StockUpdate.class); System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event); }

      catch (Exception e)

      { e.printStackTrace(); }

      }

      }

      Here is my activemq.xml

      <beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
          <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
          </property>
        </bean>

        <!--
        The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1" persistent="true">

          <!-- Forwarding bridge to the remote master/slave pair. -->
          <networkConnectors>
            <networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/>
          </networkConnectors>

          <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                  <!-- The constantPendingMessageLimitStrategy is used to prevent
                  slow topic consumers to block producers and affect other consumers
                  by limiting the number of messages that are retained
                  For more information, see:

                  http://activemq.apache.org/slow-consumer-handling.html

                  -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
          </destinationPolicy>

          <!--
          The managementContext is used to configure how ActiveMQ is exposed in
          JMX. By default, ActiveMQ uses the MBean server that is started by
          the JVM. For more information, see:

          http://activemq.apache.org/jmx.html
          -->
          <managementContext>
            <managementContext createConnector="false"/>
          </managementContext>

          <!--
          Configure message persistence for the broker. The default persistence
          mechanism is the KahaDB store (identified by the kahaDB tag).
          For more information, see:

          http://activemq.apache.org/persistence.html
          -->
          <persistenceAdapter>
            <kahaDB directory="/work/temp/kahadb"/>
          </persistenceAdapter>

          <!--
          The systemUsage controls the maximum amount of space the broker will
          use before disabling caching and/or slowing down producers. For more information, see:
          http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
              <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
              </memoryUsage>
              <storeUsage>
                <storeUsage limit="100 gb"/>
              </storeUsage>
              <tempUsage>
                <tempUsage limit="50 gb"/>
              </tempUsage>
            </systemUsage>
          </systemUsage>

          <!--
          The transport connectors expose ActiveMQ over a given protocol to
          clients and other brokers. For more information, see:

          http://activemq.apache.org/configuring-transports.html

          Note: query-string separators inside the uri attributes must be written
          as the XML entity for an ampersand, otherwise the file is not well-formed.
          -->
          <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
          </transportConnectors>

          <!-- destroy the spring context on shutdown to stop jetty -->
          <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
          </shutdownHooks>

        </broker>

        <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>

      </beans>

      When both the broker and the consumer are running and I then stop the broker, my consumer exits a few moments later. As far as I understand, it should attempt to reconnect, but that is not the case. What am I doing wrong? Please advise.

      !NOTE! I launch my consumer in Eclipse; I do not build a standalone jar for this task.

      I have updated my broker to the latest 5.9.1 and did the same for my consumer. The result is the same - after I stop the broker, my consumer dies a few seconds later. It works fine if the broker is up and running.

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              i.mochurad Ihor Mochurad
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated:
                Original Estimate - 48h
                48h
                Remaining:
                Remaining Estimate - 48h
                48h
                Logged:
                Time Spent - Not Specified
                Not Specified