ActiveMQ
  1. ActiveMQ
  2. AMQ-4159

Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Critical Critical
    • Resolution: Fixed
    • Affects Version/s: 5.8.0
    • Fix Version/s: 5.8.0
    • Component/s: None
    • Labels:
      None
    • Patch Info:
      Patch Available

      Description

      Symptom
      =======
      I was diagnosing a deadlock issue in DiscoveryNetworkConnector and noticed that during one of the tests, concurrent calls were being made to DiscoveryNetworkConnector.onServiceAdd(...) for the same DiscoveryEvent. This was unexpected because only a single service (URL) had been given to SimpleDiscoveryAgent. In fact, during one of the tests I observed dozens of concurrent calls.

      Concurrent attempts to establish a bridge to the same remote broker are problematic because they expose a number of race conditions in DiscoveryNetworkConnector and RegionBroker that can lead to permanent bridge failure (see AMQ-4160), as well as unnecessary thread pool execution/resource usage and logging.

      The issues with DiscoveryNetworkConnector and RegionBroker will be filed as separate issues. This issue specifically addresses the bug that causes SimpleDiscoveryAgent to uncontrollably multiply bridge connection attempts.

      Cause
      =====
      When DemandForwardingBridgeSupport handles exceptions from either the local or remote sides of the the bridge, it fires a "bridge failed" event:

      DemandForwardingBridgeSupport.java
      public void serviceLocalException(Throwable error) {
          if (!disposed.get()) {
              LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
              LOG.debug("The local Exception was:" + error, error);
              brokerService.getTaskRunnerFactory().execute(new Runnable() {
                  public void run() {
                      ServiceSupport.dispose(getControllingService());
                  }
              });
              fireBridgeFailed();
          }
      }
      
      
      public void serviceRemoteException(Throwable error) {
          if (!disposed.get()) {
              if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
                  LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
              } else {
                  LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
              }
              LOG.debug("The remote Exception was: " + error, error);
              brokerService.getTaskRunnerFactory().execute(new Runnable() {
                  public void run() {
                      ServiceSupport.dispose(getControllingService());
                  }
              });
              fireBridgeFailed();
          }
      }
      
      private void fireBridgeFailed() {
          NetworkBridgeListener l = this.networkBridgeListener;
          if (l != null) {
              l.bridgeFailed();
          }
      }
      

      DiscoveryNetworkConnector is the NetworkBridgeListener, and its bridgeFailed() method calls back to SimpleDiscoveryAgent.serviceFailed(...):

      DiscoveryNetworkConnectol.java
      protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
          class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
      
              public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
                  super(brokerService, connectorName);
              }
      
              public void bridgeFailed() {
                  if (!serviceSupport.isStopped()) {
                      try {
                          discoveryAgent.serviceFailed(event);
                      } catch (IOException e) {
                      }
                  }
      
              }
          }
      ...
      

      In response, SimpleDiscoveryAgent.serviceFailed(...) pauses for the reconnectDelay before attempting to re-establish the bridge via DiscoveryNetworkConnector.onServiceAdd(...):

      SimpleDiscoveryAgent.java
      public void serviceFailed(DiscoveryEvent devent) throws IOException {
      
          final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
          if (sevent.failed.compareAndSet(false, true)) {
      
              listener.onServiceRemove(sevent);
              taskRunner.execute(new Runnable() {
                  public void run() {
                      // We detect a failed connection attempt because the service
                      // fails right
                      // away.
                      if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                          LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
      ...
                          synchronized (sleepMutex) {
                              try {
                                  if (!running.get()) {
                                      LOG.debug("Reconnecting disabled: stopped");
                                      return;
                                  }
      
                                  LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
                                  sleepMutex.wait(event.reconnectDelay);
                              } catch (InterruptedException ie) {
                                  LOG.debug("Reconnecting disabled: " + ie);
                                  Thread.currentThread().interrupt();
                                  return;
                              }
                          }
      ...
                      event.connectTime = System.currentTimeMillis();
                      event.failed.set(false);
                      listener.onServiceAdd(event);
                  }
              }, "Simple Discovery Agent");
          }
      }
      

      NOTE: the call to listener.onServiceAdd(...) is made by a new thread!

      There are two race conditions that allow SimpleDiscoveryAgent.serviceFailed(...) to launch more than one thread, each attempting to re-restablish the same bridge.

      First, note that DemandForwardingBridgeSupport.serviceLocal/RemoteException(...) launches a separate thread that stops the bridge:

      DemandForwardingBridgeSupport.java
      public void serviceRemoteException(Throwable error) {
          if (!disposed.get()) {
              if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
                  LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
              } else {
                  LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
              } 
              LOG.debug("The remote Exception was: " + error, error);
              brokerService.getTaskRunnerFactory().execute(new Runnable() {
                  public void run() {
                      ServiceSupport.dispose(getControllingService());
                  }
              });
              fireBridgeFailed();
          }
      }
      
      public void stop() throws Exception {
          if (started.compareAndSet(true, false)) {
              if (disposed.compareAndSet(false, true)) {
                  LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
                  NetworkBridgeListener l = this.networkBridgeListener;
                  if (l != null) {
                      l.onStop(this);
                  }
      

      When the bridge stops, the disposed flag is set, which prevents subsequent calls to serviceLocal/RemoteException(...) from calling fireBridgeFailed(). However, since the call to DemandForwardingBridgeSupport.stop() is made by a separate thread, multiple serviceLocal/RemoteException(...) calls that are made in quick succession can result in multiple calls to fireBridgeFailed().

      This is the first race condition: multiple calls can be made to DiscoveryNetworkConnector.bridgeFailed() for the same bridge. By transitivity, this results in multiple calls to SimpleDiscoveryAgent.serviceFailed(...).

      SimpleDiscoveryAgent.serviceFailed(...) has a guard class, event.failed.compareAndSet(false, true), which should only allow the first call to launch a bridge reconnect thread. However, once the reconnectDelay expires, event.failed is reset to false, which allows re-entry to the failure handling logic, and the possibile launching of additional bridge reconnect threads if the reconnectDelay is short or the threads calling serviceFailed(...) are delayed.

      This is the second race condition: the guard clause in SimpleDiscoveryAgent.serviceFailed(...) can be reset before the subsequent redundant calls have been filtered out.

      These two race conditions allow a single call to DiscoveryNetworkConnector.onServiceAdd(...) to result in multiple subsequent concurrent (re)calls, and these concurrent calls can spawn their own multiple concurrent calls. The result can be an unlimited number of concurrent calls to onServiceAdd(...).

      Unit Test
      =========
      The attached unit test demonstrates this bug by simulating a bridge failure that has yet to be detected by the remote broker (i.e., before the InactivityMonitor closes the connection). The local broker attempts to re-establish the bridge, but its call to DemandForwardingBridge.startRemoteBroker() fails because the remote broker rejects the new connection since the old one still exists. Since startRemoteBroker sends multiple messages to the remote broker, multiple exceptions are generated:

      DemandForwardingBridgeSupport.java
      protected void startRemoteBridge() throws Exception {
      ...
                      remoteBroker.oneway(brokerInfo);
      ...
                  remoteBroker.oneway(remoteConnectionInfo);
      ...
                  remoteBroker.oneway(producerInfo);
      ...
                      remoteBroker.oneway(demandConsumerInfo);
      }
      

      The multiple exceptions result in multiple calls to DemandForwardingBridgeSupport.serviceRemoteException(...), which allows the first race condition to be exhibited.

      The first unit test has a 1s reconnectDelay, which is sufficient to make the second race condition improbable; therefore, this test generally passes.

      The second unit test has a 0s reconnectDelay; on my system, this makes the timing of multiple calls to DemandForwardingBridgeSupport.serviceRemoteException(...) such that the second race condition is reliably exhibited, resulting in the unit test failing because it detects concurrent calls to DiscoveryNetworkConnector.onServiceAdd(...).

      Solution
      ========
      While it would be possible to add a failed.compareAndSet(false,true) guard clause to DemandForwardingBridgeSupport.fireBridgeFailed(), and prevent the first race condition from allowing multiple calls to SimpleDiscoveryAgent.serviceFailed(), the root problem is the race condition in serviceFailed. This can be trivially addressed by making a copy of the DiscoveryEvent, which prevents the original event.failed guard clause from being reset:

      Patched SimpleDiscoveryAgent.java
      public void serviceFailed(DiscoveryEvent devent) throws IOException {
      
          final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
          if (sevent.failed.compareAndSet(false, true)) {
      
              listener.onServiceRemove(sevent);
              taskRunner.execute(new Runnable() {
                  public void run() {
                      SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
      
      ...
                      event.connectTime = System.currentTimeMillis();
                      event.failed.set(false);
                      listener.onServiceAdd(event);
                  }
              }, "Simple Discovery Agent");
          }
      }
      
      1. AMQ4159.patch
        2 kB
        Stirling Chow
      2. AMQ4159Test.java
        6 kB
        Stirling Chow

        Issue Links

          Activity

            People

            • Assignee:
              Timothy Bish
              Reporter:
              Stirling Chow
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development