diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 4d4ad5d..d9c6937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -47,7 +47,6 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.DecommissionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.HAUtil; @@ -319,6 +318,24 @@ public synchronized void transitionToActive( UserGroupInformation user = checkAccess("transitionToActive"); checkHaStateChange(reqInfo); + + try { + // call all refresh*s for active RM to get the updated configurations. 
+ refreshAllWithoutCheck(); + } catch (Exception e) { + if (YarnConfiguration.shouldRMFailFast(getConfig())) { + LOG.error("RefreshAll failed so firing fatal event", e); + rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, + e)); + } + throw new ServiceFailedException( + "Error on refreshAll during transition to Active", e); + } + try { rm.transitionToActive(); } catch (Exception e) { @@ -328,19 +345,7 @@ public synchronized void transitionToActive( throw new ServiceFailedException( "Error when transitioning to Active mode", e); } - try { - // call all refresh*s for active RM to get the updated configurations. - refreshAll(); - } catch (Exception e) { - LOG.error("RefreshAll failed so firing fatal event", e); - rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); - throw new ServiceFailedException( - "Error on refreshAll during transistion to Active", e); - } + RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RM"); } @@ -408,12 +413,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); - // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); - if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); - } + refreshQueuesWithoutCheck(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -422,6 +422,15 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } } + private void refreshQueuesWithoutCheck() throws IOException, YarnException { + rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + // refresh the reservation system + ReservationSystem rSystem = 
rmContext.getReservationSystem(); + if (rSystem != null) { + rSystem.reinitialize(getConfig(), rmContext); + } + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -454,6 +463,13 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) } } + private void refreshNodesWithouCheck() throws IOException, YarnException { + Configuration conf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + rmContext.getNodesListManager().refreshNodes(conf); + } + @Override public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) @@ -464,6 +480,16 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu checkRMStatus(user.getShortUserName(), operation, "refresh super-user-groups."); + refreshSuperUserGroupsConfigurationWithoutCheck(); + RMAuditLogger.logSuccess(user.getShortUserName(), + operation, "AdminService"); + + return recordFactory.newRecordInstance( + RefreshSuperUserGroupsConfigurationResponse.class); + } + + private void refreshSuperUserGroupsConfigurationWithoutCheck() + throws IOException, YarnException { // Accept hadoop common configs in core-site.xml as well as RM specific // configurations in yarn-site.xml Configuration conf = @@ -472,11 +498,6 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); RMServerUtils.processRMProxyUsersConf(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - RMAuditLogger.logSuccess(user.getShortUserName(), - operation, "AdminService"); - - return recordFactory.newRecordInstance( - RefreshSuperUserGroupsConfigurationResponse.class); } @Override @@ -488,10 +509,7 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( checkRMStatus(user.getShortUserName(), operation, "refresh 
user-groups."); - Groups.getUserToGroupsMappingService( - getConfiguration(new Configuration(false), - YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); - + refreshUserToGroupsMappingsWithoutCheck(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -499,6 +517,13 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsResponse.class); } + private void refreshUserToGroupsMappingsWithoutCheck() + throws IOException, YarnException { + Groups.getUserToGroupsMappingService( + getConfiguration(new Configuration(false), + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); + } + @Override public RefreshAdminAclsResponse refreshAdminAcls( RefreshAdminAclsRequest request) throws YarnException, IOException { @@ -541,6 +566,15 @@ public RefreshServiceAclsResponse refreshServiceAcls( checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs."); + refreshServiceAclsWithoutCheck(); + RMAuditLogger.logSuccess(user.getShortUserName(), operation, + "AdminService"); + + return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); + } + + private void refreshServiceAclsWithoutCheck() + throws IOException, YarnException { PolicyProvider policyProvider = RMPolicyProvider.getInstance(); Configuration conf = getConfiguration(new Configuration(false), @@ -552,11 +586,6 @@ public RefreshServiceAclsResponse refreshServiceAcls( conf, policyProvider); rmContext.getResourceTrackerService().refreshServiceAcls( conf, policyProvider); - - RMAuditLogger.logSuccess(user.getShortUserName(), operation, - "AdminService"); - - return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); } private synchronized void refreshServiceAcls(Configuration configuration, @@ -689,20 +718,18 @@ private synchronized Configuration getConfiguration(Configuration conf, * Visibility could be private for test its made as default */ @VisibleForTesting - void refreshAll() throws 
ServiceFailedException { + void refreshAllWithoutCheck() throws ServiceFailedException { try { - refreshQueues(RefreshQueuesRequest.newInstance()); - refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL)); - refreshSuperUserGroupsConfiguration( - RefreshSuperUserGroupsConfigurationRequest.newInstance()); - refreshUserToGroupsMappings( - RefreshUserToGroupsMappingsRequest.newInstance()); + refreshQueuesWithoutCheck(); + refreshNodesWithouCheck(); + refreshSuperUserGroupsConfigurationWithoutCheck(); + refreshUserToGroupsMappingsWithoutCheck(); if (getConfig().getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(RefreshServiceAclsRequest.newInstance()); + refreshServiceAclsWithoutCheck(); } - refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance()); + refreshClusterMaxPriorityWithoutCheck(); } catch (Exception ex) { throw new ServiceFailedException(ex.getMessage()); } @@ -839,11 +866,7 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( checkRMStatus(user.getShortUserName(), operation, msg); try { - Configuration conf = - getConfiguration(new Configuration(false), - YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - - rmContext.getScheduler().setClusterMaxPriority(conf); + refreshClusterMaxPriorityWithoutCheck(); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -854,6 +877,15 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( } } + private void refreshClusterMaxPriorityWithoutCheck() + throws IOException, YarnException { + Configuration conf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + + rmContext.getScheduler().setClusterMaxPriority(conf); + } + public String getHAZookeeperConnectionState() { if (!rmContext.isHAEnabled()) { return "ResourceManager HA is not enabled."; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 905a42c..33d46e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -587,6 +587,7 @@ protected void serviceStart() throws Exception { @Test(timeout = 9000000) public void testTransitionedToActiveRefreshFail() throws Exception { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.setBoolean(YarnConfiguration.RM_FAIL_FAST, true); rm = new MockRM(configuration) { @Override protected AdminService createAdminService() { @@ -598,12 +599,13 @@ protected void setConfig(Configuration conf) { } @Override - protected void refreshAll() throws ServiceFailedException { + protected void refreshAllWithoutCheck() + throws ServiceFailedException { if (counter == 0) { counter++; throw new ServiceFailedException("Simulate RefreshFail"); } else { - super.refreshAll(); + super.refreshAllWithoutCheck(); } } }; @@ -628,18 +630,20 @@ protected Dispatcher createDispatcher() { assertEquals("HA state should be in standBy State", HAServiceState.STANDBY, rm.getRMContext().getHAServiceState()); try { - // Verify refreshAll call failure and check fail Event is dispatched + // Verify refreshAllWithoutCheck call failure and check fail + // Event is dispatched rm.adminService.transitionToActive(requestInfo); - Assert.fail("Transistion to Active should have failed for refreshAll()"); + Assert.fail("Transistion to Active should have failed for " + + 
"refreshAllWithoutCheck()"); } catch (Exception e) { assertTrue("Service fail Exception expected", e instanceof ServiceFailedException); } - // Since refreshAll failed we are expecting fatal event to be send - // Then fatal event is send RM will shutdown + // Since refreshAllWithoutCheck failed we are expecting fatal + // event to be send. Then fatal event is send RM will shutdown dispatcher.await(); assertEquals("Fatal Event to be received", 1, dispatcher.getEventCount()); - // Check of refreshAll success HA can be active + // Check of refreshAllWithoutCheck success HA can be active rm.adminService.transitionToActive(requestInfo); assertEquals(HAServiceState.ACTIVE, rm.getRMContext().getHAServiceState()); rm.adminService.transitionToStandby(requestInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dab7312..87670c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; @@ -74,9 +75,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -84,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -4648,4 +4654,67 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception { long reservedId = reservedContainer1.getContainerId().getContainerId(); assertEquals(reservedId + 1, maxId); } + + @Test(timeout = 120000) + public void testRefreshQueuesWhenRMHA() throws Exception { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + 
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there are no queues + MockRM rm1 = new MockRM(conf, null); + rm1.init(conf); + rm1.start(); + rm1.getAdminService().transitionToStandby(requestInfo); + + // 2. add a new queue "test_queue" + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"test_queue\">"); + out.println(" <maxRunningApps>3</maxRunningApps>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 3. start an active RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.init(conf); + rm2.start(); + + MockNM nm = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm.registerNode(); + + rm2.getAdminService().transitionToActive(requestInfo); + + // 4. submit an app to the newly added queue "test_queue" + RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue"); + RMAppAttempt attempt0 = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + assertEquals("root.test_queue", app.getQueue()); + + // 5. Transit rm1 to active, recover app + ((RMContextImpl)rm1.getRMContext()).setStateStore(memStore); + rm1.getAdminService().transitionToActive(requestInfo); + rm1.drainEvents(); + assertEquals(1, rm1.getRMContext().getRMApps().size()); + RMApp recoveredApp = + rm1.getRMContext().getRMApps().values().iterator().next(); + assertEquals("root.test_queue", recoveredApp.getQueue()); + + rm1.stop(); + rm2.stop(); + } }