diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..cebdc8c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.util; import java.io.*; +import java.util.Collection; import java.util.Set; import java.util.HashSet; @@ -36,39 +37,68 @@ public class HostsFileReader { private Set includes; private Set excludes; - private String includesFile; - private String excludesFile; - + // comma separated list of file names + private String includesFiles; + private String excludesFiles; + private static final Log LOG = LogFactory.getLog(HostsFileReader.class); - public HostsFileReader(String inFile, - String exFile) throws IOException { - includes = new HashSet(); - excludes = new HashSet(); - includesFile = inFile; - excludesFile = exFile; + public HostsFileReader(String inFiles, String exFiles) throws IOException { + includes = new HashSet<>(); + excludes = new HashSet<>(); + includesFiles = inFiles; + excludesFiles = exFiles; refresh(); } @Private public HostsFileReader(String includesFile, InputStream inFileInputStream, - String excludesFile, InputStream exFileInputStream) throws IOException { - includes = new HashSet(); - excludes = new HashSet(); - this.includesFile = includesFile; - this.excludesFile = excludesFile; + String excludesFiles, InputStream exFileInputStream) throws IOException { + includes = new HashSet<>(); + excludes = new HashSet<>(); + this.includesFiles = includesFile; + this.excludesFiles = excludesFiles; refresh(inFileInputStream, exFileInputStream); } + @Private + public HostsFileReader(String includesFiles, + Set inFileInputStreams, + String excludesFiles, Set exFileInputStreams) + throws IOException { + includes = new HashSet<>(); + excludes = new HashSet<>(); + this.includesFiles = includesFiles; + this.excludesFiles = excludesFiles; + refresh(inFileInputStreams, exFileInputStreams); + } + public static void readFileToSet(String type, String filename, Set set) throws IOException { - File file = new File(filename); - FileInputStream fis = new FileInputStream(file); - readFileToSetWithFileInputStream(type, filename, fis, set); + Set filenames = new HashSet<>(); + filenames.add(filename); + readFilesToSet(type, filename, set); + } + + /** + * Read a list of files + * @param type type of the files + * @param filenames - The comma separated file names + * @throws IOException + */ + public static void readFilesToSet(String type, + String filenames, Set set) throws IOException { + Collection fileNames = + StringUtils.getTrimmedStringCollection(filenames); + for (String filename : fileNames) { + File file = new File(filename); + FileInputStream fis = new FileInputStream(file); + readFileToSetWithFileInputStream(type, filename, fis, set); + } } @Private - public static void readFileToSetWithFileInputStream(String type, + private static void readFileToSetWithFileInputStream(String type, String filename, InputStream fileInputStream, Set set) throws IOException { BufferedReader reader = null; @@ -103,16 +133,16 @@ public static void readFileToSetWithFileInputStream(String type, public synchronized void refresh() throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Set newIncludes = new HashSet<>(); + Set newExcludes = new HashSet<>(); boolean switchIncludes = false; boolean switchExcludes = false; - if (!includesFile.isEmpty()) { - readFileToSet("included", includesFile, newIncludes); + if (!includesFiles.isEmpty()) { + readFilesToSet("included", includesFiles, newIncludes); switchIncludes = true; } - if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); + if (!excludesFiles.isEmpty()) { + readFilesToSet("excluded", excludesFiles, newExcludes); switchExcludes = true; } @@ -129,19 +159,33 @@ public synchronized void refresh() throws IOException { @Private public synchronized void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { + Set inFileInputStreams = new HashSet(); + inFileInputStreams.add(inFileInputStream); + Set exFileInputStreams = new HashSet(); + exFileInputStreams.add(exFileInputStream); + refresh(inFileInputStreams, exFileInputStreams); + } + + @Private + public synchronized void refresh(Set inFileInputStreams, + Set exFileInputStreams) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); Set newExcludes = new HashSet(); boolean switchIncludes = false; boolean switchExcludes = false; - if (inFileInputStream != null) { - readFileToSetWithFileInputStream("included", includesFile, - inFileInputStream, newIncludes); + if (inFileInputStreams != null) { + for (InputStream inFileInputStream : inFileInputStreams) { + readFileToSetWithFileInputStream("included", includesFiles, + inFileInputStream, newIncludes); + } switchIncludes = true; } - if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, - exFileInputStream, newExcludes); + if (exFileInputStreams != null) { + for (InputStream exFileInputStream : exFileInputStreams) { + readFileToSetWithFileInputStream("excluded", excludesFiles, + exFileInputStream, newExcludes); + } switchExcludes = true; } if (switchIncludes) { @@ -162,19 +206,19 @@ public synchronized void refresh(InputStream inFileInputStream, return excludes; } - public synchronized void setIncludesFile(String includesFile) { - LOG.info("Setting the includes file to " + includesFile); - this.includesFile = includesFile; + private synchronized void setIncludesFiles(String includesFiles) { + LOG.info("Setting the includes file(s) to " + includesFiles); + this.includesFiles = includesFiles; } - public synchronized void setExcludesFile(String excludesFile) { - LOG.info("Setting the excludes file to " + excludesFile); - this.excludesFile = excludesFile; + private synchronized void setExcludesFiles(String excludesFiles) { + LOG.info("Setting the excludes file(s) to " + excludesFiles); + this.excludesFiles = excludesFiles; } - public synchronized void updateFileNames(String includesFile, - String excludesFile) { - setIncludesFile(includesFile); - setExcludesFile(excludesFile); + public synchronized void updateFileNames(String includesFiles, + String excludesFiles) { + setIncludesFiles(includesFiles); + setExcludesFiles(excludesFiles); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 3000069..c33b6cd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -34,9 +34,13 @@ // Using /test/build/data/tmp directory to store temprory files final String HOSTS_TEST_DIR = GenericTestUtils.getTestDir().getAbsolutePath(); File EXCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.exclude"); + File EXCLUDES_FILE2 = new File(HOSTS_TEST_DIR, "dfs.exclude2"); File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include"); + File INCLUDES_FILE2 = new File(HOSTS_TEST_DIR, "dfs.include2"); String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude"; + String excludesFile2 = HOSTS_TEST_DIR + "/dfs.exclude2"; String includesFile = HOSTS_TEST_DIR + "/dfs.include"; + String includesFile2 = HOSTS_TEST_DIR + "/dfs.include2"; @Before public void setUp() throws Exception { @@ -46,8 +50,9 @@ public void setUp() throws Exception { public void tearDown() throws Exception { // Delete test files after running tests EXCLUDES_FILE.delete(); + EXCLUDES_FILE2.delete(); INCLUDES_FILE.delete(); - + INCLUDES_FILE2.delete(); } /* @@ -99,6 +104,62 @@ public void testHostsFileReader() throws Exception { } /* + * 1.Create multiple files for each of dfs.exclude and dfs.include + * 2.Write host names per line + * 3.Write comments starting with # + * 4.Close files + * 5.Compare if number of hosts reported by HostsFileReader + * are equal to the number of hosts written + */ + @Test + public void testHostsFileReaderForMultipleFiles() throws Exception { + + FileWriter efw = new FileWriter(excludesFile); + FileWriter efw2 = new FileWriter(excludesFile2); + FileWriter ifw = new FileWriter(includesFile); + FileWriter ifw2 = new FileWriter(includesFile2); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write("somehost1\n"); + efw.write("#This-is-comment\n"); + efw.write("somehost2\n"); + efw.close(); + + efw2.write("somehost3 # host3\n"); + efw2.write("somehost4\n"); + efw2.write("somehost4 somehost5\n"); + efw2.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write("somehost1\n"); + ifw.write("somehost2\n"); + ifw.write("somehost3\n"); + ifw.close();; + + ifw2.write("#This-is-comment\n"); + ifw2.write("somehost4 # host4\n"); + ifw2.write("somehost4 somehost5\n"); + ifw2.close(); + + HostsFileReader hfp = new HostsFileReader( + includesFile + "," + includesFile2, + excludesFile + "," + excludesFile2); + + int includesLen = hfp.getHosts().size(); + int excludesLen = hfp.getExcludedHosts().size(); + + assertEquals(5, includesLen); + assertEquals(5, excludesLen); + + assertTrue(hfp.getHosts().contains("somehost5")); + assertFalse(hfp.getHosts().contains("host3")); + + assertTrue(hfp.getExcludedHosts().contains("somehost5")); + assertFalse(hfp.getExcludedHosts().contains("host4")); + + } + + /* * Test creating a new HostsFileReader with nonexistent files */ @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 965b6c5..18e7b3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -420,12 +420,12 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - /** Path to file with nodes to include.*/ + /** Path to comma separated list of files with nodes to include.*/ public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; - /** Path to file with nodes to exclude.*/ + /** Path to comma separated list of files with nodes to exclude.*/ public static final String RM_NODES_EXCLUDE_FILE_PATH = RM_PREFIX + "nodes.exclude-path"; public static final String DEFAULT_RM_NODES_EXCLUDE_FILE_PATH = ""; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a38d0d8..6a030c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -275,13 +275,13 @@ - Path to file with nodes to include. + Path to comma separated list of files with nodes to include. yarn.resourcemanager.nodes.include-path - Path to file with nodes to exclude. + Path to comma separated list of files with nodes to exclude. yarn.resourcemanager.nodes.exclude-path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index bb00e60..b4ff46c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.InputStream; import java.io.IOException; +import java.util.Collection; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; @@ -36,6 +38,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -65,8 +68,8 @@ private Configuration conf; private final RMContext rmContext; - private String includesFile; - private String excludesFile; + private String includesFiles; + private String excludesFiles; private Resolver resolver; private Timer removalTimer; @@ -95,12 +98,12 @@ protected void serviceInit(Configuration conf) throws Exception { // Read the hosts/exclude files to restrict access to the RM try { - this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + this.includesFiles = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + this.excludesFiles = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = - createHostsFileReader(this.includesFile, this.excludesFile); + createHostsFileReader(this.includesFiles, this.excludesFiles); setDecomissionedNMs(); printConfiguredHosts(); } catch (YarnException ex) { @@ -212,19 +215,17 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, if (null == yarnConf) { yarnConf = new YarnConfiguration(); } - includesFile = + includesFiles = yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - excludesFile = + excludesFiles = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); - hostsReader.updateFileNames(includesFile, excludesFile); + hostsReader.updateFileNames(includesFiles, excludesFiles); hostsReader.refresh( - includesFile.isEmpty() ? null : this.rmContext - .getConfigurationProvider().getConfigurationInputStream( - this.conf, includesFile), excludesFile.isEmpty() ? null - : this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(this.conf, excludesFile)); + includesFiles.isEmpty() ? null : getConfigurationInputStreams( + this.conf, includesFiles), excludesFiles.isEmpty() ? null + : getConfigurationInputStreams(this.conf, excludesFiles)); printConfiguredHosts(); } } @@ -416,12 +417,12 @@ public void handle(NodesListManagerEvent event) { private void disableHostsFileReader(Exception ex) { LOG.warn("Failed to init hostsReader, disabling", ex); try { - this.includesFile = + this.includesFiles = conf.get(YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - this.excludesFile = + this.excludesFiles = conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = - createHostsFileReader(this.includesFile, this.excludesFile); + createHostsFileReader(this.includesFiles, this.excludesFiles); setDecomissionedNMs(); } catch (IOException ioe2) { // Should *never* happen @@ -439,20 +440,40 @@ public HostsFileReader getHostsReader() { return this.hostsReader; } - private HostsFileReader createHostsFileReader(String includesFile, - String excludesFile) throws IOException, YarnException { + private HostsFileReader createHostsFileReader(String includesFiles, + String excludesFiles) throws IOException, YarnException { HostsFileReader hostsReader = - new HostsFileReader(includesFile, - (includesFile == null || includesFile.isEmpty()) ? null - : this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(this.conf, includesFile), - excludesFile, - (excludesFile == null || excludesFile.isEmpty()) ? null - : this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(this.conf, excludesFile)); + new HostsFileReader(includesFiles, + (includesFiles == null || includesFiles.isEmpty()) ? null + : getConfigurationInputStreams(this.conf, includesFiles), + excludesFiles, + (excludesFiles == null || excludesFiles.isEmpty()) ? null + : getConfigurationInputStreams(this.conf, excludesFiles)); return hostsReader; } + /** + * Opens a collection of InputStreams at the indicated files + * @param bootstrapConf Configuration + * @param names The comma separated configuration file names + * @return A collection of configurations + * @throws YarnException + * @throws IOException + */ + private Set getConfigurationInputStreams( + Configuration bootstrapConf, String names) throws YarnException, + IOException { + Collection fileNames = + StringUtils.getTrimmedStringCollection(names); + Set inputStreams = new HashSet(); + for (String filename : fileNames) { + inputStreams.add(this.rmContext + .getConfigurationProvider().getConfigurationInputStream( + bootstrapConf, filename)); + } + return inputStreams; + } + private void updateInactiveNodes() { long now = Time.monotonicNow(); for(Entry entry : diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index cac4511..994607c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -81,7 +81,10 @@ private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); - private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); + private final File hostFile = + new File(TEMP_DIR + File.separator + "hostFile.txt"); + private final File hostFile2 = + new File(TEMP_DIR + File.separator + "hostFile2.txt"); private MockRM rm; /** @@ -1322,6 +1325,33 @@ public void testNodeRemovalGracefully() throws Exception { testNodeRemovalUtilUnhealthy(true); } + /** + * Use multiple include hosts files + */ + @Test + public void testMultipleIncludeHostsFiles() throws Exception { + + writeToHostsFile(hostFile, "localhost"); + writeToHostsFile(hostFile2, "host1", "host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile + .getAbsolutePath() + "," + hostFile2.getAbsolutePath()); + + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm3.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + } + public void refreshNodesOption(boolean doGraceful, Configuration conf) throws Exception { if (doGraceful) {