diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 8c578fd..9b78a75 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -447,7 +447,17 @@ public RpcDetailedMetrics getRpcDetailedMetrics() { * Refresh the service authorization ACL for the service handled by this server. */ public void refreshServiceAcl(Configuration conf, PolicyProvider provider) { - serviceAuthorizationManager.refresh(conf, provider); + serviceAuthorizationManager.refresh(conf, provider, false); + } + + /** + * Refresh the service authorization ACL for the service handled by this server. + * If useRemoteConfiguration is set to true, + * the service is refreshed using the remote configuration. + */ + public void refreshServiceAcl(Configuration conf, PolicyProvider provider, + boolean useRemoteConfiguration) { + serviceAuthorizationManager.refresh(conf, provider, useRemoteConfiguration); } /** diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java index 8523f38..bce58a2 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java @@ -112,15 +112,16 @@ public void authorize(UserGroupInformation user, } public synchronized void refresh(Configuration conf, - PolicyProvider provider) { + PolicyProvider provider, boolean useRemoteConfiguration) { // Get the system property 'hadoop.policy.file' String policyFile = System.getProperty("hadoop.policy.file", HADOOP_POLICY_FILE); // Make a copy of the original config, and load the policy file Configuration policyConf = new Configuration(conf); - policyConf.addResource(policyFile); - + if (!useRemoteConfiguration) { + policyConf.addResource(policyFile); + } final Map<Class<?>, AccessControlList> newAcls = new IdentityHashMap<Class<?>, AccessControlList>();
diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index d557309..8a8a8b0 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -52,9 +52,14 @@ public static void readFileToSet(String type, String filename, Set<String> set) throws IOException { File file = new File(filename); FileInputStream fis = new FileInputStream(file); + readFileToSet(type, filename, fis, set); + } + + public static void readFileToSet(String type, String filename, + InputStream fInputStream, Set<String> set) throws IOException { BufferedReader reader = null; try { - reader = new BufferedReader(new InputStreamReader(fis)); + reader = new BufferedReader(new InputStreamReader(fInputStream)); String line; while ((line = reader.readLine()) != null) { String[] nodes = line.split("[ \t\n\f\r]+"); @@ -71,13 +76,13 @@ } } } - } + } } finally { if (reader != null) { reader.close(); } - fis.close(); - } + fInputStream.close(); + } } public synchronized void refresh() throws IOException { @@ -96,6 +101,26 @@ public synchronized void refresh() throws IOException { } } + public synchronized void refresh(String includeHostFile, + InputStream includeHostFileInputStream, String excludeHostFile, + InputStream excludeHostFileInputStream) throws IOException { + LOG.info("Refreshing hosts (include/exclude) list"); + if (includeHostFileInputStream != null) { + Set<String> newIncludes = new HashSet<String>(); + readFileToSet("included", includeHostFile, includeHostFileInputStream, + newIncludes); + // switch the new hosts that are to be included + includes = newIncludes; + } + if (excludeHostFileInputStream != null) { + Set<String> newExcludes = new HashSet<String>(); + readFileToSet("excluded", excludeHostFile, excludeHostFileInputStream, + newExcludes); + // switch the excluded hosts + excludes = newExcludes; + } + } + public synchronized Set<String> getHosts() { return includes; } diff --git hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 65bae76..1345d90 100644 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -775,9 +775,6 @@ Release 2.4.0 - UNRELEASED HDFS-5777. Update LayoutVersion for the new editlog op OP_ADD_BLOCK. (jing9) - HDFS-5800. Fix a typo in DFSClient.renewLease(). (Kousuke Saruta - via szetszwo) - BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS HDFS-4985. 
Add storage type to the protocol and expose it in block report diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index b522acb..ab98fa0 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -770,7 +770,7 @@ boolean renewLease() throws IOException { final long elapsed = Time.now() - getLastLeaseRenewal(); if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) { LOG.warn("Failed to renew lease for " + clientName + " for " - + (elapsed/1000) + " seconds (>= hard-limit =" + + (elapsed/1000) + " seconds (>= soft-limit =" + (HdfsConstants.LEASE_HARDLIMIT_PERIOD/1000) + " seconds.) " + "Closing all files being written ...", e); closeAllFilesBeingWritten(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dc19585..ab4277b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -318,6 +318,12 @@ public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; + /** Store the related configuration files in the file system. */ + public static final String RM_CONF_STORE = RM_PREFIX + "conf.store"; + public static final String DEFAULT_RM_CONF_STORE = "/confStore"; + + public static final String RM_REMOTE_CONFIGURATION = RM_PREFIX + "remote-configuration.class"; + @Private public static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS = Collections.unmodifiableList(Arrays.asList( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 8900b16..780d5d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -23,26 +23,59 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.URL; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AuthorizationException; +import 
org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -98,6 +131,14 @@ public void setup() throws IOException { conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + conf.setClass("hadoop.security.group.mapping", + MockUnixGroupsMapping.class, + GroupMappingServiceProvider.class); + Groups.getUserToGroupsMappingService(conf); + + FileSystem fs = FileSystem.get(conf); + Path workingPath = new Path(fs.getHomeDirectory(), "confStore"); + conf.set(YarnConfiguration.RM_CONF_STORE, workingPath.toString()); cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); } @@ -252,4 +293,339 @@ private void verifyExpectedException(String exceptionMessage){ .contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAdminServiceRefreshNodesOnHA() + throws IOException, YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + //clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshNodesRequest request = RefreshNodesRequest.newInstance(); + getAdminService(0).refreshNodes(request); + + Set<String> excludeHostsBefore = + cluster.getResourceManager(0).getRMContext().getNodesListManager() + .getHostsReader().getExcludedHosts(); + Assert.assertEquals(excludeHostsBefore.size(), 0); + + final File basedir = + new File("target", TestRMFailover.class.getName()); + final File tmpDir = new File(basedir, "tmpDir"); + tmpDir.mkdirs(); + 
final File excludeHostsFile = new File(tmpDir, "exclude_hosts"); + if (excludeHostsFile.exists()) { + excludeHostsFile.delete(); + } + if (!excludeHostsFile.createNewFile()) { + Assert.fail("Can not create excludeHostsFile."); + } + + PrintWriter fileWriter = new PrintWriter(excludeHostsFile); + fileWriter.write("0.0.0.0:123"); + fileWriter.close(); + + //upload the file into Remote File System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(excludeHostsFile.getAbsolutePath())); + + getAdminService(0).refreshNodes(request); + + Set<String> excludeHostsAfter = + cluster.getResourceManager(0).getRMContext().getNodesListManager() + .getHostsReader().getExcludedHosts(); + Assert.assertEquals(excludeHostsAfter.size(), 1); + Assert.assertTrue(excludeHostsAfter.contains("0.0.0.0:123")); + } + + @Test + public void testAdminServiceRefreshQueuesOnHA() throws IOException, + YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + //clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(); + getAdminService(0).refreshQueues(request); + CapacityScheduler cs = + (CapacityScheduler) cluster.getResourceManager(0).getRMContext() + .getScheduler(); + int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set("yarn.scheduler.capacity.maximum-applications", "5000"); + String csConfFile = writeConfigurationXML(csConf, + "capacity-scheduler.xml"); + + //upload the file into Remote File System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(csConfFile)); + + getAdminService(0).refreshQueues(request); + + int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); + Assert.assertEquals(maxAppsAfter, 5000); + Assert.assertTrue(maxAppsAfter != maxAppsBefore); + } + + @Test + public void testAdminServiceRefreshSuperUserGroupsConfigurationOnHA() + throws IOException, YarnException { + + final String REAL_USER_NAME = "proxier"; + final String PROXY_USER_NAME = "proxied_user"; + final String[] GROUP_NAMES = new String[] { "test_group" }; + final String PROXY_IP = "1.2.3.4"; + + Configuration conf = new Configuration(this.conf); + conf.set( + ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_NAME), + StringUtils.join(",", Arrays.asList(GROUP_NAMES))); + conf.set( + ProxyUsers.getProxySuperuserIpConfKey(REAL_USER_NAME), + PROXY_IP); + UserGroupInformation realUserUgi = UserGroupInformation + .createRemoteUser(REAL_USER_NAME); + UserGroupInformation proxyUserUgi = + UserGroupInformation.createProxyUserForTesting( + PROXY_USER_NAME, realUserUgi, GROUP_NAMES); + + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + // clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshSuperUserGroupsConfigurationRequest request = + RefreshSuperUserGroupsConfigurationRequest.newInstance(); + getAdminService(0).refreshSuperUserGroupsConfiguration(request); + + assertNotAuthorized(proxyUserUgi, PROXY_IP); + String coreConfFile = writeConfigurationXML(conf, "core-site.xml"); + + // upload the file into Remote File System + 
getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(coreConfFile)); + + getAdminService(0).refreshSuperUserGroupsConfiguration(request); + + assertAuthorized(proxyUserUgi, PROXY_IP); + } + + @Test + public void testAdminServiceRefreshUserToGroupsMappingsOnHA() + throws IOException, YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + //clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshUserToGroupsMappingsRequest request = + RefreshUserToGroupsMappingsRequest.newInstance(); + getAdminService(0).refreshUserToGroupsMappings(request); + Groups groups = Groups.getUserToGroupsMappingService(conf); + String user = UserGroupInformation.getCurrentUser().getUserName(); + List<String> groupBefore = groups.getGroups(user); + + String coreConfFile = writeConfigurationXML(conf, "core-site.xml"); + + //upload the file into Remote File System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(coreConfFile)); + getAdminService(0).refreshUserToGroupsMappings(request); + List<String> groupAfter = groups.getGroups(user); + + for (int i = 0; i < groupAfter.size(); i++) { + assertFalse("Should be different group: " + groupBefore.get(i) + " and " + + groupAfter.get(i), groupBefore.get(i).equals(groupAfter.get(i))); + } + } + + @Test + public void testAdminServiceRefreshAdminAclsOnHA() throws IOException, + YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + //clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance(); + getAdminService(0).refreshAdminAcls(request); + String aclStringBefore = + getAdminService(0).getAccessControlList().getAclString().trim(); + + YarnConfiguration yarnConf = new YarnConfiguration(); + yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, "world:anyone:rwcda"); + String yarnConfFile = writeConfigurationXML(yarnConf, "yarn-site.xml"); + + //upload the file into Remote File System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(yarnConfFile)); + getAdminService(0).refreshAdminAcls(request); + String aclStringAfter = + getAdminService(0).getAccessControlList().getAclString().trim(); + + Assert.assertTrue(!aclStringAfter.equals(aclStringBefore)); + Assert.assertEquals(aclStringAfter, "world:anyone:rwcda"); + } + + @Test + public void testAdminServiceRefreshServiceAclsOnHA() throws IOException, + YarnException, InterruptedException { + Configuration.addDefaultResource("config-with-security.xml"); + Configuration confWithSecurity = new YarnConfiguration(conf); + cluster.init(confWithSecurity); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + // clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance(); + getAdminService(0).refreshServiceAcls(request); + Configuration conf = new Configuration(); + conf.set("security.applicationclient.protocol.acl", "alice,bob users,wheel"); + String hadoopConfFile = writeConfigurationXML(conf, "hadoop-policy.xml"); + + // upload the file into Remote File 
System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(hadoopConfFile)); + + getAdminService(0).refreshServiceAcls(request); + + try { + submitApplicationByUser("tester"); + fail("Submit Application by user tester should fail"); + } catch (Exception ex) { + // Expected + } + + try { + submitApplicationByUser("alice"); + } catch (Exception ex) { + fail("Submit Application by user alice should not fail"); + } + } + + private void assertNotAuthorized(UserGroupInformation proxyUgi, String host) { + try { + ProxyUsers.authorize(proxyUgi, host, null); + fail("Allowed authorization of " + proxyUgi + " from " + host); + } catch (AuthorizationException e) { + // Expected + } + } + + private void assertAuthorized(UserGroupInformation proxyUgi, String host) { + try { + ProxyUsers.authorize(proxyUgi, host, null); + } catch (AuthorizationException e) { + fail("Did not allow authorization of " + proxyUgi + " from " + host); + } + } + + private void submitApplicationByUser(String user) throws Exception { + ApplicationClientProtocol rmClient; + final YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation owner = UserGroupInformation + .createRemoteUser(user); + rmClient = + owner.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() { + + @Override + public ApplicationClientProtocol run() throws Exception { + return (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, + cluster.getConfig().getSocketAddr( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT), cluster.getConfig()); + } + }); + ApplicationId applicationId = rmClient.getNewApplication( + GetNewApplicationRequest.newInstance()).getApplicationId(); + ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + appContext.setApplicationId(applicationId); + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + rmClient + .submitApplication(SubmitApplicationRequest.newInstance(appContext)); + } + + private String writeConfigurationXML(Configuration conf, String confXMLName) + throws IOException { + DataOutputStream output = null; + try { + final File basedir = + new File("target", TestRMFailover.class.getName()); + final File tmpDir = new File(basedir, "tmpDir"); + tmpDir.mkdirs(); + final File confFile = new File(tmpDir, confXMLName); + if (confFile.exists()) { + confFile.delete(); + } + if (!confFile.createNewFile()) { + Assert.fail("Can not create " + confXMLName); + } + output = new DataOutputStream( + new FileOutputStream(confFile)); + conf.writeXml(output); + return confFile.getAbsolutePath(); + } finally { + if (output != null) { + output.close(); + } + } + } + + public static class MockUnixGroupsMapping implements + GroupMappingServiceProvider { + + private int i = 0; + + @Override + public List<String> getGroups(String user) throws IOException { + System.out.println("Getting groups in MockUnixGroupsMapping"); + String g1 = user + (10 * i + 1); + String g2 = user + (10 * i + 2); + List<String> l = new ArrayList<String>(2); + l.add(g1); + l.add(g2); + i++; + return l; + } + + @Override + public void cacheGroupsRefresh() throws IOException { + System.out.println("Refreshing groups in MockUnixGroupsMapping"); + } + + @Override + public void cacheGroupsAdd(List<String> groups) throws IOException { + 
} + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java new file mode 100644 index 0000000..e17b545 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + + +public class FileSystemBasedRemoteConfiguration extends RemoteConfiguration { + + private static final Log LOG = LogFactory + .getLog(FileSystemBasedRemoteConfiguration.class); + private FileSystem fs; + private Path remoteConfigDir; + + @Override + public synchronized Configuration getConfiguration(String name) + throws IOException { + Path remoteConfigPath = new Path(this.remoteConfigDir, name); + if (!fs.exists(remoteConfigPath)) { + LOG.warn("Can not find Configuration: " + name + " in " + remoteConfigDir); + return null; + } + Configuration remoteConfig = new Configuration(false); + remoteConfig.addResource(fs.open(remoteConfigPath)); + return remoteConfig; + } + + @Override + public synchronized String getRemoteFile(String name) throws IOException { + Path remoteFilePath = new Path(this.remoteConfigDir, name); + if (!fs.exists(remoteFilePath)) { + LOG.warn("Can not find file: " + name + " in " + remoteConfigDir); + return ""; + } + return fs.makeQualified(remoteFilePath).toString(); + } + + @Override + public synchronized InputStream getRemoteFileInputStream(String name) + throws IOException { + Path remoteFilePath = new Path(this.remoteConfigDir, name); + if (!fs.exists(remoteFilePath)) { + LOG.warn("Can not find file: " + name + " in " + remoteConfigDir); + return null; + } + return fs.open(remoteFilePath); + } + + @Override + public synchronized void initInternal(Configuration conf) throws Exception { + remoteConfigDir = + new Path(conf.get(YarnConfiguration.RM_CONF_STORE, + YarnConfiguration.DEFAULT_RM_CONF_STORE)); + fs = remoteConfigDir.getFileSystem(conf); + if (!fs.exists(remoteConfigDir)) { + fs.mkdirs(remoteConfigDir); + } + } + + @Override + public synchronized void closeInternal() throws Exception { + 
fs.close(); + } + + @Override + public synchronized void uploadToRemoteFileSystem(Path filePath) + throws IOException { + Path outputPath = new Path(remoteConfigDir, filePath.getName()); + InputStream is = fs.open(filePath); + byte[] confFileData = IOUtils.toByteArray(is); + writeFile(outputPath, confFileData); + } + + @Override + public synchronized void cleanRemoteDirectory() throws IOException { + if (fs.exists(remoteConfigDir)) { + for (FileStatus file : fs.listStatus(remoteConfigDir)) { + fs.delete(file.getPath(), true); + } + } + } + + @Override + public synchronized void deleteFileFromRemoteDirectory(String fileName) + throws IOException { + Path remoteFilePath = new Path(this.remoteConfigDir, fileName); + if (fs.exists(remoteFilePath)) { + fs.delete(remoteFilePath, false); + } + } + + private void writeFile(Path outputPath, byte[] data) throws IOException { + Path tempPath = + new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); + FSDataOutputStream fsOut = null; + fsOut = fs.create(tempPath, true); + fsOut.write(data); + fsOut.close(); + fs.rename(tempPath, outputPath); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java new file mode 100644 index 0000000..2e1dcf1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + + +public abstract class RemoteConfiguration extends Configuration { + + public void init(Configuration conf) throws Exception { + initInternal(conf); + } + + public void close() throws Exception { + closeInternal(); + } + + public abstract Configuration getConfiguration(String name) + throws IOException; + + public abstract String getRemoteFile(String name) throws IOException; + + public abstract InputStream getRemoteFileInputStream(String name) + throws IOException; + + public abstract void initInternal(Configuration conf) throws Exception; + + public abstract void closeInternal() throws Exception; + + public abstract void uploadToRemoteFileSystem(Path filePath) + throws IOException; + + public abstract void cleanRemoteDirectory() throws IOException; + + public abstract void deleteFileFromRemoteDirectory(String fileName) + throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java new file mode 100644 index 0000000..18defe3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java @@ -0,0 +1,37 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + + + +public class RemoteConfigurationFactory { + + public static RemoteConfiguration getRemoteConfiguration(Configuration conf) { + RemoteConfiguration remoteConfiguration = + ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.RM_REMOTE_CONFIGURATION, + FileSystemBasedRemoteConfiguration.class, + RemoteConfiguration.class), conf); + return remoteConfiguration; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 971603a..aaaba60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Map; import java.util.Set; @@ -45,6 +46,8 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.RemoteConfiguration; +import org.apache.hadoop.yarn.RemoteConfigurationFactory; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.HAUtil; @@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; public class AdminService extends CompositeService implements @@ -88,7 +92,11 @@ private Server server; private InetSocketAddress masterServiceAddress; private AccessControlList adminAcl; - + private RemoteConfiguration remoteConfiguration = null; + private static final String includeHostPath = "include_hosts"; + private static final String excludeHostPath = "exclude_hosts"; + + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -107,6 +115,9 @@ public synchronized void serviceInit(Configuration conf) throws Exception { addIfService(createEmbeddedElectorService()); } } + remoteConfiguration = + RemoteConfigurationFactory.getRemoteConfiguration(conf); + remoteConfiguration.init(conf); } masterServiceAddress = conf.getSocketAddr( @@ -129,6 +140,9 @@ protected synchronized void serviceStart() throws Exception { @Override protected synchronized void serviceStop() throws Exception { stopServer(); + if (remoteConfiguration != null) { + remoteConfiguration.close(); + } super.serviceStop(); } @@ -145,7 +159,7 @@ protected void startServer() throws Exception { if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + refreshServiceAcls(conf, new RMPolicyProvider(), false); } if 
(rmContext.isHAEnabled()) { @@ -295,23 +309,32 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException { @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws YarnException, StandbyException { - UserGroupInformation user = checkAcls("refreshQueues"); + String argName = "refreshQueues"; + UserGroupInformation user = checkAcls(argName); if (!isRMActive()) { - RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues", + RMAuditLogger.logFailure(user.getShortUserName(), argName, adminAcl.toString(), "AdminService", "ResourceManager is not active. Can not refresh queues."); throwStandbyException(); } + RefreshQueuesResponse response = + recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); - RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", + Configuration conf = getConfiguration(argName); + if (this.rmContext.isHAEnabled() && conf == null) { + LOG.warn(printFailureDescription( + getConfigurationFileName(argName), argName)); + return response; + } + rmContext.getScheduler().reinitialize(conf, this.rmContext); + RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); - return recordFactory.newRecordInstance(RefreshQueuesResponse.class); + return response; } catch (IOException ioe) { LOG.info("Exception refreshing queues ", ioe); - RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues", + RMAuditLogger.logFailure(user.getShortUserName(), argName, adminAcl.toString(), "AdminService", "Exception refreshing queues"); throw RPCUtil.getRemoteException(ioe); @@ -321,23 +344,44 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { - UserGroupInformation user = checkAcls("refreshNodes"); + String argName = "refreshNodes"; + UserGroupInformation user = checkAcls(argName); if (!isRMActive()) { - RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes", + RMAuditLogger.logFailure(user.getShortUserName(), argName, adminAcl.toString(), "AdminService", "ResourceManager is not active. 
Can not refresh nodes."); throwStandbyException(); } - + RefreshNodesResponse response = + recordFactory.newRecordInstance(RefreshNodesResponse.class); try { - rmContext.getNodesListManager().refreshNodes(new YarnConfiguration()); - RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes", + YarnConfiguration yarnConf = new YarnConfiguration(); + if (this.rmContext.isHAEnabled()) { + String includeHostFile = + remoteConfiguration.getRemoteFile(includeHostPath); + InputStream includeHostFileIS = + remoteConfiguration.getRemoteFileInputStream(includeHostPath); + String excludeHostFile = + remoteConfiguration.getRemoteFile(excludeHostPath); + InputStream excludeHostFileIS = + remoteConfiguration.getRemoteFileInputStream(excludeHostPath); + if (includeHostFile.isEmpty() && excludeHostFile.isEmpty()) { + LOG.warn(printFailureDescription(includeHostFile + " and " + + excludeHostFile, argName)); + return response; + } + rmContext.getNodesListManager().refreshNodes(includeHostFile, + includeHostFileIS, excludeHostFile, excludeHostFileIS); + } else { + rmContext.getNodesListManager().refreshNodes(yarnConf); + } + RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); - return recordFactory.newRecordInstance(RefreshNodesResponse.class); + return response; } catch (IOException ioe) { LOG.info("Exception refreshing nodes ", ioe); - RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes", + RMAuditLogger.logFailure(user.getShortUserName(), argName, adminAcl.toString(), "AdminService", "Exception refreshing nodes"); throw RPCUtil.getRemoteException(ioe); } @@ -346,69 +390,106 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) @Override public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) - throws YarnException, StandbyException { - UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration"); + throws YarnException, IOException { + String argName = "refreshSuperUserGroupsConfiguration"; + UserGroupInformation user = checkAcls(argName); // TODO (YARN-1459): Revisit handling super-user-groups on Standby RM if (!isRMActive()) { RMAuditLogger.logFailure(user.getShortUserName(), - "refreshSuperUserGroupsConfiguration", + argName, adminAcl.toString(), "AdminService", "ResourceManager is not active. 
Can not refresh super-user-groups."); throwStandbyException(); } - ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration()); + RefreshSuperUserGroupsConfigurationResponse response = recordFactory + .newRecordInstance(RefreshSuperUserGroupsConfigurationResponse.class); + Configuration conf = getConfiguration(argName); + if (this.rmContext.isHAEnabled() && conf == null) { + LOG.warn(printFailureDescription(getConfigurationFileName(argName), + "refreshSuperUserGroupsConfiguration")); + return response; + } + + ProxyUsers.refreshSuperUserGroupsConfiguration(conf); RMAuditLogger.logSuccess(user.getShortUserName(), - "refreshSuperUserGroupsConfiguration", "AdminService"); + argName, "AdminService"); - return recordFactory.newRecordInstance( - RefreshSuperUserGroupsConfigurationResponse.class); + return response; } @Override public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request) - throws YarnException, StandbyException { - UserGroupInformation user = checkAcls("refreshUserToGroupsMappings"); + throws YarnException, IOException { + String argName = "refreshUserToGroupsMappings"; + UserGroupInformation user = checkAcls(argName); - // TODO (YARN-1459): Revisit handling user-groups on Standby RM if (!isRMActive()) { - RMAuditLogger.logFailure(user.getShortUserName(), - "refreshUserToGroupsMapping", - adminAcl.toString(), "AdminService", - "ResourceManager is not active. Can not refresh user-groups."); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "ResourceManager is not " + + "active. Can not refresh user-to-groups mappings."); throwStandbyException(); } - Groups.getUserToGroupsMappingService().refresh(); + Configuration conf = getConfiguration(argName); + RefreshUserToGroupsMappingsResponse response = recordFactory + .newRecordInstance(RefreshUserToGroupsMappingsResponse.class); + if (this.rmContext.isHAEnabled()) { + if (conf == null) { + LOG.warn(printFailureDescription(getConfigurationFileName(argName), + "refreshUserToGroupsMappings")); + return response; + } + Groups.getUserToGroupsMappingService(conf).refresh(); + } else { + Groups.getUserToGroupsMappingService().refresh(); + } + RMAuditLogger.logSuccess(user.getShortUserName(), - "refreshUserToGroupsMappings", "AdminService"); + argName, "AdminService"); - return recordFactory.newRecordInstance( - RefreshUserToGroupsMappingsResponse.class); + return response; } @Override public RefreshAdminAclsResponse refreshAdminAcls( - RefreshAdminAclsRequest request) throws YarnException { + RefreshAdminAclsRequest request) throws YarnException, IOException { + String argName = "refreshAdminAcls"; UserGroupInformation user = checkAcls("refreshAdminAcls"); - - Configuration conf = new Configuration(); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), + argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. 
Can not refresh admin ACLs."); + throwStandbyException(); + } + + Configuration conf = getConfiguration(argName); + RefreshAdminAclsResponse response = + recordFactory.newRecordInstance(RefreshAdminAclsResponse.class); + if (this.rmContext.isHAEnabled() && conf == null) { + LOG.warn(printFailureDescription(getConfigurationFileName(argName), + argName)); + return response; + } + adminAcl = new AccessControlList(conf.get( YarnConfiguration.YARN_ADMIN_ACL, YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); - RMAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls", + RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); - return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class); + return response; } @Override public RefreshServiceAclsResponse refreshServiceAcls( - RefreshServiceAclsRequest request) throws YarnException { - Configuration conf = new Configuration(); - if (!conf.getBoolean( + RefreshServiceAclsRequest request) throws YarnException, IOException { + String argName = "refreshServiceAcls"; + if (!getConfig().getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { throw RPCUtil.getRemoteException( @@ -416,27 +497,49 @@ public RefreshServiceAclsResponse refreshServiceAcls( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + ") not enabled.")); } - - PolicyProvider policyProvider = new RMPolicyProvider(); - - refreshServiceAcls(conf, policyProvider); - if (isRMActive()) { - rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); - rmContext.getApplicationMasterService().refreshServiceAcls( - conf, policyProvider); - rmContext.getResourceTrackerService().refreshServiceAcls( - conf, policyProvider); - } else { - LOG.warn("ResourceManager is not active. Not refreshing ACLs for " + - "Clients, ApplicationMasters and NodeManagers"); + if (!isRMActive()) { + RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser() + .getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. 
Can not refresh Service ACLs."); + throwStandbyException(); } - - return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); + + PolicyProvider policyProvider = new RMPolicyProvider(); + RefreshServiceAclsResponse response = recordFactory + .newRecordInstance(RefreshServiceAclsResponse.class); + + Configuration conf = getConfiguration(argName); + if (this.rmContext.isHAEnabled() && conf == null) { + LOG.warn(printFailureDescription(getConfigurationFileName(argName), + argName)); + return response; + } + + refreshServiceAcls(conf, policyProvider, this.rmContext.isHAEnabled()); + rmContext.getClientRMService().refreshServiceAcls( + getConfiguration(argName), policyProvider, + this.rmContext.isHAEnabled()); + rmContext.getApplicationMasterService().refreshServiceAcls( + getConfiguration(argName), policyProvider, + this.rmContext.isHAEnabled()); + rmContext.getResourceTrackerService().refreshServiceAcls( + getConfiguration(argName), policyProvider, + this.rmContext.isHAEnabled()); + RMAuditLogger.logSuccess(UserGroupInformation.getCurrentUser() + .getShortUserName(), argName, + "AdminService"); + return response; } void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + PolicyProvider policyProvider, boolean useRemoteConfiguration) { + if (useRemoteConfiguration) { + this.server.refreshServiceAcl(configuration, policyProvider, + useRemoteConfiguration); + } else { + this.server.refreshServiceAcl(configuration, policyProvider); + } } @Override @@ -483,5 +586,55 @@ public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceResponse.class); return response; } - + + private String getConfigurationFileName(String name) + throws YarnException { + if (name.equalsIgnoreCase("refreshQueues")) { + String schedulerClassName = + getConfig().get(YarnConfiguration.RM_SCHEDULER, + YarnConfiguration.DEFAULT_RM_SCHEDULER); + if (schedulerClassName.contains("CapacityScheduler")) { + return "capacity-scheduler.xml"; + } else if (schedulerClassName.contains("FairScheduler")) { + return "fair-scheduler.xml"; + } + throw new YarnException("Invalid scheduler class: " + schedulerClassName); + } else if (name.equalsIgnoreCase("refreshSuperUserGroupsConfiguration") + || name.equalsIgnoreCase("refreshUserToGroupsMappings")) { + return "core-site.xml"; + } else if (name.equalsIgnoreCase("refreshAdminAcls")) { + return "yarn-site.xml"; + } else if (name.equalsIgnoreCase("refreshServiceAcls")) { + return "hadoop-policy.xml"; + } else { + throw new YarnException("Invalid refresh function: " + name); + } + } + + private String printFailureDescription(String name, String fName) { + String remoteDir = getConfig().get(YarnConfiguration.RM_CONF_STORE, + YarnConfiguration.DEFAULT_RM_CONF_STORE); + return "Failed to execute " + fName + + ". To make it work on HA, you should upload " + name + + " into the remote directory: " + remoteDir; + } + + private Configuration getConfiguration(String name) + throws YarnException, IOException { + if (!this.rmContext.isHAEnabled()) {
+ return new Configuration(); + } + return remoteConfiguration.getConfiguration(getConfigurationFileName( + name)); + } + + @VisibleForTesting + public RemoteConfiguration getRemoteConfiguration() { + return this.remoteConfiguration; + } + + @VisibleForTesting + public AccessControlList getAccessControlList() { + return this.adminAcl; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 761bdb1..fa9f189 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -137,7 +137,7 @@ protected void serviceStart() throws Exception { if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + refreshServiceAcls(conf, new RMPolicyProvider(), false); } this.server.start(); @@ -575,9 +575,10 @@ public void unregisterAttempt(ApplicationAttemptId attemptId) { rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId); } - public void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + public void refreshServiceAcls(Configuration configuration, + PolicyProvider policyProvider, boolean useRemoteConfiguration) { + this.server.refreshServiceAcl(configuration, policyProvider, + useRemoteConfiguration); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 1df67f8..8895b0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -163,7 +163,7 @@ protected void serviceStart() throws Exception { if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + refreshServiceAcls(conf, new RMPolicyProvider(), false); } this.server.start(); @@ -703,8 +703,9 @@ private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token) } void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + PolicyProvider policyProvider, boolean useRemoteConfiguration) { + this.server.refreshServiceAcl(configuration, policyProvider, + useRemoteConfiguration); } private boolean isAllowedDelegationTokenOp() throws IOException { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 4249980..90f36ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.Collections; import java.util.Set; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") public class NodesListManager extends AbstractService implements EventHandler<NodesListManagerEvent> { @@ -90,11 +93,37 @@ private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); + + for (String include : hostsReader.getHosts()) { + LOG.debug("include: " + include); + } + for (String exclude : hostsReader.getExcludedHosts()) { + LOG.debug("exclude: " + exclude); + } + } + + private void printConfiguredHosts(String includeHosts, String excludeHosts) { + if (!LOG.isDebugEnabled()) { + return; + } + + if (includeHosts == null && excludeHosts == null) { + LOG.debug("hostsReader: in=" + + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); + } else { + LOG.debug("hostsReader: in=" + + (includeHosts == null ? "null" : includeHosts) + " out=" + + (excludeHosts == null ? 
"null" : excludeHosts)); + } for (String include : hostsReader.getHosts()) { LOG.debug("include: " + include); } @@ -118,6 +147,17 @@ public void refreshNodes(Configuration yarnConf) throws IOException { } } + public void refreshNodes(String includeHostFile, + InputStream includeHostFileInputStream, String excludeHostsFile, + InputStream excludeHostFileInputStream) throws IOException { + synchronized (hostsReader) { + hostsReader.updateFileNames(includeHostFile, excludeHostsFile); + hostsReader.refresh(includeHostFile, includeHostFileInputStream, + excludeHostsFile, excludeHostFileInputStream); + printConfiguredHosts(includeHostFile, excludeHostsFile); + } + } + public boolean isValidNode(String hostName) { synchronized (hostsReader) { Set hostsList = hostsReader.getHosts(); @@ -174,4 +214,9 @@ public void handle(NodesListManagerEvent event) { LOG.error("Ignoring invalid eventtype " + event.getType()); } } + + @VisibleForTesting + public HostsFileReader getHostsReader() { + return this.hostsReader; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f80ce85..172cb09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -161,7 +161,7 @@ protected void serviceStart() throws Exception { if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + refreshServiceAcls(conf, new RMPolicyProvider(), false); } this.server.start(); @@ -413,8 +413,9 @@ public static Node resolve(String hostName) { return RackResolver.resolve(hostName); } - void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + void refreshServiceAcls(Configuration configuration, + PolicyProvider policyProvider, boolean useRemoteConfiguration) { + this.server.refreshServiceAcl(configuration, policyProvider, + useRemoteConfiguration); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0197c5b..d3513eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -281,7 +281,11 @@ public Resource getClusterResources() { } else { CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = new CapacitySchedulerConfiguration(conf); + if (this.rmContext.isHAEnabled()) { 
+ this.conf = new CapacitySchedulerConfiguration(conf, true); + } else { + this.conf = new CapacitySchedulerConfiguration(conf); + } validateConf(this.conf); try { LOG.info("Re-initializing queues..."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fceabf..edbdead 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -144,6 +144,16 @@ public CapacitySchedulerConfiguration(Configuration configuration) { addResource(CS_CONFIGURATION_FILE); } + public CapacitySchedulerConfiguration(Configuration configuration, + boolean useRemoteConfiguration) { + super(configuration); + if (useRemoteConfiguration) { + reloadConfiguration(); + } else { + addResource(CS_CONFIGURATION_FILE); + } + } + private String getQueuePrefix(String queue) { String queueName = PREFIX + queue + DOT; return queueName;
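
Reviewer note, not part of the patch: a minimal sketch of the admin workflow the new RemoteConfiguration API enables. The wrapper class and file paths below are hypothetical; the methods and configuration keys are the ones added by this patch, and the store path must be on a file system visible to both ResourceManagers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.RemoteConfiguration;
import org.apache.hadoop.yarn.RemoteConfigurationFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RemoteConfStoreExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Point the store at a directory on the shared file system.
    conf.set(YarnConfiguration.RM_CONF_STORE, "/rm/confStore");

    // Resolves to FileSystemBasedRemoteConfiguration unless
    // yarn.resourcemanager.remote-configuration.class names another subclass.
    RemoteConfiguration remote =
        RemoteConfigurationFactory.getRemoteConfiguration(conf);
    remote.init(conf); // creates the store directory if it does not exist

    // Publish an updated capacity-scheduler.xml; a subsequent refreshQueues
    // on an HA-enabled RM reads it from the store instead of the local file.
    // Note: uploadToRemoteFileSystem opens this path with the store's
    // FileSystem, so the source file must be visible to that file system.
    remote.uploadToRemoteFileSystem(new Path("/tmp/capacity-scheduler.xml"));
    Configuration csConf = remote.getConfiguration("capacity-scheduler.xml");
    remote.close();
  }
}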