diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 12ece3f..0876b71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -996,13 +996,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { } RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, - // The newly created attempt maybe last attempt if (number of - // previously failed attempts(which should not include Preempted, - // hardware error and NM resync) + 1) equal to the max-attempt - // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, - currentAMBlacklistManager); + submissionContext, conf, amReq, currentAMBlacklistManager); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -1500,18 +1494,13 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private int getNumFailedAppAttempts() { + public int getNumFailedAppAttempts() { int completedAttempts = 0; - long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. 
for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - if (this.attemptFailuresValidityInterval <= 0 - || (attempt.getFinishTime() > endTime - - this.attemptFailuresValidityInterval)) { - completedAttempts++; - } + completedAttempts++; } } return completedAttempts; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ab84985..cbb28d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -175,12 +175,6 @@ private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; - // Since AM preemption, hardware error and NM resync are not counted towards - // AM failure count, even if this flag is true, a new attempt can still be - // re-created if this attempt is eventually failed because of preemption, - // hardware error or NM resync. So this flag indicates that this may be - // last attempt. 
- private final boolean maybeLastAttempt; private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); private static final AttemptFailedTransition FAILED_TRANSITION = @@ -486,16 +480,16 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) { + Configuration conf, ResourceRequest amReq) { this(appAttemptId, rmContext, scheduler, masterService, submissionContext, - conf, maybeLastAttempt, amReq, new DisabledBlacklistManager()); + conf, amReq, new DisabledBlacklistManager()); } public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq, + Configuration conf, ResourceRequest amReq, BlacklistManager amBlacklistManager) { this.conf = conf; this.applicationAttemptId = appAttemptId; @@ -510,7 +504,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.writeLock = lock.writeLock(); this.proxiedTrackingUrl = generateProxyUriWithScheme(); - this.maybeLastAttempt = maybeLastAttempt; this.stateMachine = stateMachineFactory.make(this); this.attemptMetrics = @@ -1414,14 +1407,10 @@ public void transition(RMAppAttemptImpl appAttempt, if (appAttempt.submissionContext .getKeepContainersAcrossApplicationAttempts() && !appAttempt.submissionContext.getUnmanagedAM()) { - // See if we should retain containers for non-unmanaged applications - if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) { - // Premption, hardware failures, NM resync doesn't count towards - // app-failures and so we should retain containers. 
- keepContainersAcrossAppAttempts = true; - } else if (!appAttempt.maybeLastAttempt) { - // Not preemption, hardware failures or NM resync. - // Not last-attempt too - keep containers. + RMApp app = appAttempt.rmContext.getRMApps().get( + appAttempt.getAppAttemptId().getApplicationId()); + int numberOfFailure = ((RMAppImpl)app).getNumFailedAppAttempts(); + if (numberOfFailure < app.getMaxAppAttempts()) { keepContainersAcrossAppAttempts = true; } } @@ -1497,6 +1486,14 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public boolean shouldCountTowardsMaxAttemptRetry() { + long attemptFailuresValidityInterval = this.submissionContext + .getAttemptFailuresValidityInterval(); + long end = System.currentTimeMillis(); + if (attemptFailuresValidityInterval > 0 + && this.getFinishTime() > 0 + && this.getFinishTime() < (end - attemptFailuresValidityInterval)) { + return false; + } try { this.readLock.lock(); int exitStatus = getAMContainerExitStatus(); @@ -2175,11 +2172,6 @@ public ApplicationAttemptReport createApplicationAttemptReport() { return attemptReport; } - // for testing - public boolean mayBeLastAttempt() { - return maybeLastAttempt; - } - @Override public RMAppAttemptMetrics getRMAppAttemptMetrics() { // didn't use read/write lock here because RMAppAttemptMetrics has its own diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7d19dab..f9f42ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -618,15 +618,15 @@ public RMApp submitApp(int masterMemory, String name, String user, false, null, 0, null, true, Priority.newInstance(0)); } - public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) - throws Exception { + public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval, + boolean keepContainers) throws Exception { Resource resource = Records.newRecord(Resource.class); resource.setMemorySize(masterMemory); Priority priority = Priority.newInstance(0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, keepContainers, false, null, attemptFailuresValidityInterval, null, true, priority); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 6376290..6e50f04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -395,7 +395,7 @@ public void handle(Event event) { mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new 
RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, false, null); + rmContext, yarnScheduler, null, asContext, config, null); ApplicationResourceUsageReport report = rmAppAttemptImpl .getApplicationResourceUsageReport(); assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); @@ -1326,7 +1326,7 @@ public ApplicationReport createAndGetApplicationReport( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(123456, 1), 1); RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, false, null)); + rmContext, yarnScheduler, null, asContext, config, null)); Container container = Container.newInstance( ContainerId.newContainerId(attemptId, 1), null, "", null, null, null); RMContainerImpl containerimpl = spy(new RMContainerImpl(container, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 03a9645..4fa8287 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -411,7 +411,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am2 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); 
// Preempt the second attempt. ContainerId amContainer2 = @@ -427,7 +426,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am3 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); // mimic NM disk_failure ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); @@ -454,7 +452,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am4 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); // create second NM, and register to rm1 MockNM nm2 = @@ -475,7 +472,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am5 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); // fail the AM normally nm2 .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); @@ -584,7 +580,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() // AM should be restarted even though max-am-attempt is 1. MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); // Restart rm. MockRM rm2 = new MockRM(conf, memStore); @@ -645,7 +640,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { // set window size to a larger number : 60s // we will verify the app should be failed if // two continuous attempts failed in 60s. 
- RMApp app = rm1.submitApp(200, 60000); + RMApp app = rm1.submitApp(200, 60000, false); MockAM am = MockRM.launchAM(app, rm1, nm1); // Fail current attempt normally @@ -655,8 +650,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { // launch the second attempt rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); Assert.assertEquals(2, app.getAppAttempts().size()); - Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt()) - .mayBeLastAttempt()); + MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1); rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); nm1.nodeHeartbeat(am_2.getApplicationAttemptId(), @@ -667,7 +661,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { ControlledClock clock = new ControlledClock(); // set window size to 10s - RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000); + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false); app1.setSystemClock(clock); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -684,7 +678,6 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { Assert.assertEquals(2, app1.getAppAttempts().size()); RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -863,4 +856,75 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception { rm1.stop(); } + + // Test restarting an AM launched with keepContainers and an AM reset window. + // After the reset window elapses, even if the would-be last attempt fails, + // all containers launched by the previous AM should be kept. 
+ @Test (timeout = 20000) + public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + // explicitly set max-am-retry count as 2. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // set window size to 10s and enable keepContainers + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + int NUM_CONTAINERS = 2; + allocateContainers(nm1, am1, NUM_CONTAINERS); + + // launch the 2nd container, for testing running container transferred. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // Fail attempt1 normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + + // It will be the last attempt. 
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // wait for 10 seconds to reset AM failure count + Thread.sleep(10 * 1000); + + // Fail attempt2 normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + // can launch the third attempt successfully + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(3, app1.getAppAttempts().size()); + MockAM am3 = rm1.launchAM(app1, rm1, nm1); + RegisterApplicationMasterResponse registerResponse = + am3.registerAppAttempt(); + + // keepContainers is applied, even if attempt2 was the last attempt. + Assert.assertEquals(1, registerResponse.getContainersFromPreviousAttempts() + .size()); + boolean containerId2Exists = false; + Container container = registerResponse.getContainersFromPreviousAttempts().get(0); + if (container.getId().equals(containerId2)) { + containerId2Exists = true; + } + Assert.assertTrue(containerId2Exists); + + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + rm1.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 786cc50..7bbef04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -327,7 +327,7 @@ public void setUp() throws Exception { application = mock(RMAppImpl.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler, - masterService, submissionContext, new Configuration(), false, + masterService, submissionContext, new Configuration(), BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, submissionContext.getResource(), 1)); @@ -1107,7 +1107,7 @@ public void testLaunchedFailWhileAHSEnabled() { RMAppAttempt myApplicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler,masterService, - submissionContext, myConf, false, + submissionContext, myConf, BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, submissionContext.getResource(), 1)); @@ -1581,7 +1581,7 @@ public void testContainersCleanupForLastAttempt() { applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, new Configuration(), - true, BuilderUtils.newResourceRequest( + BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, submissionContext.getResource(), 1)); when(submissionContext.getKeepContainersAcrossApplicationAttempts()) @@ -1642,7 +1642,7 @@ public Allocation answer(InvocationOnMock invocation) applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, - new Configuration(), true, ResourceRequest.newInstance( + new Configuration(), ResourceRequest.newInstance( Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3, false, "label-expression")); new RMAppAttemptImpl.ScheduleTransition().transition(