diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 8ea07ab..7b3580b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -479,15 +479,22 @@ protected void addApplication(ApplicationId applicationId, applications.put(applicationId, application); queue.getMetrics().submitApp(user); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queue.getName() - + ", currently num of applications: " + applications.size()); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queue.getName() + + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); } - } else{ + } else { + // During tests we do not always have an application object, handle + // it here but we probably should fix the tests + if (rmApp != null) { + // Before we send out the event that the app is accepted is + // to set the queue in the submissionContext (needed on restore etc) + rmApp.getApplicationSubmissionContext().setQueue(queue.getName()); + } rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java index 9a29a89..4de16dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.junit.After; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -111,6 +112,13 @@ protected void configureFairScheduler(YarnConfiguration conf) conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); } + @After + public void tearDown() { + if (schedulerType == SchedulerType.FAIR) { + (new File(FS_ALLOC_FILE)).delete(); + } + } + public SchedulerType getSchedulerType() { return schedulerType; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 064e217..6d2cd38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1532,4 +1532,43 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); } + + // Apps already completed before RM restart. Make sure we restore the queue + // correctly + @Test(timeout = 20000) + public void testFairSchedulerCompletedAppsQueue() throws Exception { + if (getSchedulerType() != SchedulerType.FAIR) { + return; + } + + rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app, rm1, nm1, am1); + + String fsQueueContext = app.getApplicationSubmissionContext().getQueue(); + String fsQueueApp = app.getQueue(); + assertEquals("Queue in app not equal to submission context", fsQueueApp, + fsQueueContext); + RMAppAttempt rmAttempt = app.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found", rmAttempt); + + rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2.start(); + + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app.getApplicationId()); + RMAppAttempt rmAttemptRecovered = recoveredApp.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found after recovery", rmAttemptRecovered); + String fsQueueContextRecovered = + recoveredApp.getApplicationSubmissionContext().getQueue(); + String fsQueueAppRecovered = recoveredApp.getQueue(); + assertEquals(RMAppState.FINISHED, recoveredApp.getState()); + assertEquals("Recovered app queue is not the same as context queue", + fsQueueAppRecovered, fsQueueContextRecovered); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd..65ac504 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -163,30 +163,33 @@ protected ApplicationAttemptId createSchedulingRequest( protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId, false); - // This conditional is for testAclSubmitApplication where app is rejected - // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false, false); - } - List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); - ask.add(request); - + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, + this.ATTEMPT_ID++); RMApp rmApp = mock(RMApp.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id, resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); Container container = mock(Container.class); when(rmAppAttempt.getMasterContainer()).thenReturn(container); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); + scheduler.addApplication(id.getApplicationId(), queueId, userId, false); + // This conditional is for testAclSubmitApplication where app is rejected + // and no app is added. + if (scheduler.getSchedulerApplications(). + containsKey(id.getApplicationId())) { + scheduler.addApplicationAttempt(id, false, false); + } + List ask = new ArrayList(); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true); + ask.add(request); scheduler.allocate(id, ask, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); @@ -198,23 +201,25 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId, String userId, List ask) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId, false); - // This conditional is for testAclSubmitApplication where app is rejected - // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false, false); - } - RMApp rmApp = mock(RMApp.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id,resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); + scheduler.addApplication(id.getApplicationId(), queueId, userId, false); + // This conditional is for testAclSubmitApplication where app is rejected + // and no app is added. + if (scheduler.getSchedulerApplications(). + containsKey(id.getApplicationId())) { + scheduler.addApplicationAttempt(id, false, false); + } scheduler.allocate(id, ask, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); @@ -275,9 +280,11 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) { RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class); when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); when(app.getCurrentAppAttempt()).thenReturn(attempt); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(attempt.getSubmissionContext()).thenReturn(submissionContext); + when(app.getApplicationSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(attemptId.getApplicationId(), app); return app;