diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0c1df33..414d718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -189,6 +189,8 @@
   @VisibleForTesting
   protected String webAppAddress;
   private ConfigurationProvider configurationProvider = null;
+  /** Whether transitionToActive() reinitializes the scheduler first. */
+  private boolean needRefreshQueues = true;
   /** End of Active services */
 
   private Configuration conf;
@@ -1070,6 +1072,32 @@ void reinitialize(boolean initialize) {
   }
 
+  /**
+   * Enables or disables the scheduler reinitialization that
+   * {@link #transitionToActive()} performs before starting the active
+   * services.
+   *
+   * @param needRefreshQueues {@code false} to skip the reinitialization
+   */
   @VisibleForTesting
+  public void setNeedRefreshQueues(boolean needRefreshQueues) {
+    this.needRefreshQueues = needRefreshQueues;
+  }
+
+  /**
+   * Reinitializes the scheduler with the RM configuration so that its queues
+   * are refreshed, unless that has been turned off through
+   * {@link #setNeedRefreshQueues(boolean)}.
+   *
+   * @throws IOException propagated from the scheduler's reinitialize call
+   */
+  private void reinitializeActiveServices() throws IOException {
+    // refresh queues
+    if (needRefreshQueues) {
+      rmContext.getScheduler().reinitialize(conf, rmContext);
+    }
+  }
+
+  @VisibleForTesting
   protected boolean areActiveServicesRunning() {
     return activeServices != null && activeServices.isInState(STATE.STARTED);
   }
@@ -1085,6 +1113,7 @@ synchronized void transitionToActive() throws Exception {
       @Override
       public Void run() throws Exception {
         try {
+          reinitializeActiveServices();
           startActiveServices();
           return null;
         } catch (Exception e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index f92af35..da1005b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -49,6 +49,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetworkTopology;
@@ -74,9 +75,13 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -84,6 +89,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -4641,4 +4647,72 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception {
     long reservedId = reservedContainer1.getContainerId().getContainerId();
     assertEquals(reservedId + 1, maxId);
   }
+
+  /**
+   * Verifies that a standby RM picks up a queue added to the allocation file
+   * when it transitions to active, so that an app submitted to that queue
+   * through another RM is recovered into it.
+   */
+  @Test(timeout = 120000)
+  public void testRefreshQueuesWhenRMHA() throws Exception {
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+    conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    HAServiceProtocol.StateChangeRequestInfo requestInfo =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there are no queues
+    MockRM rm1 = new MockRM(conf, null);
+    rm1.init(conf);
+    rm1.start();
+    rm1.getAdminService().transitionToStandby(requestInfo);
+
+    // 2. add a new queue "test_queue"
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"test_queue\">");
+    out.println("  <maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 3. start an active RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.init(conf);
+    rm2.start();
+
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm.registerNode();
+
+    rm2.getAdminService().transitionToActive(requestInfo);
+
+    // 4. submit an app to the newly added queue "test_queue"
+    RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
+    RMAppAttempt attempt0 = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
+    am0.registerAppAttempt();
+    assertEquals("root.test_queue", app.getQueue());
+
+    // 5. Transition rm1 to active, recover app
+    ((RMContextImpl)rm1.getRMContext()).setStateStore(memStore);
+    rm1.getAdminService().transitionToActive(requestInfo);
+    rm1.drainEvents();
+    assertEquals(1, rm1.getRMContext().getRMApps().size());
+    RMApp recoveredApp =
+        rm1.getRMContext().getRMApps().values().iterator().next();
+    assertEquals("root.test_queue", recoveredApp.getQueue());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
index 53ef031..04d0651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
@@ -546,6 +546,7 @@ public void testSingleAppKillUnauthorized() throws Exception {
       csconf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
       csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
       rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
+      rm.setNeedRefreshQueues(false);
     }
 
     rm.start();
@@ -1071,6 +1072,7 @@ public void testUpdateAppPriority() throws Exception {
     csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
     csconf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
     rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
+    rm.setNeedRefreshQueues(false);
 
     rm.start();
     MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
@@ -1152,6 +1154,7 @@ public void testAppMove() throws Exception {
     csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
     csconf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
     rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
+    rm.setNeedRefreshQueues(false);
 
     rm.start();
     MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);