diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 651d0e9..67b676b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1244,4 +1244,19 @@ public Priority getPriority() { public Map getUserWeights() { return userWeights; } + + public void recoverDrainingState() { + try { + this.writeLock.lock(); + if (getState() == QueueState.STOPPED) { + updateQueueState(QueueState.DRAINING); + } + LOG.info("Recover draining state for queue " + this.getQueuePath()); + if (getParent() != null && getParent().getState() == QueueState.STOPPED) { + ((AbstractCSQueue) getParent()).recoverDrainingState(); + } + } finally { + this.writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 776e512..2b226eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -807,6 +808,10 @@ private void addApplicationOnRecovery(ApplicationId applicationId, } catch (AccessControlException ace) { // Ignore the exception for recovered app as the app was previously // accepted. + // If queue state is STOPPED, it should be updated to DRAINING + if (queue.getState() == QueueState.STOPPED) { + ((LeafQueue) queue).recoverDrainingState(); + } } queue.getMetrics().submitApp(user); SchedulerApplication application = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java index 9f2933e..1306930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java @@ -32,7 +32,13 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -197,4 +203,55 @@ private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, .thenCallRealMethod(); return application; } + + + @Test (timeout = 30000) + public void testRecoverDrainingStateAfterRMRestart() throws Exception { + // init conf + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + false); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); + conf.setQueues(Q1_PATH, new String[] {Q2}); + conf.setCapacity(Q1_PATH, 100); + conf.setCapacity(Q2_PATH, 100); + + // init state store + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMStateStore.RMState rmState = memStore.getState(); + // init RM & NMs & Nodes + MockRM rm = new MockRM(conf, memStore); + rm.start(); + MockNM nm = rm.registerNode("h1:1234", 204800); + + // submit an app, AM is running on nm1 + RMApp app = rm.submitApp(1024, "appname", "appuser", null, Q2); + MockRM.launchAM(app, rm, nm); + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + // update queue state to STOPPED + conf.setState(Q1_PATH, QueueState.STOPPED); + CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + cs.reinitialize(conf, rm.getRMContext()); + // current queue state should be DRAINING + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + + // RM restart + rm = new MockRM(conf, memStore); + rm.start(); + rm.registerNode("h1:1234", 204800); + + // queue state should be DRAINING after app recovered + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + + // close rm + rm.close(); + } }