ActiveMQ / AMQ-3213

failover doesn't reconnect after broker restart

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.4.2
    • Fix Version/s: 5.6.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Windows 7 x64, jdk 1.6.0_21

      Description

      I have a network of three brokers: two embedded (publisher and subscriber) and one standalone remote broker. The embedded brokers connect to the remote broker with duplex, static failover network connections. Everything works fine as long as the remote broker is not restarted. After the remote broker is restarted, the embedded brokers fail to reestablish the duplex bridges: the subscriber doesn't receive any messages, and the logs show:

      Subscriber worker:

      Received 14-th message.
      Received 15-th message.
      Received 16-th message.
      2011:03:09 18:19:08,276 [WARN ] org.apache.activemq.transport.failover.FailoverTransport - Transport (localhost/127.0.0.1:61616) failed to tcp://localhost:61616 , attempting to automatically reconnect due to: java.net.SocketException: Connection reset
      2011:03:09 18:19:08,276 [INFO ] org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport to remote interrupted.
      2011:03:09 18:19:22,426 [INFO ] org.apache.activemq.network.DemandForwardingBridgeSupport - Network connection between vm://local1#0 and tcp://localhost:61616(remote) has been established.
      2011:03:09 18:19:22,429 [INFO ] org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport to remote resumed
      2011:03:09 18:19:22,429 [INFO ] org.apache.activemq.transport.failover.FailoverTransport - Successfully reconnected to tcp://localhost:61616
      2011:03:09 18:19:22,455 [WARN ] org.apache.activemq.broker.TransportConnection - Unexpected extra broker info command received: BrokerInfo {commandId = 1929, responseRequired = false, brokerId = ID:Air-17262-1299683959982-0:1, brokerURL = tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = remote, connectionId = 0, brokerUploadUrl = null, networkProperties = null}
      

      Publisher worker:

      Sending 28-th message
      Sending 29-th message
      Sending 30-th message
      2011:03:09 18:19:22,430 [INFO ] org.apache.activemq.network.DemandForwardingBridgeSupport - Network connection between vm://local#0 and tcp://localhost:61616(remote) has been established.
      2011:03:09 18:19:22,435 [INFO ] org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport to remote resumed
      2011:03:09 18:19:22,435 [INFO ] org.apache.activemq.transport.failover.FailoverTransport - Successfully reconnected to tcp://localhost:61616
      2011:03:09 18:19:22,469 [WARN ] org.apache.activemq.broker.TransportConnection - Unexpected extra broker info command received: BrokerInfo {commandId = 1911, responseRequired = false, brokerId = ID:Air-17262-1299683959982-0:1, brokerURL = tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = remote, connectionId = 0, brokerUploadUrl = null, networkProperties = null}
      Sending 31-th message
      Sending 32-th message
      Sending 33-th message
      

      Sample code to reproduce this issue:

      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.broker.BrokerService;
      import org.apache.activemq.broker.TransportConnector;
      import org.apache.activemq.network.NetworkConnector;
      
      import javax.jms.*;
      import java.net.URI;
      
      
      public class JmsTester implements MessageListener {
      
          private static final int size = 256;
          private static byte[] payload;
      
          static {
              char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
              payload = new byte[size];
              for (int i = 0; i < size; i++) {
                  payload[i] = (byte)DATA[i % DATA.length];
              }
          }
      
          public static void main(String[] args) throws Exception {
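              // Select the role with -Djms.mode=publisher|broker|subscriber (defaults to subscriber
              // instead of throwing a NullPointerException when the property is not set).
              String mode = System.getProperty("jms.mode", "subscriber");
              if ("publisher".equals(mode))
                  publisher();
              else if ("broker".equals(mode))
                  broker();
              else
                  subscriber();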
          }
      
          private static void publisher() throws Exception {
              System.out.println("Starting publisher ...");
              BrokerService broker = new BrokerService();
              broker.setBrokerName("local");
              broker.setUseJmx(true);
              broker.setPersistent(true);
              NetworkConnector nc = broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
              nc.setDuplex(true);
              nc.setNetworkTTL(4);
              broker.start();
      
              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://local");
              Connection connection = factory.createConnection();
              Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
              Topic topic = session.createTopic("topictest.messages");
      
              MessageProducer publisher = session.createProducer(topic);
              publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
      
              for (int i=0; i<1000; i++) {
                  Thread.sleep(1000);
                  System.out.println("Sending " + i + "-th message");
                  BytesMessage msg = session.createBytesMessage();
                  msg.setIntProperty("count", i);
                  msg.writeBytes(payload);
                  publisher.send(msg);
              }
      
              broker.stop();
          }
      
          private static void broker() throws Exception {
              System.out.println("Starting broker ...");
              BrokerService broker = new BrokerService();
              broker.setPersistent(true);
              broker.setBrokerName("remote");
              broker.setUseJmx(true);
              TransportConnector connector = new TransportConnector();
              connector.setUri(new URI("tcp://localhost:61616"));
              
              broker.addConnector(connector);
              broker.start();
              // Keep the JVM (and the broker) alive without busy-spinning a CPU core.
              while (true) {
                  Thread.sleep(1000);
              }
          }
      
          private static void subscriber() throws Exception {
              System.out.println("Starting subscriber ...");
      
              BrokerService broker = new BrokerService();
              broker.setBrokerName("local1");
              broker.setUseJmx(true);
              broker.setPersistent(true);
              NetworkConnector nc = broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
              nc.setDuplex(true);
              nc.setNetworkTTL(4);
              broker.start();
      
              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://local1");
              Connection connection = factory.createConnection();
              connection.setClientID("subscriber2");
              Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
      
              Topic topic = session.createTopic("topictest.messages");
      
              MessageConsumer consumer = session.createDurableSubscriber(topic, "subscriber2");
              JmsTester tester = new JmsTester();
              consumer.setMessageListener(tester);
              connection.start();
              while(true) {
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void onMessage(Message message) {
              try {
                  int count = message.getIntProperty("count");
                  System.out.println("Received " + count + "-th message.");
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }
      }
      

        Activity

        Ivan Shcheklein created issue -
        Gary Tully added a comment -

        There were some fixes in this area with respect to duplicate network connection detection. One prerequisite is that the brokerId be specified for the broker, so that it stays the same across a restart. The broker id, and in 5.5 the networkConnector name (if more than one duplex network connector is configured), are used to identify duplicate network connectors and tear down old state that can still be pending after a temporary network partition or a fast restart.
        Can you validate against a 5.5-SNAPSHOT? But first, try setting a brokerId; use the same value as your brokerName.
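Why a fixed brokerId matters can be illustrated with a toy model. The class `BridgeRegistry`, its `register` method, and the "brokerId/connectorName" key format below are hypothetical and are not ActiveMQ's implementation; the sketch only assumes what the comment above states, namely that a duplicate bridge is recognised by the peer's broker id plus connector name. If the restarted peer presents the same identity, the stale bridge is found and can be torn down; if the id is regenerated on every start (as the auto-generated `ID:Air-...` ids in the logs suggest), the stale entry is simply missed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not ActiveMQ code): the remote side tracks one active
// bridge per (brokerId, connectorName) pair and replaces the stale one when the
// same peer reconnects.
public class BridgeRegistry {
    private final Map<String, String> active = new HashMap<>();

    // Registers a bridge; returns the id of the stale bridge it replaced, or null if none.
    public String register(String brokerId, String connectorName, String bridgeId) {
        String key = brokerId + "/" + connectorName;
        return active.put(key, bridgeId);
    }

    // Number of bridges currently tracked.
    public int size() {
        return active.size();
    }

    public static void main(String[] args) {
        BridgeRegistry registry = new BridgeRegistry();
        registry.register("local", "publisherConnToNw", "bridge-1");

        // Restart with a fixed brokerId: the same key is presented, so the old bridge is found.
        String stale = registry.register("local", "publisherConnToNw", "bridge-2");
        System.out.println("stale bridge: " + stale); // stale bridge: bridge-1

        // Restart with a freshly generated (made-up) id: the old entry is not recognised.
        String missed = registry.register("ID:host-12345-0:1", "publisherConnToNw", "bridge-3");
        System.out.println("missed: " + missed + ", tracked: " + registry.size()); // missed: null, tracked: 2
    }
}
```

In the reproduction code, the suggested change amounts to calling `broker.setBrokerId("local")` next to `broker.setBrokerName("local")` (and likewise `"local1"` for the subscriber).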

        Ivan Shcheklein added a comment -

        Gary, thanks for the response. Unfortunately, I get the same result both with brokerId set and with 5.5-SNAPSHOT. BTW, oddly, 5.5-SNAPSHOT doesn't work without slf4j-log4j12-1.5.11 and slf4j-api-1.5.11. Are both required now? 5.4.2 works fine without them.

        Andreas Calvo added a comment -

        We tried 5.5.0 final in our test lab and see the same behavior.

        Is there any fix?

        Arthur Naseef added a comment -

        Did you try adding broker.setBrokerId(...unique_id_per_broker...)? How about nc.setName(...unique_nc_id...)?

        For example

            private static void publisher() throws Exception {
                System.out.println("Starting publisher ...");
                BrokerService broker = new BrokerService();
                broker.setBrokerName("local");
        
                broker.setBrokerId("publisherEmbeddedBrokerId");
        
                broker.setUseJmx(true);
                broker.setPersistent(true);
                NetworkConnector nc = broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
                nc.setDuplex(true);
                nc.setNetworkTTL(4);
        
                nc.setName("publisherConnToNw");
                broker.start();
                // ... rest of publisher() unchanged ...
            }
        Gary Tully added a comment -

        @Andreas: use static:(failover:(tcp://localhost:61616)?maxReconnectAttempts=1), or no failover at all, in the network connector.
        A networkConnector does its own reconnection and bridge recreation in the event of a failure. The failover transport hides the failure from it, so the duplex state is not properly reconstituted. Using failover: in a network connector is really only beneficial for choosing between two URLs.
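The two configurations Gary suggests differ only in the URI string passed to `BrokerService.addNetworkConnector(String)`. The helper class `NetworkConnectorUris` and its method names are hypothetical (not an ActiveMQ API); it is a minimal sketch that builds both variants for a single remote broker, assuming the `tcp://localhost:61616` address from the reproduction code.

```java
// Hypothetical helper (not part of ActiveMQ): builds network connector URIs
// for a single remote broker, following the advice in this thread.
public class NetworkConnectorUris {

    // Plain static discovery, no failover: the network connector itself sees
    // the failure and recreates the bridge, including its duplex state.
    static String plain(String brokerUrl) {
        return "static:(" + brokerUrl + ")";
    }

    // Failover limited to one reconnect attempt, so the failure is not hidden
    // from the network connector for long.
    static String failoverOnce(String brokerUrl) {
        return "static:(failover:(" + brokerUrl + ")?maxReconnectAttempts=1)";
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:61616";
        System.out.println(plain(url));        // static:(tcp://localhost:61616)
        System.out.println(failoverOnce(url)); // static:(failover:(tcp://localhost:61616)?maxReconnectAttempts=1)
    }
}
```

In `publisher()` and `subscriber()`, either string would replace `"static:(failover:(tcp://localhost:61616))"` in the `addNetworkConnector` call; dropping failover entirely is the variant Ivan reports working below.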

        Ivan Shcheklein added a comment -

        @Arthur: Yes, I tried both; as far as I remember, neither worked.

        @Gary @Andreas: I've tried without failover, and it works so far. Still, it's not obvious that failover can't be used in a network connector, so I suppose this can still be considered buggy behavior.

        Timothy Bish added a comment -

        This should be resolved by the changes in AMQ-3542.

        Timothy Bish made changes -
        Field            Original Value    New Value
        Status           Open [ 1 ]        Resolved [ 5 ]
        Fix Version/s                      5.6.0 [ 12317974 ]
        Resolution                         Fixed [ 1 ]

          People

          • Assignee:
            Unassigned
            Reporter:
            Ivan Shcheklein
          • Votes:
            2
            Watchers:
            3
