diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 57b41af..8b62a57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -593,7 +594,7 @@ protected synchronized void addApplication(ApplicationId applicationId, } RMApp rmApp = rmContext.getRMApps().get(applicationId); - FSLeafQueue queue = assignToQueue(rmApp, queueName, user); + FSLeafQueue queue = assignToQueue(rmApp, queueName, user, isAppRecovering); if (queue == null) { return; } @@ -681,7 +682,8 @@ protected synchronized void addApplicationAttempt( * responsible to call the appropriate event-handler if the app is rejected. */ @VisibleForTesting - FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { + FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user, + boolean isAppRecovering) { FSLeafQueue queue = null; String appRejectMsg = null; @@ -689,6 +691,19 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); queueName = placementPolicy.assignAppToQueue(queueName, user); if (queueName == null) { + // During a restart, indicates QueuePlacementPolicy is changed + // not presently supported + if (isAppRecovering) { + String queueErrorMsg = + "Queue named " + + queueName + + " missing during application recovery." + + " Queue removal during recovery is not presently supported by the" + + " fair scheduler, please restart with all queues configured" + + " which were present before shutdown/restart."; + LOG.fatal(queueErrorMsg); + throw new QueueNotFoundException(queueErrorMsg); + } appRejectMsg = "Application rejected by queue placement policy"; } else { queue = queueMgr.getLeafQueue(queueName, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index e21fcf9..c0941c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.Credentials; @@ -430,7 +431,83 @@ private void initFairScheduler(ResourceManager rm) throws IOException { private static final String USER_1 = "user1"; private static final String USER_2 = "user2"; - private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + private Configuration setupQueueConfiguration(Class schedulerClass, + YarnConfiguration conf) throws IOException { + if (schedulerClass.equals(CapacityScheduler.class)) { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupCSQueueConfiguration(csConf); + return csConf; + } else if (schedulerClass.equals(FairScheduler.class)) { + FairSchedulerConfiguration fsconf = new FairSchedulerConfiguration(conf); + return fsconf; + } + return conf; + } + + private Configuration setupQueueConfigurationOnlyA(Class schedulerClass, + YarnConfiguration conf) throws IOException { + if (schedulerClass.equals(CapacityScheduler.class)) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupCSQueueConfigurationOnlyA(csConf); + return csConf; + } else if (schedulerClass.equals(FairScheduler.class)) { + FairSchedulerConfiguration fsconf = new FairSchedulerConfiguration(conf); + setupFSQueueConfigurationWithPolicyReject(fsconf); + return fsconf; + } + return conf; + } + + private Configuration setupCSQueueConfigurationOnlyA(Class schedulerClass, + YarnConfiguration conf) throws IOException { + if (schedulerClass.equals(CapacityScheduler.class)) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupCSQueueConfigurationOnlyA(csConf); + return csConf; + } else if (schedulerClass.equals(FairScheduler.class)) { + FairSchedulerConfiguration fsconf = new FairSchedulerConfiguration(conf); + setupFSQueueConfigurationWithPolicyReject(fsconf); + return fsconf; + } + return conf; + } + + private void setupFSQueueConfigurationWithPolicyReject( + FairSchedulerConfiguration conf) throws IOException { + String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")) + .getAbsolutePath(); + String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + + out.println(""); + out.println(""); + out.println(""); + + out.println(""); + out.close(); + } + + private void setupCSQueueConfiguration(CapacitySchedulerConfiguration conf) { conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; conf.setCapacity(Q_R, 100); @@ -443,7 +520,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); } - private void setupQueueConfigurationOnlyA( + private void setupCSQueueConfigurationOnlyA( CapacitySchedulerConfiguration conf) { conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; @@ -476,7 +553,7 @@ public void testCapacitySchedulerRecovery() throws Exception { DominantResourceCalculator.class.getName()); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); - setupQueueConfiguration(csConf); + setupCSQueueConfiguration(csConf); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(csConf); rm1 = new MockRM(csConf, memStore); @@ -586,19 +663,15 @@ public void testCapacitySchedulerRecovery() throws Exception { //2. Remove one of the queues, restart the RM //3. Verify that the expected exception was thrown @Test (timeout = 30000, expected = QueueNotFoundException.class) - public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { - if (!schedulerClass.equals(CapacityScheduler.class)) { + public void testSchedulerQueueRemovedRecovery() throws Exception { + if (!schedulerClass.equals(CapacityScheduler.class) + && !schedulerClass.equals(FairScheduler.class)) { throw new QueueNotFoundException("Dummy"); } - conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); - conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, - DominantResourceCalculator.class.getName()); - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(conf); - setupQueueConfiguration(csConf); + Configuration schedulerConf = setupQueueConfiguration(schedulerClass, conf); MemoryRMStateStore memStore = new MemoryRMStateStore(); - memStore.init(csConf); - rm1 = new MockRM(csConf, memStore); + memStore.init(schedulerConf); + rm1 = new MockRM(schedulerConf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -624,11 +697,8 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { rm1.clearQueueMetrics(app1_2); rm1.clearQueueMetrics(app2); - // Re-start RM - csConf = - new CapacitySchedulerConfiguration(conf); - setupQueueConfigurationOnlyA(csConf); - rm2 = new MockRM(csConf, memStore); + schedulerConf = setupQueueConfigurationOnlyA(schedulerClass, conf); + rm2 = new MockRM(schedulerConf, memStore); rm2.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5d079a3..f48c09b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -966,8 +966,10 @@ public void testAssignToQueue() throws Exception { RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); - FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix"); - FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix"); + FSLeafQueue queue1 = + scheduler.assignToQueue(rmApp1, "default", "asterix", false); + FSLeafQueue queue2 = + scheduler.assignToQueue(rmApp2, "notdefault", "obelix", false); // assert FSLeafQueue's name is the correct name is the one set in the RMApp assertEquals(rmApp1.getQueue(), queue1.getName()); @@ -990,8 +992,9 @@ public void testAssignToNonLeafQueueReturnsNull() throws Exception { RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); // Trying to assign to non leaf queue would return null - assertNull(scheduler.assignToQueue(rmApp1, "root.child1", "tintin")); - assertNotNull(scheduler.assignToQueue(rmApp2, "root.child2", "snowy")); + assertNull(scheduler.assignToQueue(rmApp1, "root.child1", "tintin", false)); + assertNotNull(scheduler + .assignToQueue(rmApp2, "root.child2", "snowy", false)); } @Test @@ -1116,8 +1119,8 @@ public void testNestedUserQueue() throws IOException { scheduler.reinitialize(conf, resourceManager.getRMContext()); RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); - FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default", - "user1"); + FSLeafQueue user1Leaf = + scheduler.assignToQueue(rmApp1, "root.default", "user1", false); assertEquals("root.user1group.user1", user1Leaf.getName()); } -- 1.9.2.msysgit.0