Make the ResourceManager pick up refreshed configuration on HA failover.

AdminService:
  * transitionToActive and transitionToStandby call refreshAdminAcls(false)
    before the access check, for the case that the admin ACLs were updated
    on the previously active RM.
  * transitionToActive calls refreshAll() after rm.transitionToActive() so
    the newly active RM re-runs the registered refresh* operations.
  * refreshAdminAcls is split into the public RPC entry point (which still
    checks the HA state) and a private overload taking checkRMHAState.

TestRMAdminService:
  * adds testRMHAWithFileSystemBasedConfiguration, a two-RM failover test.

Review notes:
  * Whitespace in this patch follows the 2-space Hadoop convention; if a
    context line does not match the target tree byte-for-byte, apply with
    "git apply --ignore-whitespace" or "patch -l".
  * Follow-up, not changed here because it is pre-existing context: the
    not-active audit message in refreshAdminAcls reads
    "Can not refresh user-groups." and probably should mention admin ACLs.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 70845c7..3b43e1b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -90,6 +93,8 @@
   private InetSocketAddress masterServiceAddress;
   private AccessControlList adminAcl;
 
+  private Set<RefreshContext> refreshFunctions = new HashSet<RefreshContext>();
+
   private final RecordFactory recordFactory =
     RecordFactoryProvider.getRecordFactory(null);
 
@@ -108,6 +113,7 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
         addIfService(createEmbeddedElectorService());
       }
     }
+      this.refreshFunctions = createRefreshContexts(conf);
     }
 
     masterServiceAddress = conf.getSocketAddr(
@@ -250,10 +256,20 @@ public synchronized void monitorHealth()
   @Override
   public synchronized void transitionToActive(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
+
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
     try {
       rm.transitionToActive();
+      // call all refresh*s for active RM to get the updated configurations.
+      refreshAll();
       RMAuditLogger.logSuccess(user.getShortUserName(),
           "transitionToActive", "RMHAProtocolService");
     } catch (Exception e) {
@@ -268,6 +284,13 @@ public synchronized void transitionToActive(
   @Override
   public synchronized void transitionToStandby(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
     UserGroupInformation user = checkAccess("transitionToStandby");
     checkHaStateChange(reqInfo);
     try {
@@ -406,10 +429,22 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return refreshAdminAcls(true);
+  }
+
+  /**
+   * Refreshes the admin ACLs.
+   *
+   * @param checkRMHAState when true, the "ResourceManager is not active"
+   *          failure path is taken unless this RM is active; the HA
+   *          transitions pass false so the refresh also runs on a standby RM
+   */
+  private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
+      throws YarnException, IOException {
     String argName = "refreshAdminAcls";
     UserGroupInformation user = checkAcls(argName);
-    
-    if (!isRMActive()) {
+
+    if (checkRMHAState && !isRMActive()) {
       RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh user-groups.");
@@ -521,6 +556,84 @@ private synchronized Configuration getConfiguration(Configuration conf,
     return conf;
   }
 
+  /**
+   * Builds the set of refresh operations that {@link #refreshAll()} invokes.
+   * refreshServiceAcls is registered only when
+   * {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_AUTHORIZATION} is
+   * set to true in {@code conf} (it defaults to false here).
+   *
+   * @param conf configuration consulted for the authorization flag
+   * @return an unmodifiable set of refresh descriptors
+   */
+  private Set<RefreshContext> createRefreshContexts(Configuration conf) {
+    Set<RefreshContext> set = new HashSet<RefreshContext>();
+    set.add(new RefreshContext("refreshQueues", RefreshQueuesRequest.class,
+        RefreshQueuesRequest.newInstance()));
+    set.add(new RefreshContext("refreshNodes", RefreshNodesRequest.class,
+        RefreshNodesRequest.newInstance()));
+    set.add(new RefreshContext("refreshSuperUserGroupsConfiguration",
+        RefreshSuperUserGroupsConfigurationRequest.class,
+        RefreshSuperUserGroupsConfigurationRequest.newInstance()));
+    set.add(new RefreshContext("refreshUserToGroupsMappings",
+        RefreshUserToGroupsMappingsRequest.class,
+        RefreshUserToGroupsMappingsRequest.newInstance()));
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+        false)) {
+      set.add(new RefreshContext("refreshServiceAcls",
+          RefreshServiceAclsRequest.class, RefreshServiceAclsRequest
+              .newInstance()));
+    }
+    return Collections.unmodifiableSet(set);
+  }
+
+  /**
+   * Invokes every registered refresh operation on this service by reflection.
+   *
+   * @throws ServiceFailedException if looking up or invoking a refresh
+   *           method throws; the caught exception is passed as the cause
+   */
+  private void refreshAll() throws ServiceFailedException {
+    for (RefreshContext context : refreshFunctions) {
+      try {
+        Method api =
+            this.getClass().getMethod(context.getFunctionName(),
+                context.getParameterClass());
+        api.invoke(this, context.getParameterObject());
+      } catch (Exception ex) {
+        throw new ServiceFailedException("Can not execute "
+            + context.getFunctionName(), ex);
+      }
+    }
+  }
+
+  /** Describes one refresh call: method name, parameter type and argument. */
+  private static class RefreshContext {
+
+    private final String functionName;
+    private final Class<?> parameterClass;
+    private final Object parameterObject;
+
+    public RefreshContext(String functionName, Class<?> parameterClass,
+        Object parameterObject) {
+      this.functionName = functionName;
+      this.parameterClass = parameterClass;
+      this.parameterObject = parameterObject;
+    }
+
+    public Object getParameterObject() {
+      return parameterObject;
+    }
+
+    public Class<?> getParameterClass() {
+      return parameterClass;
+    }
+
+    public String getFunctionName() {
+      return functionName;
+    }
+  }
+
   @VisibleForTesting
   public AccessControlList getAccessControlList() {
     return this.adminAcl;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index e67b81f..60259cd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -34,12 +34,16 @@
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -518,6 +522,98 @@ public void testRefreshNodesWithFileSystemBasedConfigurationProvider()
     Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
   }
 
+  /**
+   * Verifies that a standby RM picks up a capacity-scheduler change uploaded
+   * while the other RM was active, once it is transitioned to active.
+   */
+  @Test
+  public void testRMHAWithFileSystemBasedConfiguration() throws IOException,
+      YarnException {
+    StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+    configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
+    int base = 100;
+    for (String confKey : YarnConfiguration
+        .getServiceAddressConfKeys(configuration)) {
+      configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+          + (base + 20));
+      configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+          + (base + 40));
+      base = base * 2;
+    }
+    Configuration conf1 = new Configuration(configuration);
+    conf1.set(YarnConfiguration.RM_HA_ID, "rm1");
+    Configuration conf2 = new Configuration(configuration);
+    conf2.set(YarnConfiguration.RM_HA_ID, "rm2");
+
+    // upload default configurations
+    uploadDefaultConfiguration();
+
+    MockRM rm1 = null;
+    MockRM rm2 = null;
+    try {
+      rm1 = new MockRM(conf1);
+      rm1.init(conf1);
+      rm1.start();
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm2 = new MockRM(conf2);
+      rm2.init(conf2);
+      rm2.start();
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm1.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      CapacitySchedulerConfiguration csConf =
+          new CapacitySchedulerConfiguration();
+      csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
+      uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+      rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
+
+      int maxApps =
+          ((CapacityScheduler) rm1.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(5000, maxApps);
+
+      // Before failover happens, the maxApps is
+      // still the default value on the standby rm : rm2
+      int maxAppsBeforeFailOver =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(10000, maxAppsBeforeFailOver);
+
+      // Do the failover
+      rm1.adminService.transitionToStandby(requestInfo);
+      rm2.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      int maxAppsAfter =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+
+      Assert.assertEquals(5000, maxAppsAfter);
+    } finally {
+      if (rm1 != null) {
+        rm1.stop();
+      }
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
   private String writeConfigurationXML(Configuration conf, String confXMLName)
       throws IOException {
     DataOutputStream output = null;