diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 642c732..44531fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -268,6 +268,7 @@ protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
     rmStore.setRMDispatcher(rmDispatcher);
+    rmStore.setResourceManager(this);
     rmContext.setStateStore(rmStore);
   }
 
@@ -396,11 +397,12 @@ protected static void validateConfigs(Configuration conf) {
     private EventHandler schedulerDispatcher;
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ContainerAllocationExpirer containerAllocationExpirer;
-
+    private ResourceManager rm;
     private boolean recoveryEnabled;
 
-    RMActiveServices() {
+    RMActiveServices(ResourceManager rm) {
       super("RMActiveServices");
+      this.rm = rm;
     }
 
     @Override
@@ -448,6 +450,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
       try {
         rmStore.init(conf);
         rmStore.setRMDispatcher(rmDispatcher);
+        rmStore.setResourceManager(rm);
       } catch (Exception e) {
         // the Exception from stateStore.init() needs to be handled for
         // HA and we need to give up master status if we got fenced
@@ -742,25 +745,29 @@ public void handle(RMFatalEvent event) {
       LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
           event.getType().name() + ". Cause:\n" + event.getCause());
 
-      if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
-        LOG.info("RMStateStore has been fenced");
-        if (rmContext.isHAEnabled()) {
-          try {
-            // Transition to standby and reinit active services
-            LOG.info("Transitioning RM to Standby mode");
-            rm.transitionToStandby(true);
-            rm.adminService.resetLeaderElection();
-            return;
-          } catch (Exception e) {
-            LOG.fatal("Failed to transition RM to Standby mode.");
-          }
-        }
-      }
-
       ExitUtil.terminate(1, event.getCause());
     }
   }
 
+  /**
+   * Transitions this RM to standby (reinitializing active services) and
+   * resets leader election. Does nothing when HA is disabled; terminates
+   * the process if the transition fails.
+   */
+  public void handleTransitionToStandBy() {
+    if (rmContext.isHAEnabled()) {
+      try {
+        // Transition to standby and reinit active services
+        LOG.info("Transitioning RM to Standby mode");
+        transitionToStandby(true);
+        adminService.resetLeaderElection();
+      } catch (Exception e) {
+        LOG.fatal("Failed to transition RM to Standby mode.", e);
+        ExitUtil.terminate(1, e);
+      }
+    }
+  }
+
   @Private
   public static final class ApplicationEventDispatcher implements
       EventHandler {
@@ -989,7 +996,7 @@ protected void startWepApp() {
    * @throws Exception
    */
   protected void createAndInitActiveServices() throws Exception {
-    activeServices = new RMActiveServices();
+    activeServices = new RMActiveServices(this);
     activeServices.init(conf);
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 973fe54..c33e9b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -87,6 +88,7 @@
       "AMRMTokenSecretManagerRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
+  private ResourceManager resourceManager;
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
@@ -820,11 +822,17 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
   protected void notifyStoreOperationFailed(Exception failureCause) {
     RMFatalEventType type;
     if (failureCause instanceof StoreFencedException) {
-      type = RMFatalEventType.STATE_STORE_FENCED;
+      // On fencing, hand off to the ResourceManager's standby transition on
+      // a dedicated thread instead of raising a fatal event.
+      Thread standByTransitionThread =
+          new Thread(new StandByTransitionThread());
+      standByTransitionThread.setName("StandByTransitionThread Handler");
+      standByTransitionThread.start();
     } else {
       type = RMFatalEventType.STATE_STORE_OP_FAILED;
+      rmDispatcher.getEventHandler().handle(
+          new RMFatalEvent(type, failureCause));
     }
-    rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
 
   @SuppressWarnings("unchecked")
@@ -866,4 +874,26 @@ public void handle(RMStateStoreEvent event) {
    * @throws Exception
    */
   public abstract void deleteStore() throws Exception;
+
+  /**
+   * Sets the ResourceManager that is asked to transition to standby when
+   * a store operation fails with a StoreFencedException.
+   *
+   * @param rm the ResourceManager to notify
+   */
+  public void setResourceManager(ResourceManager rm) {
+    this.resourceManager = rm;
+  }
+
+  /**
+   * Runnable that logs the fencing and then invokes
+   * {@link ResourceManager#handleTransitionToStandBy()}.
+   */
+  private class StandByTransitionThread implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("RMStateStore has been fenced");
+      resourceManager.handleTransitionToStandBy();
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 8cef4c9..48830d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -40,12 +40,15 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -451,6 +454,73 @@ public synchronized void startInternal() throws Exception {
     checkActiveRMFunctionality();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 90000)
+  public void testTransitionedToStandbyShouldNotHang() throws Exception {
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    Configuration conf = new YarnConfiguration(configuration);
+
+    // Every application-state update reports the store as fenced.
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void updateApplicationState(ApplicationState appState) {
+        notifyStoreOperationFailed(new StoreFencedException());
+      }
+    };
+    memStore.init(conf);
+    rm = new MockRM(conf, memStore) {
+      @Override
+      void stopActiveServices() throws Exception {
+        // NOTE(review): presumably slows the standby transition so that the
+        // fence below arrives while it is still running - confirm.
+        Thread.sleep(10000);
+        super.stopActiveServices();
+      }
+    };
+    rm.init(conf);
+    final StateChangeRequestInfo requestInfo =
+        new StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
+        .getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    checkMonitorHealth();
+
+    rm.start();
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 2. Transition to Active.
+    rm.adminService.transitionToActive(requestInfo);
+
+    // 3. Try Transition to standby
+    final Exception[] transitionFailure = new Exception[1];
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rm.transitionToStandby(true);
+        } catch (Exception e) {
+          // Hand the failure to the test thread instead of dropping it.
+          transitionFailure[0] = e;
+        }
+      }
+    });
+    t.start();
+
+    // Reporting the store as fenced at this point made transitionToStandby
+    // hang before the fix.
+    rm.getRMContext().getStateStore().updateApplicationState(null);
+    t.join(); // wait for thread to finish
+    assertFalse("transitionToStandby failed: " + transitionFailure[0],
+        transitionFailure[0] != null);
+
+    rm.adminService.transitionToStandby(requestInfo);
+    checkStandbyRMFunctionality();
+  }
+
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
     //this is run two times, with and without a bind host configured
     if (includeBindHost) {
-- 
1.9.2.msysgit.0