diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 12ece3f..a5a261f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1502,16 +1502,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { private int getNumFailedAppAttempts() { int completedAttempts = 0; - long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - if (this.attemptFailuresValidityInterval <= 0 - || (attempt.getFinishTime() > endTime - - this.attemptFailuresValidityInterval)) { - completedAttempts++; - } + completedAttempts++; } } return completedAttempts; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ab84985..20fe91c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1423,6 +1423,14 @@ public void transition(RMAppAttemptImpl appAttempt, // Not preemption, hardware failures or NM resync. // Not last-attempt too - keep containers. keepContainersAcrossAppAttempts = true; + } else { + // After AM reset window time, it is no longer the last attempt. + long attemptFailuresValidityInterval = appAttempt.submissionContext.getAttemptFailuresValidityInterval(); + long end = System.currentTimeMillis(); + if (attemptFailuresValidityInterval > 0 + && appAttempt.getStartTime() < (end - attemptFailuresValidityInterval)) { + keepContainersAcrossAppAttempts = true; + } } } appEvent = @@ -1497,6 +1505,13 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public boolean shouldCountTowardsMaxAttemptRetry() { + long attemptFailuresValidityInterval = this.submissionContext.getAttemptFailuresValidityInterval(); + long end = System.currentTimeMillis(); + if (attemptFailuresValidityInterval > 0 + && this.getFinishTime() > 0 + && this.getFinishTime() < (end - attemptFailuresValidityInterval)) { + return false; + } try { this.readLock.lock(); int exitStatus = getAMContainerExitStatus(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7d19dab..f9f42ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -618,15 +618,15 @@ public RMApp submitApp(int masterMemory, String name, String user, false, null, 0, null, true, Priority.newInstance(0)); } - public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) - throws Exception { + public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval, + boolean keepContainers) throws Exception { Resource resource = Records.newRecord(Resource.class); resource.setMemorySize(masterMemory); Priority priority = Priority.newInstance(0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, keepContainers, false, null, attemptFailuresValidityInterval, null, true, priority); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 03a9645..3bff47a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -645,7 +645,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { // set window size to a larger number : 60s // we will verify the app should be failed if // two continuous attempts failed in 60s. - RMApp app = rm1.submitApp(200, 60000); + RMApp app = rm1.submitApp(200, 60000, false); MockAM am = MockRM.launchAM(app, rm1, nm1); // Fail current attempt normally @@ -667,7 +667,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { ControlledClock clock = new ControlledClock(); // set window size to 10s - RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000); + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false); app1.setSystemClock(clock); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -863,4 +863,72 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception { rm1.stop(); } + + @Test (timeout = 20000) + public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + // explicitly set max-am-retry count as 2. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // set window size to 10s and enable keepContainers + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + int NUM_CONTAINERS = 2; + allocateContainers(nm1, am1, NUM_CONTAINERS); + + // launch the 2nd container, for testing running container transferred. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // Fail attempt1 normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + + // It will be the last attempt. + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // wait for 10 seconds to reset AM failure count + Thread.sleep(10 * 1000); + + // Fail attempt2 normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + // can launch the third attempt successfully + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(3, app1.getAppAttempts().size()); + MockAM am3 = rm1.launchAM(app1, rm1, nm1); + RegisterApplicationMasterResponse registerResponse = + am3.registerAppAttempt(); + + // keepContainers is applied, even if attempt2 was the last attempt. + Assert.assertEquals(1, registerResponse.getContainersFromPreviousAttempts() + .size()); + boolean containerId2Exists = false; + Container container = registerResponse.getContainersFromPreviousAttempts().get(0); + if (container.getId().equals(containerId2)) { + containerId2Exists = true; + } + Assert.assertTrue(containerId2Exists); + + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + rm1.stop(); + } }