ActiveMQ Classic / AMQ-1855

bridge reconnection stops because of race in SimpleDiscoveryAgent

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 4.1.2
    • Fix Version/s: 5.3.0
    • Component/s: Connector
    • Labels: None

    Description

      I believe there is a race condition in SimpleDiscoveryAgent that can cause a bridge restart attempt to fail without a new thread being started to retry it. As a consequence, the network bridge is never restarted.

      The following scenario leads to this:
      1. The bridge is disconnected (e.g. local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long).
      2. The bridge is disposed in a separate thread in DemandForwardingBridge.serviceLocalException.
      3. SimpleDiscoveryAgent.serviceFailed is called, which starts another thread that calls DiscoveryNetworkConnector.onServiceAdd, which in turn tries to restart the bridge.
      4. Bridge startup can fail with javax.jms.InvalidClientIDException: Broker: some_broker2 - Client: NC_some_broker1_inboundlocalhost already connected. This is caused by a race with the thread disposing the bridge (step 2), which should have removed that client subscription first.
      5. This causes another invocation of DemandForwardingBridge.serviceLocalException (this call can be made asynchronously, while the previous bridge startup is still in progress).

      As a consequence, multiple threads can end up calling SimpleDiscoveryAgent.serviceFailed simultaneously.

      serviceFailed eventually calls DiscoveryNetworkConnector.onServiceAdd, which tries to reconnect the bridge. The reconnect logic is guarded by
      if( event.failed.compareAndSet(false, true) )

      which tries to ensure that only a single thread is reconnecting the bridge at any given time.

          public void serviceFailed(DiscoveryEvent devent) throws IOException {

              final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
              if( event.failed.compareAndSet(false, true) ) {

                  listener.onServiceRemove(event);
                  Thread thread = new Thread() {
                      public void run() {

                          // We detect a failed connection attempt because the service fails right
                          // away.
                          if( event.connectTime + minConnectTime > System.currentTimeMillis()  ) {

                              event.connectFailures++;

                              if( maxReconnectAttempts>0 &&  event.connectFailures >= maxReconnectAttempts ) {
                                  // Don't try to re-connect
                                  return;
                              }

                              synchronized(sleepMutex){
                                  try{
                                      if( !running.get() )
                                          return;

                                      sleepMutex.wait(event.reconnectDelay);
                                  }catch(InterruptedException ie){
                                      Thread.currentThread().interrupt();
                                      return;
                                  }
                              }

                              if (!useExponentialBackOff) {
                                  event.reconnectDelay = initialReconnectDelay;
                              } else {
                                  // Exponential increment of reconnect delay.
                                  event.reconnectDelay*=backOffMultiplier;
                                  if(event.reconnectDelay>maxReconnectDelay)
                                      event.reconnectDelay=maxReconnectDelay;
                              }

                          } else {
                              event.connectFailures = 0;
                              event.reconnectDelay = initialReconnectDelay;
                          }

                          if( !running.get() )
                              return;

                          event.connectTime = System.currentTimeMillis();
                          event.failed.set(false);

                          listener.onServiceAdd(event);
                      }
                  };
                  thread.setDaemon(true);
                  thread.start();
              }
          }
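
The guard in the listing above can be exercised in isolation. The following is a minimal sketch, not ActiveMQ code: FailedGuardDemo, reconnectTasks and run are hypothetical names, and the static failed flag only stands in for SimpleDiscoveryEvent.failed. It shows that while the flag stays true, exactly one of several concurrent callers gets to schedule a reconnect.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FailedGuardDemo {
    // Stands in for SimpleDiscoveryEvent.failed (assumption: one shared event).
    static final AtomicBoolean failed = new AtomicBoolean(false);
    // Counts how many callers passed the guard.
    static final AtomicInteger reconnectTasks = new AtomicInteger();

    // Hypothetical reduction of serviceFailed: only the caller that flips
    // the flag from false to true schedules a reconnect.
    static void serviceFailed() {
        if (failed.compareAndSet(false, true)) {
            reconnectTasks.incrementAndGet();
        }
    }

    // Releases `callers` threads at once and returns how many passed the guard.
    static int run(int callers) {
        failed.set(false);
        reconnectTasks.set(0);
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                serviceFailed();
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reconnectTasks.get();
    }

    public static void main(String[] args) {
        System.out.println("reconnect tasks scheduled: " + run(8));
    }
}
```

The guarantee only holds for as long as the flag remains true. Once a reconnect thread resets it, a new caller can pass the guard again, which is the window this report is about.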
      

      Before calling DiscoveryNetworkConnector.onServiceAdd, the first thread (T1) sets event.failed to false. This makes it possible for another thread (T2) to enter the block guarded by if( event.failed.compareAndSet(false, true) ) while the reconnect started by T1 is still in progress. T2 can satisfy the condition if( event.connectTime + minConnectTime > System.currentTimeMillis() ) and enter sleepMutex.wait(event.reconnectDelay) while event.failed is still true, so that every other call to serviceFailed in the meantime will not start a thread to reconnect the bridge.

      If T1 fails to reconnect the bridge (e.g. because of the InvalidClientIDException described in step 4), the resulting serviceFailed call will not schedule a new thread to restart the bridge (nor call DiscoveryNetworkConnector.onServiceRemove and clean up DiscoveryNetworkConnector.bridges), because event.failed == true while T2 is still waiting (5 seconds by default). When T2 wakes up from the wait, it will try to restart the bridge and fail because of the following condition in DiscoveryNetworkConnector:

                  if (    bridges.containsKey(uri) 
                          || localURI.equals(uri) 
                          || (connectionFilter!=null && !connectionFilter.connectTo(uri))
                          )
                      return;
      

      bridges.containsKey(uri) will be true (T1 added the entry during its unsuccessful reconnect attempt), so T2 returns from DiscoveryNetworkConnector.onServiceAdd without starting the bridge.
      No further attempt to reconnect the bridge is made, because T2 held event.failed == true while it waited, which caused SimpleDiscoveryAgent.serviceFailed calls from other threads processing local or remote bridge exceptions to be ignored.

      End result:

      • DiscoveryNetworkConnector.bridges contains a bridge that has been disposed, which blocks all further attempts to restart the bridge (onServiceAdd always returns early because bridges.containsKey(uri) == true).
      • SimpleDiscoveryAgent doesn't try to reconnect the bridge. T2 was the last attempt, and it returned without restarting the bridge; SimpleDiscoveryAgent.serviceFailed is not called again, since no bridge is started.
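
The interleaving described above can be replayed deterministically on a single thread. This is a simplified model under stated assumptions, not ActiveMQ code: BridgeRaceReplay, resetGuard, pendingReconnects, bridgeRunning and the URI value are hypothetical, bridges is reduced to a set of keys, and a failed bridge start is assumed to report back through serviceFailed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class BridgeRaceReplay {
    static final String URI = "tcp://some_broker2:61616"; // hypothetical remote URI

    final AtomicBoolean failed = new AtomicBoolean(false); // models event.failed
    final Set<String> bridges = new HashSet<>();           // models DiscoveryNetworkConnector.bridges
    int pendingReconnects = 0;                             // reconnect threads not yet finished
    boolean bridgeRunning = false;

    // Reduced serviceFailed: the guard winner removes the bridge entry and
    // starts a reconnect thread; everyone else is ignored.
    boolean serviceFailed() {
        if (!failed.compareAndSet(false, true)) {
            return false;
        }
        bridges.remove(URI);   // listener.onServiceRemove(event)
        pendingReconnects++;
        return true;
    }

    // Tail of the reconnect thread: event.failed.set(false) before onServiceAdd.
    void resetGuard() {
        failed.set(false);
    }

    // Reduced onServiceAdd: returns early if an entry for the URI already exists.
    boolean onServiceAdd(boolean startSucceeds) {
        pendingReconnects--;
        if (bridges.contains(URI)) {
            return false;
        }
        bridges.add(URI);
        bridgeRunning = startSucceeds;
        if (!startSucceeds) {
            serviceFailed();   // assumed: the failed start is reported back
        }
        return startSucceeds;
    }

    static BridgeRaceReplay replay() {
        BridgeRaceReplay s = new BridgeRaceReplay();
        s.bridges.add(URI);      // entry of the bridge that has just failed
        s.serviceFailed();       // T1 wins the guard and is scheduled
        s.resetGuard();          // T1 resets the flag just before onServiceAdd
        s.serviceFailed();       // a second failure report: T2 wins the guard and waits
        s.onServiceAdd(false);   // T1 adds the entry, start fails, its report is ignored
        s.resetGuard();          // T2 wakes up and resets the flag
        s.onServiceAdd(true);    // T2 returns early: the entry already exists
        return s;
    }

    public static void main(String[] args) {
        BridgeRaceReplay s = replay();
        System.out.println("stale entry: " + s.bridges.contains(URI)
                + ", bridge running: " + s.bridgeRunning
                + ", pending reconnects: " + s.pendingReconnects);
    }
}
```

In this model the replay ends with a stale entry in bridges, no running bridge and no reconnect pending, which matches the end result listed above.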

      I think the synchronization of threads that process bridge exceptions and enter SimpleDiscoveryAgent.serviceFailed should be verified and/or improved.
      Also, InvalidClientIDException is relatively common (at least on multicore machines, e.g. Solaris T2000). Maybe ConduitBridge.serviceLocalException (which starts another thread doing ServiceSupport.dispose(DemandForwardingBridgeSupport.this)) should be changed to wait a bit for bridge disposal to finish (e.g. sleep for some time) and only then try to restart the bridge. Waiting a second longer to restart a bridge is better than not starting it at all.
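
The "wait a bit for disposal to finish" idea could be sketched with a bounded wait rather than a fixed sleep. This only illustrates the suggestion in this report and is not claimed to be the change made for 5.3.0: DisposeThenRestart, awaitDisposal and demo are hypothetical names, and the disposing thread is simulated with Thread.sleep.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DisposeThenRestart {

    // Hypothetical helper: block until disposal is signalled or the timeout
    // expires. Returns true only if disposal finished in time.
    static boolean awaitDisposal(CountDownLatch disposed, long timeoutMillis) {
        try {
            return disposed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates a disposer thread that takes disposeMillis to finish, then
    // reports whether a restart would have been allowed to proceed.
    static boolean demo(long disposeMillis, long timeoutMillis) {
        CountDownLatch disposed = new CountDownLatch(1);
        Thread disposer = new Thread(() -> {
            try {
                Thread.sleep(disposeMillis); // stands in for the real dispose work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            disposed.countDown();            // signal that disposal is complete
        });
        disposer.setDaemon(true);
        disposer.start();
        return awaitDisposal(disposed, timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("fast disposal, restart allowed: " + demo(20, 2000));
        System.out.println("slow disposal, restart allowed: " + demo(2000, 20));
    }
}
```

A latch returns as soon as disposal completes, so the common case adds little delay, while the timeout bounds the wait if disposal hangs. What to do when the timeout expires (retry later or restart anyway) is left open here.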

      I've seen this problem in 4.1.0 and 4.1.2, but I think it can also occur in 5.1 and on the 5.2 trunk: SimpleDiscoveryAgent.serviceFailed and DiscoveryNetworkConnector.onServiceAdd are more or less the same there, except that they use ASYNC_TASKS to execute asynchronous calls instead of starting new threads directly.

            People

              Assignee: gtully Gary Tully
              Reporter: mlukica Mario Lukica