diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..a4b5d8d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -107,12 +107,15 @@ new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -335,20 +338,6 @@ public FinishApplicationMasterResponse finishApplicationMaster( // Allow only one thread in AM to do finishApp at a time. 
synchronized (lock) { - if (!hasApplicationMasterRegistered(applicationAttemptId)) { - String message = - "Application Master is trying to unregister before registering for: " - + applicationAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), - AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", - message, applicationAttemptId.getApplicationId(), - applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); - } - this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -409,22 +398,17 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, - appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Looks like RM rebooted. 
Let AM resync."; + LOG.info(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bbd135b..49c1ada 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -205,6 +205,11 @@ // ACCEPTED state. .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition( + new AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5b1a17d..73c7baa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -287,6 +287,9 @@ RMAppAttemptEventType.KILL, new FinalSavingTransition(new FinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED), + RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) // Transitions from RUNNING State .addTransition(RMAppAttemptState.RUNNING, @@ -911,7 +914,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // Add the current attempt to the scheduler. if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false)); + appAttempt.getAppAttemptId(), false, false)); } /* @@ -930,6 +933,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, * heart beat back). 
*/ (new AMLaunchedTransition()).transition(appAttempt, event); + + if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + } + return RMAppAttemptState.LAUNCHED; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5de407d..2a6bd1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -557,7 +557,8 @@ private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -569,15 +570,20 @@ private synchronized void addApplicationAttempt( attempt.transferStateFromPreviousAttempt(application .getCurrentAppAttempt()); } + application.setCurrentAppAttempt(attempt); queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - 
rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.info("Skipping notifying ATTEMPT_ADDED"); + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -873,7 +879,7 @@ public void handle(SchedulerEvent event) { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); recoverContainersOnNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: @@ -911,7 +917,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index d31010d..64d308a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,13 +24,22 @@ private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean shouldNotifyAttemptAdded; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, true); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public ApplicationAttemptId getApplicationAttemptId() { public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getShouldNotifyAttemptAdded() { + return shouldNotifyAttemptAdded; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5725f8c..5414967 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -597,7 +597,8 @@ protected synchronized void addApplication(ApplicationId applicationId, */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -625,9 +626,14 @@ protected synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.info("Skipping notifying ATTEMPT_ADDED"); + } } /** @@ -1131,7 +1137,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4681516..6f6eca9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -370,7 +370,8 @@ public synchronized void addApplication(ApplicationId applicationId, @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -388,9 +389,13 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.info("Skipping notifying ATTEMPT_ADDED"); + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -780,7 +785,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + 
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..36182f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler 
nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList(), - new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public void testallocateBeforeAMRegistration() throws Exception { thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aa..ac7336e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -263,20 +263,9 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { 
FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.FAILED, "", ""); Throwable cause = null; - try { - am1.unregisterAppAttempt(req, false); - } catch (Exception e) { - cause = e.getCause(); - } - Assert.assertNotNull(cause); - Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); - Assert.assertNotNull(cause.getMessage()); - Assert - .assertTrue(cause - .getMessage() - .contains( - "Application Master is trying to unregister before registering for:")); + am1.unregisterAppAttempt(req, false); + + am1.waitForState(RMAppAttemptState.FINISHING); } finally { if (rm != null) { rm.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index b7b77a7..aa7f631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -238,7 +238,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false); + scheduler.addApplicationAttempt(attId, false, true); rm.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 
9c2d87e..dcd8eb6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -293,7 +293,7 @@ public void testRMRestart() throws Exception { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -706,6 +706,45 @@ public void testRMRestartFailedApp() throws Exception { rm2.stop(); } + @Test (timeout = 600000) + public void testRMRestartWorkPreservingAppReregister() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + am0.waitForState(RMAppAttemptState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + am0.setAMRMProtocol(rm2.getApplicationMasterService()); + 
am0.registerAppAttempt(false); + + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + rm1.stop(); + rm2.stop(); + } + @Test (timeout = 60000) public void testRMRestartKilledApp() throws Exception{ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -1648,7 +1687,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index fb864a2..dd1e4bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -146,7 +146,7 @@ protected ApplicationAttemptId createSchedulingRequest( // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. 
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); + scheduler.addApplicationAttempt(id, false, true); } List ask = new ArrayList(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9d8b1d1..c6a2231 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -787,13 +787,13 @@ public void testQueueDemandCalculation() throws Exception { ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplicationAttempt(id22, false, true); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1555,7 +1555,7 @@ public void 
testMultipleNodesSingleRackRequest() throws Exception { ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -2593,7 +2593,7 @@ public void testContinuousScheduling() throws Exception { ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);