diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c521250..b26b8c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -521,15 +521,18 @@ protected void addApplication(ApplicationId applicationId, applications.put(applicationId, application); queue.getMetrics().submitApp(user); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queue.getName() - + ", currently num of applications: " + applications.size()); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queue.getName() + + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); } - } else{ + } else { + // Last thing before we send out the event that the app is accepted is + // to set the queue in the submissionContext (needed on restore etc) + rmApp.getApplicationSubmissionContext().setQueue(queue.getName()); rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java index 00809f0..b154684 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.junit.After; import org.junit.Before; import java.io.File; @@ -90,6 +91,13 @@ private void configureFairScheduler(YarnConfiguration conf) throws IOException { conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); } + @After + public void tearDown() { + if (schedulerType == SchedulerType.FAIR) { + (new File(FS_ALLOC_FILE)).delete(); + } + } + public SchedulerType getSchedulerType() { return schedulerType; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2c37f44..08581b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1490,4 +1490,47 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); } + + // Apps already completed before RM restart. Make sure we restore the queue + // correctly + @Test(timeout = 20000) + public void testFairSchedulerCompletedAppsQueue() throws Exception { + if (getSchedulerType() != SchedulerType.FAIR) { + return; + } + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app = rm1.submitApp(200); + String fsQueueContext = app.getApplicationSubmissionContext().getQueue(); + String fsQueueApp = app.getQueue(); + assertEquals("Queue in app not equal to submission context", fsQueueApp, + fsQueueContext); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1); + RMAppAttempt rmAttempt = app.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found", rmAttempt); + MockRM.finishAMAndVerifyAppState(app, rm1, nm1, am1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app.getApplicationId()); + RMAppAttempt rmAttemptRecovered = recoveredApp.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found after recovery", rmAttemptRecovered); + String fsQueueContextRecovered = + recoveredApp.getApplicationSubmissionContext().getQueue(); + String fsQueueAppRecovered = recoveredApp.getQueue(); + assertEquals(RMAppState.FINISHED, recoveredApp.getState()); + assertEquals("Recovered app queue is not the same as context queue", + fsQueueAppRecovered, fsQueueContextRecovered); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + } }