NOTE(review): re-flowed from a whitespace-collapsed copy of this patch. Line
breaks, indentation and blank context lines are reconstructed, and the blob ids
on the "index" lines predate the edits made to the added lines; check the
context against the base files (or apply with whitespace tolerance).

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 70fd257..a10969d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -57,6 +57,8 @@
   private static final Log LOG = LogFactory.getLog(RMAppManager.class);
 
   private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+  // RM-wide upper bound on AM attempts, read from RM_AM_MAX_ATTEMPTS.
+  private int globalMaxAppAttempts;
   private LinkedList completedApps = new LinkedList();
 
   private final RMContext rmContext;
@@ -76,6 +78,8 @@ public RMAppManager(RMContext context,
     setCompletedAppsMax(conf.getInt(
         YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
         YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+    globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
   }
 
   /**
@@ -308,6 +312,7 @@ public void recover(RMState state) throws Exception {
     Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
     LOG.info("Recovering " + appStates.size() + " applications");
     for(ApplicationState appState : appStates.values()) {
+      boolean shouldRecover = true;
       // re-submit the application
       // this is going to send an app start event but since the async dispatcher
       // has not started that event will be queued until we have completed re
@@ -318,16 +323,42 @@ public void recover(RMState state) throws Exception {
         // This will need to be changed in work preserving recovery in which
         // RM will re-connect with the running AM's instead of restarting them
         LOG.info("Not recovering unmanaged application " + appState.getAppId());
-        store.removeApplication(appState);
+        shouldRecover = false;
+      }
+      // Effective attempt limit: the per-application value when it lies in
+      // [1, globalMaxAppAttempts], otherwise the RM-wide limit.
+      int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
+          .getMaxAppAttempts();
+      int maxAppAttempts;
+      if (individualMaxAppAttempts <= 0 ||
+          individualMaxAppAttempts > globalMaxAppAttempts) {
+        maxAppAttempts = globalMaxAppAttempts;
+        LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+            + " for application: " + appState.getAppId()
+            + " is invalid, because it is out of the range [1, "
+            + globalMaxAppAttempts + "]. Use the global max attempts instead.");
       } else {
+        maxAppAttempts = individualMaxAppAttempts;
+      }
+      // Do not restart an application that has already used up its attempts.
+      if (appState.getAttemptCount() >= maxAppAttempts) {
+        LOG.info("Not recovering application " + appState.getAppId() +
+            " due to recovering attempt is beyond maxAppAttempt limit");
+        shouldRecover = false;
+      }
+
+      if (shouldRecover) {
         LOG.info("Recovering application " + appState.getAppId());
         submitApplication(appState.getApplicationSubmissionContext(),
             appState.getSubmitTime());
         // re-populate attempt information in application
         RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
             appState.getAppId());
         appImpl.recover(state);
-      }
+      } else {
+        // Skipped applications are purged from the state store.
+        store.removeApplication(appState);
+      }
     }
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 2057d8a..97e8b37 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -62,6 +62,7 @@ public void testRMRestart() throws Exception {
     "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
     conf.set(YarnConfiguration.RM_SCHEDULER,
     "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
 
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
@@ -306,4 +307,59 @@ public void testRMRestart() throws Exception {
     Assert.assertEquals(0, rmAppState.size());
   }
 
+  /**
+   * An application whose stored attempt count has reached its max-attempts
+   * limit must not be recovered by a restarted RM.
+   */
+  @Test
+  public void testRMRestartOnMaxAppAttempts() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ExitUtil.disableSystemExit();
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE,
+        "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app = rm1.submitApp(200);
+    // limit this application to a single attempt, below the global limit of 2
+    app.getApplicationSubmissionContext().setMaxAppAttempts(1);
+    // assert app info is saved
+    ApplicationState appState = rmAppState.get(app.getApplicationId());
+    Assert.assertNotNull(appState);
+    Assert.assertEquals(0, appState.getAttemptCount());
+    Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app.getApplicationSubmissionContext()
+        .getApplicationId());
+    nm1.nodeHeartbeat(true);
+    // assert app attempt is saved
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    Assert.assertEquals(1, appState.getAttemptCount());
+    ApplicationAttemptState attemptState =
+        appState.getAttempt(attemptId1);
+    Assert.assertNotNull(attemptState);
+    Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
+        attemptState.getMasterContainer().getId());
+    // start new RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    // verify that app info is removed
+    Assert.assertEquals(0, rm2.getRMContext().getRMApps().size());
+    rm1.stop();
+    rm2.stop();
+  }
 }