diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0a11948..6279c4a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -318,6 +318,10 @@ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + public static final String RECOVERY_IGNORE_FAILURES = + RM_PREFIX + "recovery.ignore-failures"; + public static final boolean DEFAULT_RECOVERY_IGNORE_FAILURES = true; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 51024cf..55b4788 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -305,35 +305,33 @@ protected void submitApplication( appState.getApplicationSubmissionContext(); ApplicationId appId = appState.getAppId(); - // create and recover app. - RMAppImpl application = - createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser()); - application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); + try { + // create and recover app. + RMAppImpl application = + createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), + appState.getUser()); + application.recover(rmState); + + // If security is enabled and the application is NOT in a final state, + // parse the credentials and renew delegation token + if (UserGroupInformation.isSecurityEnabled() && + !isApplicationInFinalState(appState.getState())) { + Credentials credentials = parseCredentials(appContext); // synchronously renew delegation token on recovery. rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; + credentials, appContext.getCancelTokensWhenComplete()); } - } else { + + // Actual recovery of the application application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + } catch (Exception e) { + LOG.error("Failed to recover application + " + appId, e); + // Fail the application if it is a running application. + if (!isApplicationInFinalState(appState.getState())) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(appId, e.getMessage())); + } + throw e; } } @@ -421,8 +419,19 @@ public void recover(RMState state) throws Exception { // recover applications Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); + boolean ignoreFailures = conf.getBoolean( + YarnConfiguration.RECOVERY_IGNORE_FAILURES, + YarnConfiguration.DEFAULT_RECOVERY_IGNORE_FAILURES); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (Exception e) { + LOG.error("Error recovering application " + appState, e); + // Rethrow exception if we are not to ignore failures + if (!ignoreFailures) { + throw e; + } + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 49eff8b..9d92bcd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -1824,6 +1825,76 @@ protected void serviceStart() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); } + + @Test (timeout = 60000) + public void testRMRestartOnRecoveryFailure() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + + ApplicationState appState = rmAppState.get(app0.getApplicationId()); + // assert the AM failed state is saved. + Assert.assertEquals(RMAppAttemptState.FAILED, + appState.getAttempt(am0.getApplicationAttemptId()).getState()); + + // assert app state has not been saved. + Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); + + // new AM started but not registered, app still stays at ACCECPTED state. + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // start new RM + conf.setBoolean(YarnConfiguration.RECOVERY_IGNORE_FAILURES, + true); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf) { + @Override + protected void recoverApplication(ApplicationState appState, + RMState rmState) throws Exception { + // Throw Exception for recovery failure + throw new YarnException("Recovery failed"); + + } + }; + } + }; + rm2.start(); + + // Verify rm2 functionality by submitting application + RMApp app1 = rm2.submitApp(200); + // assert app1 info is saved + appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app1.getApplicationSubmissionContext() + .getApplicationId()); + + rm1.stop(); + rm2.stop(); + } private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) {