diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3845562..73c96c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -841,7 +841,7 @@ public void recover(RMState state) { this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); // If interval > 0, some attempts might have been deleted. - if (submissionContext.getAttemptFailuresValidityInterval() > 0) { + if (this.attemptFailuresValidityInterval > 0) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); this.nextAttemptId = firstAttemptIdInStateStore; } @@ -1341,7 +1341,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { + "is " + numberOfFailure + ". The max attempts is " + app.maxAppAttempts); - removeExcessAttempts(app); + if (app.attemptFailuresValidityInterval > 0) { + removeExcessAttempts(app); + } if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { @@ -1381,15 +1383,22 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } private void removeExcessAttempts(RMAppImpl app) { - while (app.nextAttemptId - app.firstAttemptIdInStateStore - > app.maxAppAttempts) { + while (app.nextAttemptId + - app.firstAttemptIdInStateStore > app.maxAppAttempts) { // attempts' first element is oldest attempt because it is a // LinkedHashMap ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( app.getApplicationId(), app.firstAttemptIdInStateStore); - app.firstAttemptIdInStateStore++; - LOG.info("Remove attempt from state store : " + attemptId); - app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId); + long endTime = app.systemClock.getTime(); + if (rmAppAttempt.getFinishTime() < (endTime + - app.attemptFailuresValidityInterval)) { + app.firstAttemptIdInStateStore++; + LOG.info("Remove attempt from state store : " + attemptId); + app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + } else { + break; + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 9ceeffb..b560157 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -31,14 +31,19 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -316,4 +321,52 @@ protected void doSecureLogin() throws IOException { } } + @Test(timeout = 6000000) + public void testStartAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + int CONTAINER_MEMORY = 1024; + // create app and launch the AM + RMApp app0 = rm1.submitApp(CONTAINER_MEMORY); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + for (int i = 0; i < 4; i++) { + am0 = MockRM.launchAM(app0, rm1, nm1); + am0.registerAppAttempt(); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() + .get(app0.getApplicationId()).getCurrentAppAttempt(); + // kill app0-attempt + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer( + app0.getCurrentAppAttempt().getMasterContainer().getId())); + } + rm1.killApp(app0.getApplicationId()); + rm1.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.KILLED); + MockRM rm2 = null; + // start RM2 + try { + rm2 = new MockRM(conf, memStore); + rm2.start(); + Assert.assertTrue("RM start successfully", true); + } catch (Exception e) { + LOG.debug("Exception on start", e); + Assert.fail("RM should start with out any issue"); + } finally { + rm1.stop(); + } + } + }