Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 1592934) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (working copy) @@ -318,6 +318,11 @@ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + public static final String CONTINUE_ON_APPLICATION_RECOVERY_FAILURE = + RM_PREFIX + "continue-on-application-recovery-failure"; + public static final boolean DEFAULT_RM_CONTINUE_ON_APPLICATION_RECOVERY_FAILURE = + false; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (revision 1592934) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (working copy) @@ -421,8 +421,18 @@ // recover applications Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); + boolean isContinueOnRecoveryFailure = + conf.getBoolean( + YarnConfiguration.CONTINUE_ON_APPLICATION_RECOVERY_FAILURE, + YarnConfiguration.DEFAULT_RM_CONTINUE_ON_APPLICATION_RECOVERY_FAILURE); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (Exception e) { + if (!isContinueOnRecoveryFailure) { + throw e; + } + } } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (revision 1592934) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (working copy) @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -1824,7 +1825,77 @@ MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); } + + @Test (timeout = 60000) + public void testRMRestartOnRecoveryFailure() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + + ApplicationState appState = rmAppState.get(app0.getApplicationId()); + // assert the AM failed state is saved. + Assert.assertEquals(RMAppAttemptState.FAILED, + appState.getAttempt(am0.getApplicationAttemptId()).getState()); + + // assert app state has not been saved. + Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); + + // new AM started but not registered, app still stays at ACCECPTED state. + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // start new RM + conf.setBoolean(YarnConfiguration.CONTINUE_ON_APPLICATION_RECOVERY_FAILURE, + true); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf) { + @Override + protected void recoverApplication(ApplicationState appState, + RMState rmState) throws Exception { + // Throw Exception for recovery failure + throw new YarnException("Recovery failed"); + + } + }; + } + }; + rm2.start(); + + // Verify rm2 functionality by submitting application + RMApp app1 = rm2.submitApp(200); + // assert app1 info is saved + appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app1.getApplicationSubmissionContext() + .getApplicationId()); + + rm1.stop(); + rm2.stop(); + } + private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) { TEMP_DIR.mkdirs();