diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 51024cf..fc66dcb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -305,35 +305,33 @@ protected void submitApplication( appState.getApplicationSubmissionContext(); ApplicationId appId = appState.getAppId(); - // create and recover app. - RMAppImpl application = - createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser()); - application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); + try { + // create and recover app. + RMAppImpl application = + createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), + appState.getUser()); + application.recover(rmState); + + // If security is enabled and the application is NOT in a final state, + // parse the credentials and renew delegation token + if (UserGroupInformation.isSecurityEnabled() && + !isApplicationInFinalState(appState.getState())) { + Credentials credentials = parseCredentials(appContext); // synchronously renew delegation token on recovery. rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; + credentials, appContext.getCancelTokensWhenComplete()); } - } else { + + // Actual recovery of the application application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + } catch (Exception e) { + LOG.error("Failed to recover application + " + appId, e); + // Fail the application if it is a running application. + if (!isApplicationInFinalState(appState.getState())) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(appId, e.getMessage())); + } + throw e; } } @@ -422,7 +420,11 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (Exception e) { + LOG.error("Error recovering application " + appState, e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..7efc760 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -715,8 +715,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens) if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } // Only one AMRMToken is stored per-attempt, so this should be fine. Can't diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 6a05123..423f1b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -1831,6 +1832,74 @@ protected void serviceStart() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); } + + @Test (timeout = 60000) + public void testRMRestartOnRecoveryFailure() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + + ApplicationState appState = rmAppState.get(app0.getApplicationId()); + // assert the AM failed state is saved. + Assert.assertEquals(RMAppAttemptState.FAILED, + appState.getAttempt(am0.getApplicationAttemptId()).getState()); + + // assert app state has not been saved. + Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); + + // new AM started but not registered, app still stays at ACCECPTED state. + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf) { + @Override + protected void recoverApplication(ApplicationState appState, + RMState rmState) throws Exception { + // Throw Exception for recovery failure + throw new YarnException("Recovery failed"); + + } + }; + } + }; + rm2.start(); + + // Verify rm2 functionality by submitting application + RMApp app1 = rm2.submitApp(200); + // assert app1 info is saved + appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app1.getApplicationSubmissionContext() + .getApplicationId()); + + rm1.stop(); + rm2.stop(); + } private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) {