diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index c5d6b86..d204701 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -18,19 +18,35 @@
 package org.apache.hadoop.util;
 
-import java.io.*;
-import java.util.Set;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.commons.io.Charsets;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
 
 // Keeps track of which datanodes/tasktrackers are allowed to connect to the
 // namenode/jobtracker.
@@ -38,7 +54,9 @@
 @InterfaceStability.Unstable
 public class HostsFileReader {
   private Set<String> includes;
-  private Set<String> excludes;
+  // Exclude host list with optional timeout.
+  // If the value is null, it indicates the default timeout.
+  private Map<String, Integer> excludes;
   private String includesFile;
   private String excludesFile;
   private WriteLock writeLock;
@@ -49,7 +67,7 @@ public HostsFileReader(String inFile,
       String exFile) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     includesFile = inFile;
     excludesFile = exFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -62,7 +80,7 @@ public HostsFileReader(String includesFile,
   public HostsFileReader(String includesFile, InputStream inFileInputStream,
       String excludesFile, InputStream exFileInputStream) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     this.includesFile = includesFile;
     this.excludesFile = excludesFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -121,6 +139,68 @@ public void refresh() throws IOException {
     }
   }
 
+  public static void readFileToMap(String type,
+      String filename, Map<String, Integer> map) throws IOException {
+    File file = new File(filename);
+    FileInputStream fis = new FileInputStream(file);
+    readFileToMapWithFileInputStream(type, filename, fis, map);
+  }
+
+  public static void readFileToMapWithFileInputStream(String type,
+      String filename, InputStream inputStream, Map<String, Integer> map)
+      throws IOException {
+    // The input file could be either simple text or XML.
+    boolean xmlInput = filename.toLowerCase().endsWith(".xml");
+    if (xmlInput) {
+      readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
+    } else {
+      HashSet<String> nodes = new HashSet<String>();
+      readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
+      for (String node : nodes) {
+        map.put(node, null);
+      }
+    }
+  }
+
+  public static void readXmlFileToMapWithFileInputStream(String type,
+      String filename, InputStream fileInputStream, Map<String, Integer> map)
+      throws IOException {
+    Document dom;
+    DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
+    try {
+      DocumentBuilder db = builder.newDocumentBuilder();
+      dom = db.parse(fileInputStream);
+      // Examples:
+      // <host><name>host1</name></host>
+      // <host><name>host2</name><timeout>123</timeout></host>
+      // <host><name>host3</name><timeout>-1</timeout></host>
+      Element doc = dom.getDocumentElement();
+      NodeList nodes = doc.getElementsByTagName("host");
+      for (int i = 0; i < nodes.getLength(); i++) {
+        Node node = nodes.item(i);
+        if (node.getNodeType() == Node.ELEMENT_NODE) {
+          Element e = (Element) node;
+          String host = readFirstTagValue(e, "name");
+          String str = readFirstTagValue(e, "timeout");
+          Integer timeout = (str == null)? null : Integer.parseInt(str);
+          map.put(host, timeout);
+          LOG.info("Adding a node \"" + host + "\" to the list of " +
+              type + " hosts from " + filename);
+        }
+      }
+    } catch (IOException|SAXException|ParserConfigurationException e) {
+      LOG.fatal("error parsing " + filename, e);
+      throw new RuntimeException(e);
+    } finally {
+      fileInputStream.close();
+    }
+  }
+
+  static String readFirstTagValue(Element e, String tag) {
+    NodeList nodes = e.getElementsByTagName(tag);
+    return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
+  }
+
   public void refresh(String includeFiles, String excludeFiles)
       throws IOException {
     LOG.info("Refreshing hosts (include/exclude) list");
@@ -129,7 +209,7 @@ public void refresh(String includeFiles, String excludeFiles)
     // update instance variables
     updateFileNames(includeFiles, excludeFiles);
     Set<String> newIncludes = new HashSet<String>();
-    Set<String> newExcludes = new HashSet<String>();
+    Map<String, Integer> newExcludes = new HashMap<String, Integer>();
     boolean switchIncludes = false;
     boolean switchExcludes = false;
     if (includeFiles != null && !includeFiles.isEmpty()) {
@@ -137,7 +217,7 @@
       switchIncludes = true;
     }
     if (excludeFiles != null && !excludeFiles.isEmpty()) {
-      readFileToSet("excluded", excludeFiles, newExcludes);
+      readFileToMap("excluded", excludeFiles, newExcludes);
       switchExcludes = true;
     }
 
@@ -161,7 +241,7 @@ public void refresh(InputStream inFileInputStream,
     this.writeLock.lock();
     try {
       Set<String> newIncludes = new HashSet<String>();
-      Set<String> newExcludes = new HashSet<String>();
+      Map<String, Integer> newExcludes = new HashMap<String, Integer>();
       boolean switchIncludes = false;
       boolean switchExcludes = false;
       if (inFileInputStream != null) {
@@ -170,7 +250,7 @@
         switchIncludes = true;
       }
       if (exFileInputStream != null) {
-        readFileToSetWithFileInputStream("excluded", excludesFile,
+        readFileToMapWithFileInputStream("excluded", excludesFile,
             exFileInputStream, newExcludes);
         switchExcludes = true;
      }
@@ -199,7 +279,7 @@ public void refresh(InputStream inFileInputStream,
   public Set<String> getExcludedHosts() {
     this.readLock.lock();
     try {
-      return excludes;
+      return excludes.keySet();
     } finally {
       this.readLock.unlock();
     }
@@ -209,7 +289,22 @@
   public void getHostDetails(Set<String> includes, Set<String> excludes) {
     this.readLock.lock();
     try {
       includes.addAll(this.includes);
-      excludes.addAll(this.excludes);
+      excludes.addAll(this.excludes.keySet());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
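Reviewer note (interjecting where the new read methods end; not part of the patch): a minimal, self-contained sketch of how the new map-based exclude API behaves with an XML host list. The host names and file name are illustrative; only the readFileToMapWithFileInputStream signature and the host/name/timeout element format come from the patch above.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.util.HostsFileReader;

public class ExcludeXmlSketch {
  public static void main(String[] args) throws Exception {
    String xml = "<hosts>\n"
        + "<host><name>host1</name></host>\n"                        // no timeout element => null => default
        + "<host><name>host2</name><timeout>123</timeout></host>\n"  // 123 second timeout
        + "<host><name>host3</name><timeout>-1</timeout></host>\n"   // -1 => wait indefinitely
        + "</hosts>\n";
    InputStream in =
        new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
    Map<String, Integer> excludes = new HashMap<String, Integer>();
    // The ".xml" suffix on the file name is what selects the XML parsing path.
    HostsFileReader.readFileToMapWithFileInputStream(
        "excluded", "dfs.exclude.xml", in, excludes);
    System.out.println(excludes);  // host1=null, host2=123, host3=-1 (order may vary)
  }
}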
+  public synchronized Map<String, Integer> getExcludedHostsWithTimeout() {
+    return excludes;
+  }
+
+  public void getHostDetails(Set<String> includeHosts,
+      Map<String, Integer> excludeHosts) {
+    this.readLock.lock();
+    try {
+      includeHosts.addAll(this.includes);
+      excludeHosts.putAll(this.excludes);
     } finally {
       this.readLock.unlock();
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
index 8015f7a..125f9d7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
@@ -22,14 +22,16 @@
 import java.io.FileWriter;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.*;
+
 import static org.junit.Assert.*;
 
 /*
  * Test for HostsFileReader.java
- * 
+ *
  */
 public class TestHostsFileReader {
@@ -39,6 +41,7 @@
   File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include");
   String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude";
   String includesFile = HOSTS_TEST_DIR + "/dfs.include";
+  private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml";
 
   @Before
   public void setUp() throws Exception {
@@ -288,4 +291,52 @@ public void testHostFileReaderWithTabs() throws Exception {
     assertFalse(hfp.getExcludedHosts().contains("somehost5"));
 
   }
+
+  /*
+   * Test if timeout values are provided in HostFile
+   */
+  @Test
+  public void testHostFileReaderWithTimeout() throws Exception {
+    FileWriter efw = new FileWriter(excludesXmlFile);
+    FileWriter ifw = new FileWriter(includesFile);
+
+    efw.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+    efw.write("<!-- Exclude host list with optional timeouts -->\n");
+    efw.write("<hosts>\n");
+    efw.write("<host><name>host1</name></host>\n");
+    efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
+    efw.write("<host><name>10000</name></host>\n");
+    efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
+    efw.write("</hosts>\n");
+    efw.close();
+
+    ifw.write("#Hosts-in-DFS\n");
+    ifw.write("     \n");
+    ifw.write("   somehost \t somehost2 \n somehost4");
+    ifw.write("   somehost3 \t # somehost5");
+    ifw.close();
+
+    HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
+
+    int includesLen = hfp.getHosts().size();
+    int excludesLen = hfp.getExcludedHosts().size();
+    assertEquals(4, includesLen);
+    assertEquals(6, excludesLen);
+
+    Map<String, Integer> excludes = hfp.getExcludedHostsWithTimeout();
+    assertTrue(excludes.containsKey("host1"));
+    assertTrue(excludes.containsKey("host2"));
+    assertTrue(excludes.containsKey("host3"));
+    assertTrue(excludes.containsKey("10000"));
+    assertTrue(excludes.containsKey("10001"));
+    assertTrue(excludes.containsKey("10002"));
+    assertTrue(excludes.get("host1") == null);
+    assertTrue(excludes.get("host2") == 123);
+    assertTrue(excludes.get("host3") == -1);
+    assertTrue(excludes.get("10000") == null);
+    assertTrue(excludes.get("10001") == 123);
+    assertTrue(excludes.get("10002") == -1);
+  }
 }
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index de09016..f36bb41 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -136,6 +136,7 @@
+
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index bd737bd..c598aa0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -213,6 +213,11 @@ public long getUntrackedTimeStamp() { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 5048978..6d0ffbd 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -202,4 +202,9 @@ public long getUntrackedTimeStamp() { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3bb73f5..116ab06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -783,6 +783,20 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; + /** + * Timeout in seconds for YARN node graceful decommission. + * This is the maximal time to wait for running containers and applications + * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED. 
+ */ + public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = + RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs"; + public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600; + + public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = + RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs"; + public static final int + DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20; + //////////////////////////////// // Node Manager Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java index 0333c3b..732d98e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java @@ -43,6 +43,16 @@ public static RefreshNodesRequest newInstance( return request; } + @Private + @Unstable + public static RefreshNodesRequest newInstance( + DecommissionType decommissionType, Integer timeout) { + RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class); + request.setDecommissionType(decommissionType); + request.setDecommissionTimeout(timeout); + return request; + } + /** * Set the DecommissionType * @@ -56,4 +66,18 @@ public static RefreshNodesRequest newInstance( * @return decommissionType */ public abstract DecommissionType getDecommissionType(); -} + + /** + * Set the DecommissionTimeout. + * + * @param timeout graceful decommission timeout in seconds + */ + public abstract void setDecommissionTimeout(Integer timeout); + + /** + * Get the DecommissionTimeout. 
+ * + * @return decommissionTimeout + */ + public abstract Integer getDecommissionTimeout(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eaf658f..b9f30db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -37,6 +37,7 @@ message RefreshQueuesResponseProto { message RefreshNodesRequestProto { optional DecommissionTypeProto decommissionType = 1 [default = NORMAL]; + optional int32 decommissionTimeout = 2; } message RefreshNodesResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 4aa3a14..e09a665 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -309,33 +309,40 @@ private int refreshQueues() throws IOException, YarnException { return 0; } - private int refreshNodes() throws IOException, YarnException { + private int refreshNodes(boolean graceful) throws IOException, YarnException { // Refresh the nodes ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); - RefreshNodesRequest request = RefreshNodesRequest - .newInstance(DecommissionType.NORMAL); + RefreshNodesRequest request = RefreshNodesRequest.newInstance( + graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL); adminProtocol.refreshNodes(request); return 0; } - private int refreshNodes(long timeout, String trackingMode) + private int refreshNodes(int timeout, String trackingMode) throws IOException, YarnException { - if (!"client".equals(trackingMode)) { - throw new UnsupportedOperationException( - "Only client tracking mode is currently supported."); - } + boolean serverTracking = !"client".equals(trackingMode); // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest - .newInstance(DecommissionType.GRACEFUL); + .newInstance(DecommissionType.GRACEFUL, timeout); adminProtocol.refreshNodes(gracefulRequest); + if (serverTracking) { + return 0; + } CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory .newRecordInstance(CheckForDecommissioningNodesRequest.class); long waitingTime; boolean nodesDecommissioning = true; + // As RM enforces timeout automatically, client usually don't need + // to forcefully decommission nodes upon timeout. + // Here we let the client waits a small additional seconds so to avoid + // unnecessary double decommission. 
+ final int gracePeriod = 5; // timeout=-1 means wait for all the nodes to be gracefully // decommissioned - for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) { + for (waitingTime = 0; + timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod); + waitingTime++) { // wait for one second to check nodes decommissioning status try { Thread.sleep(1000); @@ -380,6 +387,10 @@ private int refreshNodesResources() throws IOException, YarnException { return 0; } + private int refreshNodes() throws IOException, YarnException { + return refreshNodes(false); + } + private int refreshUserToGroupsMappings() throws IOException, YarnException { // Refresh the user-to-groups mappings @@ -725,33 +736,12 @@ public int run(String[] args) throws Exception { return exitCode; } } - + try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { - if (args.length == 1) { - exitCode = refreshNodes(); - } else if (args.length == 3 || args.length == 4) { - // if the graceful timeout specified - if ("-g".equals(args[1])) { - long timeout = -1; - String trackingMode; - if (args.length == 4) { - timeout = validateTimeout(args[2]); - trackingMode = validateTrackingMode(args[3]); - } else { - trackingMode = validateTrackingMode(args[2]); - } - exitCode = refreshNodes(timeout, trackingMode); - } else { - printUsage(cmd, isHAEnabled); - return -1; - } - } else { - printUsage(cmd, isHAEnabled); - return -1; - } + exitCode = handleRefreshNodes(args, cmd, isHAEnabled); } else if ("-refreshNodesResources".equals(cmd)) { exitCode = refreshNodesResources(); } else if ("-refreshUserToGroupsMappings".equals(cmd)) { @@ -768,22 +758,7 @@ public int run(String[] args) throws Exception { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); } else if ("-updateNodeResource".equals(cmd)) { - if (args.length < 4 || args.length > 5) { - System.err.println("Number of parameters specified for " + - "updateNodeResource is wrong."); - printUsage(cmd, isHAEnabled); - exitCode = -1; - } else { - String nodeID = args[i++]; - String memSize = args[i++]; - String cores = args[i++]; - int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; - if (i == args.length - 1) { - overCommitTimeout = Integer.parseInt(args[i]); - } - exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize), - Integer.parseInt(cores), overCommitTimeout); - } + exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled); } else if ("-addToClusterNodeLabels".equals(cmd)) { if (i >= args.length) { System.err.println(NO_LABEL_ERR_MSG); @@ -843,10 +818,59 @@ public int run(String[] args) throws Exception { return exitCode; } - private long validateTimeout(String strTimeout) { - long timeout; + // A helper method to reduce the number of lines of run() + private int handleRefreshNodes(String[] args, String cmd, boolean isHAEnabled) + throws IOException, YarnException { + if (args.length == 1) { + return refreshNodes(); + } else if (args.length == 3 || args.length == 4) { + // if the graceful timeout specified + if ("-g".equals(args[1]) || "-graceful".equals(args[1])) { + int timeout = -1; + String trackingMode; + if (args.length == 4) { + timeout = validateTimeout(args[2]); + trackingMode = validateTrackingMode(args[3]); + } else { + trackingMode = validateTrackingMode(args[2]); + } + return refreshNodes(timeout, trackingMode); + } else { + printUsage(cmd, isHAEnabled); + return -1; + } + } else { + printUsage(cmd, isHAEnabled); 
+ return -1; + } + } + + private int handleUpdateNodeResource( + String[] args, String cmd, boolean isHAEnabled) + throws NumberFormatException, IOException, YarnException { + int i = 1; + if (args.length < 4 || args.length > 5) { + System.err.println("Number of parameters specified for " + + "updateNodeResource is wrong."); + printUsage(cmd, isHAEnabled); + return -1; + } else { + String nodeID = args[i++]; + String memSize = args[i++]; + String cores = args[i++]; + int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; + if (i == args.length - 1) { + overCommitTimeout = Integer.parseInt(args[i]); + } + return updateNodeResource(nodeID, Integer.parseInt(memSize), + Integer.parseInt(cores), overCommitTimeout); + } + } + + private int validateTimeout(String strTimeout) { + int timeout; try { - timeout = Long.parseLong(strTimeout); + timeout = Integer.parseInt(strTimeout); } catch (NumberFormatException ex) { throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index d3161ba..e33f23b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -267,7 +267,7 @@ public void testRefreshNodesGracefulBeforeTimeout() throws Exception { CheckForDecommissioningNodesRequest.class))).thenReturn(response); assertEquals(0, rmAdminCLI.run(args)); verify(admin).refreshNodes( - RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1)); verify(admin, never()).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); } @@ -327,7 +327,7 @@ public CheckForDecommissioningNodesResponse answer( }); assertEquals(0, rmAdminCLI.run(args)); verify(admin, atLeastOnce()).refreshNodes( - RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1)); verify(admin, never()).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); } @@ -346,10 +346,6 @@ public void testRefreshNodesGracefulInvalidArgs() throws Exception { String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"}; assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs)); - // server tracking mode - String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"}; - assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs)); - // invalid tracking mode String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"}; assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java index 05f3230..c03a569 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java @@ -31,7 +31,6 @@ @Private @Unstable public class RefreshNodesRequestPBImpl extends RefreshNodesRequest { - RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance(); RefreshNodesRequestProto.Builder builder = null; boolean viaProto = false; @@ -108,6 +107,22 @@ public synchronized DecommissionType getDecommissionType() { return convertFromProtoFormat(p.getDecommissionType()); } + @Override + public synchronized void setDecommissionTimeout(Integer timeout) { + maybeInitBuilder(); + if (timeout != null) { + builder.setDecommissionTimeout(timeout); + } else { + builder.clearDecommissionTimeout(); + } + } + + @Override + public synchronized Integer getDecommissionTimeout() { + RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null; + } + private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) { return DecommissionType.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c8bc741..3b89a7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2586,6 +2586,24 @@ + + Timeout in seconds for YARN node graceful decommission. + This is the maximal time to wait for running containers and applications to complete + before transition a DECOMMISSIONING node into DECOMMISSIONED. + + yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs + 3600 + + + + + Timeout in seconds of DecommissioningNodesWatcher internal polling. + + yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs + 20 + + + The Node Label script to run. Script output Line starting with "NODE_PARTITION:" will be considered as Node Label Partition. 
In case of multiple lines have this pattern, then last one will be considered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 4d4ad5d..a5f2b9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -440,7 +440,8 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) rmContext.getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully(conf); + rmContext.getNodesListManager().refreshNodesGracefully( + conf, request.getDecommissionTimeout()); break; case FORCEFUL: rmContext.getNodesListManager().refreshNodesForcefully(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java new file mode 100644 index 0000000..559c23d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+
+/**
+ * DecommissioningNodesWatcher is used by ResourceTrackerService to track
+ * DECOMMISSIONING nodes and to decide when, after all running containers on
+ * the node have completed, the node will be transitioned into DECOMMISSIONED
+ * state (at which point the NodeManager will be told to shut down).
+ * Under an MR application, a node that has completed all its containers may
+ * still serve its map output data to reducers for the duration of the
+ * application. A fully graceful mechanism would keep such DECOMMISSIONING
+ * nodes around until all involved applications complete. That could, however,
+ * be undesirable in long-running application scenarios, where a set of
+ * "idle" nodes would stay around for a long period of time.
+ *
+ * DecommissioningNodesWatcher balances these concerns with a timeout policy:
+ * a DECOMMISSIONING node will be DECOMMISSIONED no later than
+ * DECOMMISSIONING_TIMEOUT, regardless of running containers or applications.
+ *
+ * To be efficient, DecommissioningNodesWatcher does not track application
+ * containers on a particular node until the node enters DECOMMISSIONING
+ * state, so it costs essentially nothing while no node is DECOMMISSIONING.
+ * The trade-off is that the node may have hosted containers of a
+ * still-running application before it entered DECOMMISSIONING, and those
+ * will not be seen.
+ */
+public class DecommissioningNodesWatcher {
+  private static final Log LOG =
+      LogFactory.getLog(DecommissioningNodesWatcher.class);
+
+  private final RMContext rmContext;
+
+  // Default timeout value in millis.
+  // A negative value indicates no timeout; 0 means immediate.
+  private long defaultTimeoutMs =
+      1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
+  // Once an RMNode is observed in DECOMMISSIONING state, all its
+  // ContainerStatus updates are tracked inside DecommissioningNodeContext.
+  class DecommissioningNodeContext {
+    private final NodeId nodeId;
+
+    // Last known NodeState.
+    private NodeState nodeState;
+
+    // The moment the node was observed in DECOMMISSIONING state.
+    private final long decommissioningStartTime;
+
+    private long lastContainerFinishTime;
+
+    // Number of running containers at the moment.
+    private int numActiveContainers;
+
+    // All applications run on the node at or after decommissioningStartTime.
+    private Set<ApplicationId> appIds;
+
+    // First moment the node is observed in DECOMMISSIONED state.
+    private long decommissionedTime;
+
+    // Timeout in millis for this decommissioning node.
+    // This value could be dynamically updated with a new value from RMNode.
+    private long timeoutMs;
+
+    private long lastUpdateTime;
+
+    public DecommissioningNodeContext(NodeId nodeId) {
+      this.nodeId = nodeId;
+      this.appIds = new HashSet<ApplicationId>();
+      this.decommissioningStartTime = System.currentTimeMillis();
+      this.timeoutMs = defaultTimeoutMs;
+    }
+
+    void updateTimeout(Integer timeoutSec) {
+      this.timeoutMs = (timeoutSec == null)?
+          defaultTimeoutMs : (1000L * timeoutSec);
+    }
+  }
+
+  // All DECOMMISSIONING nodes to track.
+  private HashMap<NodeId, DecommissioningNodeContext> decomNodes =
+      new HashMap<NodeId, DecommissioningNodeContext>();
+
+  private Timer pollTimer;
+
+  public DecommissioningNodesWatcher(RMContext rmContext) {
+    this.rmContext = rmContext;
+    pollTimer = new Timer(true);
+  }
+
+  public void init(Configuration conf) {
+    readDecommissioningTimeout(conf);
+    int v = conf.getInt(
+        YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
+        YarnConfiguration
+            .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
+    pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
+  }
+
+  /**
+   * Update rmNode decommissioning status based on NodeStatus.
+   * @param rmNode The node
+   * @param remoteNodeStatus latest NodeStatus
+   */
+  public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
+    DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
+    long now = System.currentTimeMillis();
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      if (context == null) {
+        return;
+      }
+      context.nodeState = rmNode.getState();
+      // Keep the DECOMMISSIONED node for a while for the status log.
+      if (context.decommissionedTime == 0) {
+        context.decommissionedTime = now;
+      } else if (now - context.decommissionedTime > 60000L) {
+        decomNodes.remove(rmNode.getNodeID());
+        LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID());
+      }
+    } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+      if (context == null) {
+        context = new DecommissioningNodeContext(rmNode.getNodeID());
+        decomNodes.put(rmNode.getNodeID(), context);
+        context.nodeState = rmNode.getState();
+        context.decommissionedTime = 0;
+      }
+      context.updateTimeout(rmNode.getDecommissioningTimeout());
+      context.lastUpdateTime = now;
+
+      if (remoteNodeStatus.getKeepAliveApplications() != null) {
+        context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
+      }
+
+      // Count the number of active containers.
+      int numActiveContainers = 0;
+      for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) {
+        ContainerState newState = cs.getState();
+        if (newState == ContainerState.RUNNING ||
+            newState == ContainerState.NEW) {
+          numActiveContainers++;
+        }
+        context.numActiveContainers = numActiveContainers;
+        ApplicationId aid = cs.getContainerId()
+            .getApplicationAttemptId().getApplicationId();
+        if (!context.appIds.contains(aid)) {
+          context.appIds.add(aid);
+        }
+      }
+
+      context.numActiveContainers = numActiveContainers;
+
+      // Maintain lastContainerFinishTime.
+      if (context.numActiveContainers == 0 &&
+          context.lastContainerFinishTime == 0) {
+        context.lastContainerFinishTime = now;
+      }
+    } else {
+      // Remove the node if it is in any other state.
+      if (context != null) {
+        decomNodes.remove(rmNode.getNodeID());
+      }
+    }
+  }
+
+  public synchronized void remove(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context != null) {
+      LOG.info("remove " + nodeId + " in " + context.nodeState);
+      decomNodes.remove(nodeId);
+    }
+  }
+
+  /**
+   * Status of a specific decommissioning node.
+   */
+  public enum DecommissioningNodeStatus {
+    // Node is not in DECOMMISSIONING state.
+    NONE,
+
+    // Wait for running containers to complete.
+    WAIT_CONTAINER,
+
+    // Wait for running applications to complete
+    // (after all containers complete).
+    WAIT_APP,
+
+    // Timeout waiting for either containers or applications to complete.
+    TIMEOUT,
+
+    // Nothing to wait for; ready to be decommissioned.
+    READY,
+
+    // The node has already been decommissioned.
+    DECOMMISSIONED,
+  }
+
+  public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
+    DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
+    return (s == DecommissioningNodeStatus.READY ||
+        s == DecommissioningNodeStatus.TIMEOUT);
+  }
+
+  public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context == null) {
+      return DecommissioningNodeStatus.NONE;
+    }
+
+    if (context.nodeState == NodeState.DECOMMISSIONED) {
+      return DecommissioningNodeStatus.DECOMMISSIONED;
+    }
+
+    long waitTime =
+        System.currentTimeMillis() - context.decommissioningStartTime;
+    if (context.numActiveContainers > 0) {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_CONTAINER :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+
+    removeCompletedApps(context);
+    if (context.appIds.size() == 0) {
+      return DecommissioningNodeStatus.READY;
+    } else {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_APP :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+  }
+
+  /**
+   * PollTimerTask periodically:
+   * 1. logs the status of all DECOMMISSIONING nodes;
+   * 2. identifies and takes care of stale DECOMMISSIONING nodes
+   *    (for example, a node that has already terminated).
+   */
+  class PollTimerTask extends TimerTask {
+    private final RMContext rmContext;
+
+    public PollTimerTask(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    public void run() {
+      logDecommissioningNodesStatus();
+      long now = System.currentTimeMillis();
+      Set<NodeId> staleNodes = new HashSet<NodeId>();
+
+      for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+          decomNodes.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+        DecommissioningNodeContext d = e.getValue();
+        // Skip nodes recently updated (the NM usually updates every second).
+        if (now - d.lastUpdateTime < 30000L) {
+          continue;
+        }
+        // Remove stale non-DECOMMISSIONING nodes.
+        if (d.nodeState != NodeState.DECOMMISSIONING) {
+          LOG.info("remove " + d.nodeState + " " + d.nodeId);
+          it.remove();
+          continue;
+        } else if (now - d.lastUpdateTime > 60000L) {
+          // A DECOMMISSIONED node could become stale; remove as necessary.
+ RMNode rmNode = getRmNode(d.nodeId); + if (rmNode != null && + rmNode.getState() == NodeState.DECOMMISSIONED) { + LOG.info("remove " + rmNode.getState() + " " + d.nodeId); + it.remove(); + continue; + } + } + if (d.timeoutMs >= 0 && + d.decommissioningStartTime + d.timeoutMs < now) { + staleNodes.add(d.nodeId); + LOG.info("Identified stale and timeout node " + d.nodeId); + } + } + + for (NodeId nodeId : staleNodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + remove(nodeId); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + checkReadyToBeDecommissioned(rmNode.getNodeID())) { + LOG.info("DECOMMISSIONING " + nodeId + " timeout"); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + } + + private RMNode getRmNode(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); + } + return rmNode; + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.appIds.iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.debug("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.debug("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context) { + if (context.nodeState == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.nodeState != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.appIds.size() == 0 && context.numActiveContainers == 0) { + return 0; + } + // negative timeout value means no timeout (infinite timeout). + if (context.timeoutMs < 0) { + return -1; + } + + long now = System.currentTimeMillis(); + long timeout = context.decommissioningStartTime + context.timeoutMs - now; + return Math.max(0, (int)(timeout / 1000)); + } + + private void logDecommissioningNodesStatus() { + if (!LOG.isDebugEnabled() || decomNodes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + long now = System.currentTimeMillis(); + for (DecommissioningNodeContext d : decomNodes.values()) { + DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId); + sb.append(String.format( + "%n %-34s %4ds fresh:%3ds containers:%2d %14s", + d.nodeId.getHost(), + (now - d.decommissioningStartTime) / 1000, + (now - d.lastUpdateTime) / 1000, + d.numActiveContainers, + s)); + if (s == DecommissioningNodeStatus.WAIT_APP || + s == DecommissioningNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); + } + for (ApplicationId aid : d.appIds) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format( + " %s %9s %5.2f%% %5ds", + rmApp.getState(), + (rmApp.getApplicationType() == null)? + "" : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (System.currentTimeMillis() - rmApp.getStartTime()) / 1000)); + } + } + } + LOG.info("Decommissioning Nodes: " + sb.toString()); + } + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. 
+ // This enables DecommissioningNodesWatcher to pick up new value + // without ResourceManager restart. + private void readDecommissioningTimeout(Configuration conf) { + try { + if (conf == null) { + conf = new YarnConfiguration(); + } + int v = conf.getInt( + YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + if (defaultTimeoutMs != 1000L * v) { + defaultTimeoutMs = 1000L * v; + LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); + } + } catch (Exception e) { + LOG.info("Error readDecommissioningTimeout ", e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 7937383..e6b5373 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,14 +19,17 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +40,7 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -47,14 +51,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") public class NodesListManager extends CompositeService implements EventHandler { @@ -178,10 +183,11 @@ private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + 
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); Set hostsList = new HashSet(); @@ -196,23 +202,19 @@ private void printConfiguredHosts() { } } - public void refreshNodes(Configuration yarnConf) throws IOException, - YarnException { - refreshHostsReader(yarnConf); + public void refreshNodes(Configuration yarnConf) + throws IOException, YarnException { + refreshNodes(yarnConf, false); + } - for (NodeId nodeId: rmContext.getRMNodes().keySet()) { - if (!isValidNode(nodeId.getHost())) { - RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? - RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, nodeEventType)); - } - } - updateInactiveNodes(); + public void refreshNodes(Configuration yarnConf, boolean graceful) + throws IOException, YarnException { + refreshHostsReader(yarnConf, graceful, null); } - private void refreshHostsReader(Configuration yarnConf) throws IOException, - YarnException { + private void refreshHostsReader( + Configuration yarnConf, boolean graceful, Integer timeout) + throws IOException, YarnException { if (null == yarnConf) { yarnConf = new YarnConfiguration(); } @@ -222,8 +224,16 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, excludesFile = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + LOG.info("refreshNodes excludesFile " + excludesFile); hostsReader.refresh(includesFile, excludesFile); printConfiguredHosts(); + + LOG.info("hostsReader include:{" + + StringUtils.join(",", hostsReader.getHosts()) + + "} exclude:{" + + StringUtils.join(",", hostsReader.getExcludedHosts()) + "}"); + + handleExcludeNodeList(graceful, timeout); } private void setDecomissionedNMs() { @@ -237,6 +247,82 @@ private void setDecomissionedNMs() { } } + // Handle excluded nodes based on following rules: + // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded; + // Gracefully decommission excluded nodes that are not already + // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes + // that are already DECOMMISSIONED or DECOMMISSIONING. + private void handleExcludeNodeList(boolean graceful, Integer timeout) { + // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. + List nodesToRecom = new ArrayList(); + + // Nodes need to be decommissioned (graceful or forceful); + List nodesToDecom = new ArrayList(); + + Map timeouts = hostsReader.getExcludedHostsWithTimeout(); + + for (RMNode n : this.rmContext.getRMNodes().values()) { + NodeState s = n.getState(); + // An invalid node (either due to explicit exclude or not include) + // should be excluded. + boolean exclude = !isValidNode(n.getHostName()); + String nodeStr = "node " + n.getNodeID() + " with state " + s; + if (!exclude) { + if (s == NodeState.DECOMMISSIONED || s == NodeState.DECOMMISSIONING) { + LOG.info("Recommission " + nodeStr); + nodesToRecom.add(n); + } + // Otherwise no-action needed. + } else { + // exclude is true. + if (graceful) { + // Use per node timeout if exist otherwise the request timeout. + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? 
+ timeouts.get(n.getHostName()) : timeout; + if (s != NodeState.DECOMMISSIONED && + s != NodeState.DECOMMISSIONING) { + LOG.info("Gracefully decommission " + nodeStr); + nodesToDecom.add(n); + } else if (s == NodeState.DECOMMISSIONING && + !Objects.equals(n.getDecommissioningTimeout(), + timeoutToUse)) { + LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse); + nodesToDecom.add(n); + } else { + LOG.info("No action for " + nodeStr); + } + } else { + if (s != NodeState.DECOMMISSIONED) { + LOG.info("Forcefully decommission " + nodeStr); + nodesToDecom.add(n); + } + } + } + } + + for (RMNode n : nodesToRecom) { + RMNodeEvent e = new RMNodeEvent( + n.getNodeID(), RMNodeEventType.RECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecom) { + RMNodeEvent e; + if (graceful) { + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? + timeouts.get(n.getHostName()) : timeout; + e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); + } else { + RMNodeEventType eventType = isUntrackedNode(n.getHostName())? + RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; + e = new RMNodeEvent(n.getNodeID(), eventType); + } + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + updateInactiveNodes(); + } + @VisibleForTesting public int getNodeRemovalCheckInterval() { return nodeRemovalCheckInterval; @@ -478,29 +564,14 @@ public boolean isUntrackedNode(String hostName) { /** * Refresh the nodes gracefully * - * @param conf + * @param yarnConf + * @param timeout decommission timeout, null means default timeout. * @throws IOException * @throws YarnException */ - public void refreshNodesGracefully(Configuration conf) throws IOException, - YarnException { - refreshHostsReader(conf); - for (Entry entry : rmContext.getRMNodes().entrySet()) { - NodeId nodeId = entry.getKey(); - if (!isValidNode(nodeId.getHost())) { - RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? 
- RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, nodeEventType)); - } else { - // Recommissioning the nodes - if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION)); - } - } - } - updateInactiveNodes(); + public void refreshNodesGracefully(Configuration yarnConf, Integer timeout) + throws IOException, YarnException { + refreshHostsReader(yarnConf, true, timeout); } /** @@ -596,4 +667,4 @@ public void setHost(String hst) { this.host = hst; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 9b9b02e..5e9827a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -70,7 +70,7 @@ public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { - // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY + // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. ArrayList results = new ArrayList(); if (acceptedStates.contains(NodeState.NEW) || acceptedStates.contains(NodeState.RUNNING) || diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9d480f3..21a5b2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -113,6 +113,8 @@ private int minAllocMb; private int minAllocVcores; + private DecommissioningNodesWatcher decommissioningWatcher; + private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; @@ -131,6 +133,7 @@ public ResourceTrackerService(RMContext rmContext, ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); + this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext); } @Override @@ -170,6 +173,7 @@ protected void serviceInit(Configuration conf) throws Exception { } loadDynamicResourceConfiguration(conf); + decommissioningWatcher.init(conf); super.serviceInit(conf); } @@ -495,6 +499,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); + this.decommissioningWatcher.update(rmNode, remoteNodeStatus); + // 3. 
Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse @@ -526,6 +532,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) updateAppCollectorsMap(request); } + // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decommissioningWatcher.checkReadyToBeDecommissioned( + rmNode.getNodeID())) { + String message = "DECOMMISSIONING " + nodeId + + " is ready to be decommissioned"; + LOG.info(message); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + return YarnServerBuilderUtils.newNodeHeartbeatResponse( + NodeAction.SHUTDOWN, message); + } + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 3a9cf54..10e2afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -175,4 +175,10 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( long getUntrackedTimeStamp(); void setUntrackedTimeStamp(long timeStamp); + /* + * Optional decommissioning timeout in second + * (null indicates default timeout). + * @return the decommissioning timeout in second. + */ + Integer getDecommissioningTimeout(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java new file mode 100644 index 0000000..9955e9e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; + +/** + * RMNode Decommissioning Event. + * + */ +public class RMNodeDecommissioningEvent extends RMNodeEvent { + // Optional decommissioning timeout in seconds. + private final Integer decommissioningTimeout; + + // Create an instance with an optional timeout + // (a null timeout means use the default). + public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) { + super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION); + this.decommissioningTimeout = timeout; + } + + public Integer getDecommissioningTimeout() { + return this.decommissioningTimeout; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a3a6b30..4a70eff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -124,6 +125,7 @@ private String healthReport; private long lastHealthReportTime; private String nodeManagerVersion; + private Integer decommissioningTimeout; private long timeStamp; /* Aggregated resource utilization for the containers.
*/ @@ -179,7 +181,6 @@ NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -241,6 +242,16 @@ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.RUNNING, + RMNodeEventType.RECOMMISSION, + new RecommissionNodeTransition(NodeState.RUNNING)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.LOST, + RMNodeEventType.EXPIRE, new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + //Transitions from DECOMMISSIONING state .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, @@ -265,6 +276,9 @@ .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, RMNodeEventType.REBOOTING, new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -633,7 +647,7 @@ public void handle(RMNodeEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on Node " + this.nodeId); + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " @@ -666,6 +680,9 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) { case SHUTDOWN: metrics.decrNumShutdownNMs(); break; + case DECOMMISSIONING: + metrics.decrDecommissioningNMs(); + break; default: LOG.debug("Unexpected previous node state"); } @@ -712,6 +729,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, case DECOMMISSIONING: metrics.decrDecommissioningNMs(); break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; @@ -1087,9 +1107,26 @@ public DecommissioningNodeTransition(NodeState initState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + Integer timeout = null; + if (RMNodeDecommissioningEvent.class.isInstance(event)) { + RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event); + timeout = e.getDecommissioningTimeout(); + } + // Pick up possible updates on decommissioningTimeout. + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) { + LOG.info("Update " + rmNode.getNodeID() + + " DecommissioningTimeout to be " + timeout); + rmNode.decommissioningTimeout = timeout; + } else { + LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); + } + return; + } LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING."); // Update NM metrics during graceful decommissioning. 
rmNode.updateMetricsForGracefulDecommission(initState, finalState); + rmNode.decommissioningTimeout = timeout; if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); @@ -1156,24 +1193,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.UNHEALTHY; } } - if (isNodeDecommissioning) { - List runningApps = rmNode.getRunningApps(); - - List keepAliveApps = statusEvent.getKeepAliveAppIds(); - - // no running (and keeping alive) app on this node, get it - // decommissioned. - // TODO may need to check no container is being scheduled on this node - // as well. - if ((runningApps == null || runningApps.size() == 0) - && (keepAliveApps == null || keepAliveApps.size() == 0)) { - RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) - } rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleReportedIncreasedContainers( @@ -1472,4 +1491,9 @@ public long getUntrackedTimeStamp() { public void setUntrackedTimeStamp(long ts) { this.timeStamp = ts; } + + @Override + public Integer getDecommissioningTimeout() { + return decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 3012d0d..1789e09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -97,7 +97,7 @@ public ClusterMetricsInfo(final ResourceManager rm) { this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.shutdownNodes = clusterMetrics.getNumShutdownNMs(); this.totalNodes = activeNodes + lostNodes + decommissionedNodes - + rebootedNodes + unhealthyNodes + shutdownNodes; + + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; } public int getAppsSubmitted() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 22aa0ee..5a89e54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -272,6 +272,11 @@ public long getUntrackedTimeStamp() { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index dcdc934..b4f6514 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -709,6 +709,9 @@ public void sendNodeLost(MockNM nm) throws Exception { public void waitForState(NodeId nodeId, NodeState finalState) throws InterruptedException { RMNode node = getRMContext().getRMNodes().get(nodeId); + if (node == null) { + node = getRMContext().getInactiveRMNodes().get(nodeId); + } Assert.assertNotNull("node shouldn't be null", node); int timeWaiting = 0; while (!finalState.equals(node.getState())) { @@ -722,11 +725,17 @@ public void waitForState(NodeId nodeId, NodeState finalState) timeWaiting += WAIT_MS_PER_LOOP; } - System.out.println("Node State is : " + node.getState()); + System.out.println("Node " + nodeId + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { + RMNodeImpl node = (RMNodeImpl) + getRMContext().getRMNodes().get(nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), event)); + } + public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java new file mode 100644 index 0000000..194727b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestDecommissioningNodesWatcher { + private MockRM rm; + + @Test + public void testDecommissioningNodesWatcher() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = + new DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.waitForState(id1, NodeState.RUNNING); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + // Set up nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. + rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + + // Update status with decreasing number of running containers until 0. + watcher.update(node1, createNodeStatus(id1, app, 12)); + watcher.update(node1, createNodeStatus(id1, app, 11)); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 1)); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, + watcher.checkDecommissioningStatus(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 0)); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, + watcher.checkDecommissioningStatus(id1)); + + // Set the app to FINISHED and verify DecommissioningNodeStatus is READY. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(DecommissioningNodeStatus.READY, + watcher.checkDecommissioningStatus(id1)); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } + + private NodeStatus createNodeStatus( + NodeId nodeId, RMApp app, int numRunningContainers) { + return NodeStatus.newInstance( + nodeId, 0, getContainerStatuses(app, numRunningContainers), + new ArrayList(), + NodeHealthStatus.newInstance( + true, "", System.currentTimeMillis() - 1000), + null, null, null); + } + + // Get mocked ContainerStatuses for a bunch of containers, + // of which numRunningContainers are RUNNING.
+ private List getContainerStatuses( + RMApp app, int numRunningContainers) { + // Total 12 containers + final int total = 12; + numRunningContainers = Math.min(total, numRunningContainers); + List output = new ArrayList(); + for (int i = 0; i < total; i++) { + ContainerState cstate = (i >= numRunningContainers)? + ContainerState.COMPLETE : ContainerState.RUNNING; + output.add(ContainerStatus.newInstance( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), + cstate, "Dummy", 0)); + } + return output; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 83a7c73..e82b93c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -254,17 +254,6 @@ public void testStatusUpdateOnDecommissioningNode() { cm.getNumDecommissioningNMs()); Assert.assertEquals("Decommissioned Nodes", initialDecommissioned, cm.getNumDecommisionedNMs()); - - // Verify node in DECOMMISSIONING will be changed by status update - // without running apps - statusEvent = getMockRMNodeStatusEventWithoutRunningApps(); - node.handle(statusEvent); - Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); - Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); - Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1, - cm.getNumDecommissioningNMs()); - Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1, - cm.getNumDecommisionedNMs()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 098ba54..f96b32f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -222,6 +223,109 @@ public void testDecommissionWithExcludeHosts() throws Exception { } /** + * Graceful decommission node with no running application. 
+ */ + @Test + public void testGracefulDecommissionNoApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("host3:4433", 5120); + + int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host2 and host3. + writeToHostsFile("host2", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + checkDecommissionedNMCount(rm, metricCount + 2); + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); + } + + /** + * Graceful decommission node with running application. + */ + @Test + public void testGracefulDecommissionWithApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:5678", 20480); + MockNM nm3 = rm.registerNode("host3:4433", 10240); + NodeId id1 = nm1.getNodeId(); + NodeId id3 = nm3.getNodeId(); + rm.waitForState(id1, NodeState.RUNNING); + rm.waitForState(id3, NodeState.RUNNING); + + // Create an app and launch two containers on host1. + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + + // Graceful decommission host1 and host3 + writeToHostsFile("host1", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONING); + + // host1 should be DECOMMISSIONING due to running containers. + // host3 should become DECOMMISSIONED. + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONED); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + + // Complete containers on host1. + // Since the app is still RUNNING, expect NodeAction.NORMAL. 
+ NodeHeartbeatResponse nodeHeartbeat1 = + nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction()); + + // Finish the app and verify the node becomes DECOMMISSIONED. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction()); + rm.waitForState(id1, NodeState.DECOMMISSIONED); + } + + /** * Decommissioning using a post-configured include hosts file */ @Test @@ -1371,7 +1475,7 @@ public void testIncorrectRecommission() throws Exception { excludeHostFile.getAbsolutePath()); writeToHostsFile(hostFile, "host1", "host2"); writeToHostsFile(excludeHostFile, "host1"); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); nm1.nodeHeartbeat(true); rm.drainEvents(); @@ -1380,7 +1484,7 @@ .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState .DECOMMISSIONED); writeToHostsFile(excludeHostFile, ""); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); Assert.assertTrue("Node " + nm1.getNodeId().getHost() + " should be Decommissioned", rm.getRMContext() @@ -1411,7 +1515,7 @@ public void testNodeRemovalGracefully() throws Exception { public void refreshNodesOption(boolean doGraceful, Configuration conf) throws Exception { if (doGraceful) { - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); } else { rm.getNodesListManager().refreshNodes(conf); } @@ -1455,18 +1559,23 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); refreshNodesOption(doGraceful, conf); + if (doGraceful) { + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + } nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); rm.drainEvents(); Assert.assertTrue("Node should not be in active node list", !rmContext.getRMNodes().containsKey(nm2.getNodeId())); RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertEquals("Node should be in inactive node list", - rmNode.getState(), NodeState.SHUTDOWN); + rmNode.getState(), + doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); Assert.assertEquals("Active nodes should be 2", metrics.getNumActiveNMs(), 2); - Assert.assertEquals("Shutdown nodes should be 1", - metrics.getNumShutdownNMs(), 1); + Assert.assertEquals("Shutdown nodes should be expected", + metrics.getNumShutdownNMs(), doGraceful? 0 : 1); int nodeRemovalTimeout = conf.getInt( @@ -1491,14 +1600,18 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { rm.drainEvents(); writeToHostsFile("host1", ip); refreshNodesOption(doGraceful, conf); + rm.waitForState(nm2.getNodeId(), + doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN); + nm2.nodeHeartbeat(true); rm.drainEvents(); rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertEquals("Node should be shutdown", - rmNode.getState(), NodeState.SHUTDOWN); + rmNode.getState(), + doGraceful?
NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); Assert.assertEquals("Active nodes should be 2", metrics.getNumActiveNMs(), 2); - Assert.assertEquals("Shutdown nodes should be 1", - metrics.getNumShutdownNMs(), 1); + Assert.assertEquals("Shutdown nodes should be expected", + metrics.getNumShutdownNMs(), doGraceful? 0 : 1); //add back the node before timer expires latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS); @@ -1560,6 +1673,7 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { writeToHostsFile("host1", ip); writeToHostsFile(excludeHostFile, ""); refreshNodesOption(doGraceful, conf); + nm2.nodeHeartbeat(true); latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : rmContext.getInactiveRMNodes().get(nm2.getNodeId()); @@ -1782,15 +1896,19 @@ private void testNodeRemovalUtilUnhealthy(boolean doGraceful) nm2.nodeHeartbeat(false); nm3.nodeHeartbeat(true); rm.drainEvents(); - Assert.assertNotEquals("host2 should be a shutdown NM!", - rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); - Assert.assertEquals("host2 should be a shutdown NM!", - rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), - NodeState.SHUTDOWN); + if (!doGraceful) { + Assert.assertNotEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), + NodeState.SHUTDOWN); + } Assert.assertEquals("There should be 2 Active NM!", clusterMetrics.getNumActiveNMs(), 2); - Assert.assertEquals("There should be 1 Shutdown NM!", - clusterMetrics.getNumShutdownNMs(), 1); + if (!doGraceful) { + Assert.assertEquals("There should be 1 Shutdown NM!", + clusterMetrics.getNumShutdownNMs(), 1); + } Assert.assertEquals("There should be 0 Unhealthy NM!", clusterMetrics.getUnhealthyNMs(), 0); int nodeRemovalTimeout = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 2c926d9..f2a45a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -211,7 +211,7 @@ public void testDecommissioningNodeReconnect() rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); rm.getRMContext().getNodesListManager().getHostsReader(). 
- getExcludedHosts().add("127.0.0.1"); + getExcludedHostsWithTimeout().put("127.0.0.1", null); rm.getRMContext().getDispatcher().getEventHandler().handle( new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.GRACEFUL_DECOMMISSION)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md new file mode 100644 index 0000000..9e94de0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md @@ -0,0 +1,101 @@ + + +Graceful Decommission of YARN Nodes +=============== + +* [Overview](#overview) +* [Features](#features) + * [NodesListManager detects and handles include and exclude list changes](#nodeslistmanager-detects-and-handles-include-and-exclude-list-changes) + * [RMNode handles decommission events](#rmnode-handles-decommission-events) + * [Automatic and asynchronous tracking of decommissioning nodes status](#automatic-and-asynchronous-tracking-of-decommissioning-nodes-status) + * [Per-Node decommission timeout support](#per-node-decommission-timeout-support) +* [Configuration](#configuration) + +Overview +-------- + +Graceful Decommission of YARN Nodes is the mechanism to decommission NMs while minimizing the impact on running applications. Once a node is in the DECOMMISSIONING state, the RM won't schedule new containers on it and will wait for running containers and applications to complete (or for the decommissioning timeout to be exceeded) before transitioning the node into DECOMMISSIONED. + +Features +-------- + +###NodesListManager detects and handles include and exclude list changes + +"yarn rmadmin -refreshNodes -g [timeout in seconds]" notifies NodesListManager to detect and handle include and exclude host list changes. NodesListManager loads the excluded hosts from the exclude file as specified through the "yarn.resourcemanager.nodes.exclude-path" configuration in yarn-site.xml. + +NodesListManager inspects and compares the status of RMNodes in the resource manager against the exclude list, and applies the necessary actions based on the following rules: + +* Recommission DECOMMISSIONED or DECOMMISSIONING nodes that are no longer excluded; +* Gracefully decommission excluded nodes that are not already in DECOMMISSIONED or +DECOMMISSIONING state; +* Forcefully decommission excluded nodes that are not already in DECOMMISSIONED state if the -g flag is not specified. + +Accordingly, a RECOMMISSION, GRACEFUL_DECOMMISSION or DECOMMISSION RMNodeEvent will be sent to the RMNode. + +###Per-Node decommission timeout support + +To support flexible graceful decommission of nodes with different timeouts through +single or multiple refreshNodes requests, HostsFileReader supports an optional timeout value +after each hostname (or IP) in the exclude host file. The effective decommissioning timeout +to use for a particular host is resolved with the following priorities: +* Use the timeout for the particular host if specified in the exclude host file; +* Use the timeout in "yarn rmadmin -refreshNodes -g [timeout in seconds]" if specified; +* Use the default timeout specified through the "yarn.resourcemanager.decommissioning.default.timeout" configuration. + +NodesListManager decides the effective timeout to use and sets it on the individual RMNode. +The timeout can also be dynamically adjusted through the "yarn rmadmin -refreshNodes -g [timeout in seconds]" command; NodesListManager will resolve the new effective timeout and update the RMNode as necessary. An example exclude file is sketched below.
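+
+As a sketch, an exclude host file in the XML format (which HostsFileReader selects whenever the file name ends with ".xml") lists each host in a "host" element with an optional per-host "timeout" in seconds. The host names below are hypothetical, and the enclosing root tag name is arbitrary since the parser only looks up "host" elements:
+
+```xml
+<hosts>
+  <host><name>host1</name></host>
+  <host><name>host2</name><timeout>123</timeout></host>
+  <host><name>host3</name><timeout>-1</timeout></host>
+</hosts>
+```
+
+In this sketch, host1 would use the effective default timeout, host2 a 123-second timeout, and host3 (with a negative value) an infinite timeout.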
A change of timeout does not reset the ongoing decommissioning, but only affects the evaluation of whether the node has reached its decommissioning timeout. + +###RMNode handles decommission events + +Upon receiving a GRACEFUL_DECOMMISSION event, the RMNode saves the decommissioning timeout if specified, updates metrics for graceful decommission, preserves its original total capacity, and transitions into the DECOMMISSIONING state. + +Resources are dynamically and periodically updated on a DECOMMISSIONING RMNode so that the scheduler won't schedule new containers on it, as no resources appear available. + +###Automatic and asynchronous tracking of decommissioning nodes status +**DecommissioningNodesWatcher** is the YARN component that tracks the status of DECOMMISSIONING nodes automatically and asynchronously after the client/admin makes the graceful decommission request. NMs periodically send heartbeats to the RM with their latest container status. +DecommissioningNodesWatcher tracks the heartbeat updates on all DECOMMISSIONING nodes to decide when, after all running containers on the node have completed, the node will be transitioned into the DECOMMISSIONED state, after which the NodeManager will be told to shut down. + +Under a MapReduce application, a node that has completed all its containers may still serve its map output data to reducers for the duration of the application. The YARN graceful decommission mechanism keeps such DECOMMISSIONING nodes until all involved applications complete. This could however be undesirable in long-running application scenarios, where a number of "idle" nodes might stay around for a long period of time. DecommissioningNodesWatcher balances this concern with a timeout --- a DECOMMISSIONING node will be DECOMMISSIONED no later than the decommissioning timeout, regardless of running containers or applications. If the running containers finish earlier, it continues waiting for applications to finish until the decommissioning timeout. When the decommissioning timeout is reached, the node is decommissioned regardless; it is deactivated and its tasks are rescheduled as necessary. + +The status of all decommissioning nodes is logged periodically (every 20 seconds) in the resource manager logs. The following are the sub-statuses of a decommissioning node: + +* NONE --- The node is not in DECOMMISSIONING state. +* WAIT_CONTAINER --- Waiting for running containers to complete. +* WAIT_APP --- Waiting for running applications to complete (after all containers complete). +* TIMEOUT --- Timed out waiting for either containers or applications to complete. +* READY --- Nothing to wait for; ready to be decommissioned. +* DECOMMISSIONED --- The node has already been decommissioned. + +Configuration +-------- + +Property | Value +----- | ------ +yarn.resourcemanager.decommissioning.default.timeout | Timeout in seconds for YARN node graceful decommission. This is the maximal time to wait for running containers and applications to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED. The default value is 3600 seconds. A negative value (such as -1) is handled as an infinite timeout.
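+
+As a minimal sketch, the default timeout above could be tuned in yarn-site.xml as follows (the 7200-second value is only illustrative):
+
+```xml
+<property>
+  <name>yarn.resourcemanager.decommissioning.default.timeout</name>
+  <value>7200</value>
+</property>
+```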
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md index 40704f0..29d5838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md @@ -243,7 +243,7 @@ Usage: | COMMAND\_OPTIONS | Description | |:---- |:---- | | -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. | -| -refreshNodes | Refresh the hosts information at the ResourceManager. | +| -refreshNodes [-g [timeout in seconds]] | Refresh the hosts information at the ResourceManager. The -g option indicates graceful decommission of excluded hosts, in which case the optional timeout indicates the maximal time in seconds the ResourceManager should wait before forcefully marking the node as decommissioned. | | -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. | | -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. | | -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |
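+
+For example, a typical graceful decommission flow with the option above (the 600-second timeout is illustrative) would be to add the target hosts to the exclude file referenced by yarn.resourcemanager.nodes.exclude-path and then run:
+
+```
+yarn rmadmin -refreshNodes -g 600
+```
+
+The excluded hosts then transition to DECOMMISSIONING and, once their containers and applications complete or the 600-second timeout expires, to DECOMMISSIONED.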