diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 523e6be..ab23d61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -205,12 +205,6 @@ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -789,8 +783,17 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return app.recoveredFinalState; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); + // No existent attempts means the attempt associated with this app was not + // started or started but not yet saved. + if (app.attempts.isEmpty()) { + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user, true)); + return RMAppState.SUBMITTED; + } + + // Notify scheduler about the app on recovery synchronously + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user, false)); // recover attempts app.recoverAppAttempts(); @@ -805,12 +808,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.ACCEPTED; } - // No existent attempts means the attempt associated with this app was not - // started or started but not yet saved. - if (app.attempts.isEmpty()) { - return RMAppState.SUBMITTED; - } - // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 000227a..420f947 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -926,7 +926,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); - appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( appAttempt.getAppAttemptId(), false, false)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 92727e3..fa9f00d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -521,7 +521,7 @@ synchronized CSQueue getQueue(String queueName) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean shouldNotifyAppAccepted) { // santiy checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -553,8 +553,14 @@ private synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() + if (shouldNotifyAppAccepted) + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip notifying APP_ACCEPTED"); + } + } } private synchronized void addApplicationAttempt( @@ -905,7 +911,8 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getShouldNotifyAppAccepted()); } break; case APP_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index d6fb36d..7948451 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -25,13 +25,20 @@ private final ApplicationId applicationId; private final String queue; private final String user; + private final boolean shouldNotifyAppAccepted; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { + this(applicationId, queue, user, true); + } + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, boolean shouldNotifyAppAccepted) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.shouldNotifyAppAccepted = shouldNotifyAppAccepted; } public ApplicationId getApplicationId() { @@ -46,4 +53,7 @@ public String getUser() { return user; } + public boolean getShouldNotifyAppAccepted() { + return shouldNotifyAppAccepted; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6f9b76f..7e36270 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -565,7 +565,7 @@ public FairSchedulerEventLog getEventLog() { * configured limits, but the app will not be marked as runnable. */ protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean shouldNotifyAppAccepted) { if (queueName == null || queueName.isEmpty()) { String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; @@ -602,8 +602,14 @@ protected synchronized void addApplication(ApplicationId applicationId, LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (shouldNotifyAppAccepted) + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip notifying APP_ACCEPTED"); + } + } } /** @@ -1135,7 +1141,8 @@ public void handle(SchedulerEvent event) { } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getShouldNotifyAppAccepted()); break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b017db7..42a3db9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -356,15 +356,21 @@ private FiCaSchedulerNode getNode(NodeId nodeId) { @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, - String queue, String user) { + String queue, String user, boolean shouldNotifyAppAccepted) { SchedulerApplication application = new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (shouldNotifyAppAccepted) + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip notifying APP_ACCEPTED"); + } + } } @VisibleForTesting @@ -772,7 +778,8 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getShouldNotifyAppAccepted()); } break; case APP_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index aa7f631..b8b4133 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -228,7 +228,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { scheduler.handle(new NodeAddedSchedulerEvent(node)); ApplicationId appId = ApplicationId.newInstance(0, 1); - scheduler.addApplication(appId, "queue1", "user1"); + scheduler.addApplication(appId, "queue1", "user1", true); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index fb5c3a3..989bff1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -602,6 +602,27 @@ public void testAMContainerStatusWithRMRestart() throws Exception { attempt0.getMasterContainer().getId()).isAMContainer()); } + @Test (timeout = 20000) + public void testRecoverSchedulerAppAndAttemptSynchrnously() throws Exception { + // start RM + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + // scheduler app/attempt is immediately available after RM is re-started. + Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo( + am0.getApplicationAttemptId())); + } private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index ec942f9..9fd37ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -147,7 +147,7 @@ protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); + scheduler.addApplication(id.getApplicationId(), queueId, userId, true); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dbc79d9..f7c3429 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -788,13 +788,13 @@ public void testQueueDemandCalculation() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true); scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true); scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true); scheduler.addApplicationAttempt(id22, false, true); int minReqSize = @@ -1556,7 +1556,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true); scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on @@ -1838,7 +1838,7 @@ public void testNotAllowSubmitApplication() throws Exception { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user, true); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -2715,7 +2715,7 @@ public void testContinuousScheduling() throws Exception { // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true); fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request =