diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index d557309..ea2d9e1 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -52,9 +52,14 @@ public static void readFileToSet(String type,
       String filename, Set<String> set) throws IOException {
     File file = new File(filename);
     FileInputStream fis = new FileInputStream(file);
+    readFileToSet(type, filename, fis, set);
+  }
+
+  public static void readFileToSet(String type, String filename,
+      InputStream fInputStream, Set<String> set) throws IOException {
     BufferedReader reader = null;
     try {
-      reader = new BufferedReader(new InputStreamReader(fis));
+      reader = new BufferedReader(new InputStreamReader(fInputStream));
       String line;
       while ((line = reader.readLine()) != null) {
         String[] nodes = line.split("[ \t\n\f\r]+");
@@ -71,13 +76,13 @@ public static void readFileToSet(String type,
             }
           }
         }
-      }
+      }
     } finally {
       if (reader != null) {
         reader.close();
       }
-      fis.close();
-    }
+      fInputStream.close();
+    }
   }
 
   public synchronized void refresh() throws IOException {
@@ -96,6 +101,26 @@ public synchronized void refresh() throws IOException {
     }
   }
 
+  public synchronized void refresh(String includeHostFile,
+      InputStream includeHostFileInputStream, String excludeHostFile,
+      InputStream excludeHostFileInputStream) throws IOException {
+    LOG.info("Refreshing hosts (include/exclude) list");
+    if (includeHostFileInputStream != null) {
+      Set<String> newIncludes = new HashSet<String>();
+      readFileToSet("included", includeHostFile, includeHostFileInputStream,
+          newIncludes);
+      // switch the new hosts that are to be included
+      includes = newIncludes;
+    }
+    if (excludeHostFileInputStream != null) {
+      Set<String> newExcludes = new HashSet<String>();
+      readFileToSet("excluded", excludeHostFile, excludeHostFileInputStream,
+          newExcludes);
+      // switch the excluded hosts
+      excludes = newExcludes;
+    }
+  }
+
   public synchronized Set<String> getHosts() {
     return includes;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index 97d7fa8..66cca20 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -27,8 +27,10 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -299,6 +302,53 @@ public void testAdminServiceRefreshQueuesOnHA() throws IOException,
     Assert.assertTrue(maxAppsAfter != maxAppsBefore);
   }
 
+  @Test
+  public void testAdminServiceRefreshNodesOnHA()
+      throws IOException, YarnException {
+    cluster.init(conf);
+    cluster.start();
+    getAdminService(0).transitionToActive(req);
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+
+    // clean the remoteDirectory
+    cleanRemoteDirectory();
+
+    RefreshNodesRequest request = RefreshNodesRequest.newInstance();
+    getAdminService(0).refreshNodes(request);
+
+    Set<String> excludeHostsBefore =
+        cluster.getResourceManager(0).getRMContext().getNodesListManager()
+            .getHostsReader().getExcludedHosts();
+    Assert.assertEquals(excludeHostsBefore.size(), 0);
+
+    final File basedir =
+        new File("target", TestRMFailover.class.getName());
+    final File tmpDir = new File(basedir, "tmpDir");
+    tmpDir.mkdirs();
+    final File excludeHostsFile = new File(tmpDir, "exclude_hosts");
+    if (excludeHostsFile.exists()) {
+      excludeHostsFile.delete();
+    }
+    if (!excludeHostsFile.createNewFile()) {
+      Assert.fail("Can not create excludeHostsFile.");
+    }
+
+    PrintWriter fileWriter = new PrintWriter(excludeHostsFile);
+    fileWriter.write("0.0.0.0:123");
+    fileWriter.close();
+
+    // upload the file into Remote File System
+    uploadToRemoteFileSystem(new Path(excludeHostsFile.getAbsolutePath()));
+
+    getAdminService(0).refreshNodes(request);
+
+    Set<String> excludeHostsAfter =
+        cluster.getResourceManager(0).getRMContext().getNodesListManager()
+            .getHostsReader().getExcludedHosts();
+    Assert.assertEquals(excludeHostsAfter.size(), 1);
+    Assert.assertTrue(excludeHostsAfter.contains("0.0.0.0:123"));
+  }
+
   private String writeConfigurationXML(Configuration conf, String confXMLName)
       throws IOException {
     DataOutputStream output = null;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java
index ba6cbea..a9c61aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +49,16 @@ public synchronized Configuration getConfiguration(String name) throws IOExcepti
   }
 
   @Override
+  public synchronized InputStream getRemoteFileInputStream(String name)
+      throws IOException {
+    Path remoteFilePath = new Path(this.remoteConfigDir, name);
+    if (!fs.exists(remoteFilePath)) {
+      LOG.warn("Can not find file: " + name + " in " + remoteConfigDir);
+      return null;
+    }
+    return fs.open(remoteFilePath);
+  }
+
+  @Override
   public synchronized void initInternal(Configuration conf) throws Exception {
     remoteConfigDir = new Path(conf.get(YarnConfiguration.RM_CONF_STORE,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java
index 4c565c6..90cedb1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.yarn;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 public abstract class RemoteConfiguration extends Configuration{
 
@@ -37,6 +37,9 @@ public void close() throws Exception {
   public abstract Configuration getConfiguration(String name)
       throws IOException;
 
+  public abstract InputStream getRemoteFileInputStream(String name)
+      throws IOException;
+
   public abstract void initInternal(Configuration conf) throws Exception;
 
   public abstract void closeInternal() throws Exception;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 24a3b7d..e31a71c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
@@ -92,6 +93,8 @@
   private AccessControlList adminAcl;
 
   private RemoteConfiguration remoteConfiguration = null;
+  private static final String includeHostPath = "include_hosts";
+  private static final String excludeHostPath = "exclude_hosts";
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -340,23 +343,45 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
-    UserGroupInformation user = checkAcls("refreshNodes");
+    String argName = "refreshNodes";
+    UserGroupInformation user = checkAcls(argName);
     if (!isRMActive()) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh nodes.");
       throwStandbyException();
     }
 
+    RefreshNodesResponse response =
+        recordFactory.newRecordInstance(RefreshNodesResponse.class);
     try {
-      rmContext.getNodesListManager().refreshNodes(new YarnConfiguration());
-      RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
+      YarnConfiguration yarnConf = new YarnConfiguration();
+      if (this.rmContext.isHAEnabled()) {
+        InputStream includeHostFileIS = null;
+        InputStream excludeHostFileIS = null;
+        synchronized (this) {
+          includeHostFileIS =
+              remoteConfiguration.getRemoteFileInputStream(includeHostPath);
+          excludeHostFileIS =
+              remoteConfiguration.getRemoteFileInputStream(excludeHostPath);
+        }
+        if (includeHostFileIS == null && excludeHostFileIS == null) {
+          LOG.warn(printFailureDescription(includeHostPath + " and " +
+              excludeHostPath, argName));
+          return response;
+        }
+        rmContext.getNodesListManager().refreshNodes(includeHostPath,
+            includeHostFileIS, excludeHostPath, excludeHostFileIS);
+      } else {
+        rmContext.getNodesListManager().refreshNodes(yarnConf);
+      }
+      RMAuditLogger.logSuccess(user.getShortUserName(), argName,
          "AdminService");
-      return recordFactory.newRecordInstance(RefreshNodesResponse.class);
+      return response;
     } catch (IOException ioe) {
       LOG.info("Exception refreshing nodes ", ioe);
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "Exception refreshing nodes");
       throw RPCUtil.getRemoteException(ioe);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 4249980..0f5bf6c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
@@ -38,6 +39,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings("unchecked")
 public class NodesListManager extends AbstractService implements
     EventHandler<NodesListManagerEvent> {
@@ -103,6 +106,19 @@ private void printConfiguredHosts() {
     }
   }
 
+  private void printConfiguredHosts(String includeHosts, String excludeHosts) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug("hostsReader: in=" + includeHosts + " out=" + excludeHosts);
+    for (String include : hostsReader.getHosts()) {
+      LOG.debug("include: " + include);
+    }
+    for (String exclude : hostsReader.getExcludedHosts()) {
+      LOG.debug("exclude: " + exclude);
+    }
+  }
+
   public void refreshNodes(Configuration yarnConf) throws IOException {
     synchronized (hostsReader) {
       if (null == yarnConf) {
@@ -118,6 +134,16 @@ public void refreshNodes(Configuration yarnConf) throws IOException {
     }
   }
 
+  public void refreshNodes(String includeHostFile,
+      InputStream includeHostFileInputStream, String excludeHostsFile,
+      InputStream excludeHostFileInputStream) throws IOException {
+    synchronized (hostsReader) {
+      hostsReader.updateFileNames(includeHostFile, excludeHostsFile);
+      hostsReader.refresh(includeHostFile, includeHostFileInputStream,
+          excludeHostsFile, excludeHostFileInputStream);
+      printConfiguredHosts(includeHostFile, excludeHostsFile);
+    }
+  }
   public boolean isValidNode(String hostName) {
     synchronized (hostsReader) {
       Set<String> hostsList = hostsReader.getHosts();
@@ -174,4 +200,10 @@ public void handle(NodesListManagerEvent event) {
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
   }
+
+  @VisibleForTesting
+  public HostsFileReader getHostsReader() {
+    return this.hostsReader;
+  }
+
 }
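Note for reviewers: a minimal, standalone sketch of how the stream-based HostsFileReader.refresh(...) overload added by this patch behaves. The demo class name and the in-memory ByteArrayInputStream are illustrative only (the stream stands in for what the RM would obtain from RemoteConfiguration.getRemoteFileInputStream("exclude_hosts")); it assumes hadoop-common with this patch applied is on the classpath.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.util.HostsFileReader;

public class RemoteHostsRefreshSketch {
  public static void main(String[] args) throws IOException {
    // Empty include/exclude file names: the constructor reads nothing from local disk.
    HostsFileReader hostsReader = new HostsFileReader("", "");

    // Hypothetical stand-in for the remote exclude_hosts file content.
    InputStream excludeStream = new ByteArrayInputStream(
        "0.0.0.0:123\n".getBytes(StandardCharsets.UTF_8));

    // New overload from this patch: a null include stream leaves the current
    // includes untouched; the exclude stream is parsed and then closed.
    hostsReader.refresh("include_hosts", null, "exclude_hosts", excludeStream);

    // Prints: excluded hosts: [0.0.0.0:123]
    System.out.println("excluded hosts: " + hostsReader.getExcludedHosts());
  }
}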