diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..45ffeae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; @@ -666,7 +667,7 @@ private void createNewAttempt() { ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, maxAppAttempts == attempts.size()); + submissionContext, conf, maxAppAttempts <= attempts.size()); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -750,7 +751,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.attempts.size() == app.maxAppAttempts))) { + && app.getAttemptFailureCount() == app.maxAppAttempts))) { return RMAppState.ACCEPTED; } @@ -844,7 +845,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (this.attempts.size() >= this.maxAppAttempts) { + } else if (getAttemptFailureCount() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1066,6 +1067,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + private int getAttemptFailureCount() { + int attemptFailures = 0; + // Do not count AM preemption as attempt failure. + for (RMAppAttempt attempt : attempts.values()) { + if (!((RMAppAttemptImpl) attempt).isPreempted()) { + attemptFailures++; + } + } + return attemptFailures; + } + private static final class AttemptFailedTransition implements MultipleArcTransition { @@ -1077,8 +1089,9 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - if (!app.submissionContext.getUnmanagedAM() - && app.attempts.size() < app.maxAppAttempts) { + + if ((!app.submissionContext.getUnmanagedAM() + && app.getAttemptFailureCount() < app.maxAppAttempts)) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..3710c61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -1077,12 +1078,14 @@ public void transition(RMAppAttemptImpl appAttempt, // don't leave the tracking URL pointing to a non-existent AM appAttempt.setTrackingUrlToRMAppPage(); appAttempt.invalidateAMHostAndPort(); - if (appAttempt.submissionContext - .getKeepContainersAcrossApplicationAttempts() - && !appAttempt.isLastAttempt - && !appAttempt.submissionContext.getUnmanagedAM()) { + + if ((appAttempt.submissionContext + .getKeepContainersAcrossApplicationAttempts() && !appAttempt.submissionContext + .getUnmanagedAM()) + && (!appAttempt.isLastAttempt || appAttempt.isPreempted())) { keepContainersAcrossAppAttempts = true; } + appEvent = new RMAppFailedAttemptEvent(applicationId, RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(), @@ -1121,7 +1124,11 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.getClientTokenMasterKey()); } } - + + public boolean isPreempted() { + return getDiagnostics().contains(SchedulerUtils.PREEMPTED_CONTAINER); + } + private static final class UnmanagedAMAttemptSavedTransition extends AMLaunchedTransition { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..a6d473d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1105,14 +1105,12 @@ public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { @Override public void killContainer(RMContainer cont) { - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - completedContainer(cont, - SchedulerUtils.createPreemptedContainerStatus( - cont.getContainerId(),"Container being forcibly preempted:" - + cont.getContainerId()), - RMContainerEventType.KILL); + completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..81f3478 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -88,7 +88,7 @@ public int getHttpPort() { return httpPort; } - void setResourceTrackerService(ResourceTrackerService resourceTracker) { + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..556af86 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -144,6 +144,19 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) } } + public MockAM waitForAMToRestart(ApplicationId appId, int attemptSize, + MockNM nm) throws Exception { + RMApp app = getRMContext().getRMApps().get(appId); + Assert.assertNotNull(app); + while (app.getAppAttempts().size() != attemptSize) { + System.out.println("Application " + appId + + " is waiting for AM to restart. Current has " + + app.getAppAttempts().size() + " attempts."); + Thread.sleep(200); + } + return launchAndRegisterAM(app, this, nm); + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); @@ -520,6 +533,7 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); + System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index bcd8c1b..fb9d204 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -22,8 +22,6 @@ import java.util.HashMap; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -38,22 +36,23 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.junit.Assert; import org.junit.Test; -/** - * Test to restart the AM on failure. - * - */ public class TestAMRestart { - @Test + @Test(timeout = 30000) public void testAMRestartWithExistingContainers() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -245,7 +244,7 @@ private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) } } - @Test + @Test(timeout = 30000) public void testNMTokensRebindOnAMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); @@ -345,4 +344,95 @@ public void testNMTokensRebindOnAMRestart() throws Exception { Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); rm1.stop(); } + + // AM container preempted should not be counted towards AM max retry count. + @Test(timeout = 20000) + public void testAMPreemptedNotCountedForAMFailures() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + + // Forcibly preempt the am container; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // AM should be restarted even though max-am-attempt is 1. + MockAM am2 = MockRM.launchAM(app1, rm1, nm1); + + // fail the AM normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + // App should not start a new AM and should be FAILED. + rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); + rm1.stop(); + } + + // Test RM restarts after AM container is preempted, new RM should not count + // AM preemption failure towards the max-retry-account and should be able to + // re-launch the AM. + @Test(timeout = 20000) + public void testPreemptedAMRestartOnRMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + + // Forcibly preempt the am container; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // state store has 1 attempt stored. + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + Assert.assertEquals(1, appState.getAttemptCount()); + // attempt stored has the preemption diagnostics + Assert.assertTrue(appState.getAttempt(am1.getApplicationAttemptId()) + .getDiagnostics().contains(SchedulerUtils.PREEMPTED_CONTAINER)); + + // Restart rm. + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(); + rm2.start(); + + // Restarted RM should re-launch the am. + rm2.waitForAMToRestart(app1.getApplicationId(), 2, nm1); + + rm1.stop(); + rm2.stop(); + } }