diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 4d4ad5d..67c655b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -121,6 +121,8 @@ private UserGroupInformation daemonUser; + private boolean isTransitingToActive = false; + @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; @@ -317,18 +319,8 @@ public synchronized void transitionToActive( throw new ServiceFailedException("Can not execute refreshAdminAcls", ex); } - UserGroupInformation user = checkAccess("transitionToActive"); - checkHaStateChange(reqInfo); - try { - rm.transitionToActive(); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - "", "RM", - "Exception transitioning to active"); - throw new ServiceFailedException( - "Error when transitioning to Active mode", e); - } try { + isTransitingToActive = true; // call all refresh*s for active RM to get the updated configurations. refreshAll(); } catch (Exception e) { @@ -337,9 +329,23 @@ public synchronized void transitionToActive( .getDispatcher() .getEventHandler() .handle( - new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); + new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); throw new ServiceFailedException( "Error on refreshAll during transistion to Active", e); + } finally { + isTransitingToActive = false; + } + + UserGroupInformation user = checkAccess("transitionToActive"); + checkHaStateChange(reqInfo); + try { + rm.transitionToActive(); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + "", "RM", + "Exception transitioning to active"); + throw new ServiceFailedException( + "Error when transitioning to Active mode", e); } RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RM"); @@ -403,7 +409,9 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) final String msg = "refresh queues."; UserGroupInformation user = checkAcls(operation); - checkRMStatus(user.getShortUserName(), operation, msg); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, msg); + } RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); @@ -429,7 +437,9 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) final String msg = "refresh nodes."; UserGroupInformation user = checkAcls("refreshNodes"); - checkRMStatus(user.getShortUserName(), operation, msg); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, msg); + } try { Configuration conf = @@ -461,8 +471,10 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu final String operation = "refreshSuperUserGroupsConfiguration"; UserGroupInformation user = checkAcls(operation); - checkRMStatus(user.getShortUserName(), operation, - "refresh super-user-groups."); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, + "refresh super-user-groups."); + } // Accept hadoop common configs in core-site.xml as well as RM specific // configurations in yarn-site.xml @@ -486,7 +498,9 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( final String operation = "refreshUserToGroupsMappings"; UserGroupInformation user = checkAcls(operation); - checkRMStatus(user.getShortUserName(), operation, "refresh user-groups."); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, "refresh user-groups."); + } Groups.getUserToGroupsMappingService( getConfiguration(new Configuration(false), @@ -539,7 +553,10 @@ public RefreshServiceAclsResponse refreshServiceAcls( final String operation = "refreshServiceAcls"; UserGroupInformation user = checkAcls(operation); - checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs."); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, + "refresh Service ACLs."); + } PolicyProvider policyProvider = RMPolicyProvider.getInstance(); Configuration conf = @@ -792,6 +809,10 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( } } + private boolean needCheckRMStatues() { + return !isTransitingToActive; + } + private void checkRMStatus(String user, String operation, String msg) throws StandbyException { if (!isRMActive()) { @@ -837,7 +858,10 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( final String msg = "refresh cluster max priority"; UserGroupInformation user = checkAcls(operation); - checkRMStatus(user.getShortUserName(), operation, msg); + if (needCheckRMStatues()) { + checkRMStatus(user.getShortUserName(), operation, msg); + } + try { Configuration conf = getConfiguration(new Configuration(false), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dab7312..87670c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; @@ -74,9 +75,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -84,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -4648,4 +4654,67 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception { long reservedId = reservedContainer1.getContainerId().getContainerId(); assertEquals(reservedId + 1, maxId); } + + @Test(timeout = 120000) + public void testRefreshQueuesWhenRMHA() throws Exception { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues + MockRM rm1 = new MockRM(conf, null); + rm1.init(conf); + rm1.start(); + rm1.getAdminService().transitionToStandby(requestInfo); + + // 2. add a new queue "test_queue" + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" 3"); + out.println(""); + out.println(""); + out.close(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 3. start a active RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.init(conf); + rm2.start(); + + MockNM nm = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm.registerNode(); + + rm2.getAdminService().transitionToActive(requestInfo); + + // 4. submit a app to the new added queue "test_queue" + RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue"); + RMAppAttempt attempt0 = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + assertEquals("root.test_queue", app.getQueue()); + + // 5. Transit rm1 to active, recover app + ((RMContextImpl)rm1.getRMContext()).setStateStore(memStore); + rm1.getAdminService().transitionToActive(requestInfo); + rm1.drainEvents(); + assertEquals(1, rm1.getRMContext().getRMApps().size()); + RMApp recoveredApp = + rm1.getRMContext().getRMApps().values().iterator().next(); + assertEquals("root.test_queue", recoveredApp.getQueue()); + + rm1.stop(); + rm2.stop(); + } }