diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..5097eee 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -19,8 +19,11 @@ package org.apache.hadoop.util; import java.io.*; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.Map.Entry; import org.apache.commons.io.Charsets; import org.apache.commons.logging.LogFactory; @@ -35,7 +38,9 @@ @InterfaceStability.Unstable public class HostsFileReader { private Set includes; - private Set excludes; + // exclude host list with optional timeout. + // If the value is null, it indicates default timeout. + private Map excludes; private String includesFile; private String excludesFile; @@ -44,7 +49,7 @@ public HostsFileReader(String inFile, String exFile) throws IOException { includes = new HashSet(); - excludes = new HashSet(); + excludes = new HashMap(); includesFile = inFile; excludesFile = exFile; refresh(); @@ -54,7 +59,7 @@ public HostsFileReader(String inFile, public HostsFileReader(String includesFile, InputStream inFileInputStream, String excludesFile, InputStream exFileInputStream) throws IOException { includes = new HashSet(); - excludes = new HashSet(); + excludes = new HashMap(); this.includesFile = includesFile; this.excludesFile = excludesFile; refresh(inFileInputStream, exFileInputStream); @@ -71,6 +76,21 @@ public static void readFileToSet(String type, public static void readFileToSetWithFileInputStream(String type, String filename, InputStream fileInputStream, Set set) throws IOException { + Map map = new HashMap(); + readFileToMapWithFileInputStream(type, filename, fileInputStream, map); + set.addAll(map.keySet()); + } + + 
public static void readFileToMap(String type, + String filename, Map map) throws IOException { + File file = new File(filename); + FileInputStream fis = new FileInputStream(file); + readFileToMapWithFileInputStream(type, filename, fis, map); + } + + public static void readFileToMapWithFileInputStream(String type, + String filename, InputStream fileInputStream, Map map) + throws IOException { BufferedReader reader = null; try { reader = new BufferedReader( @@ -86,13 +106,21 @@ public static void readFileToSetWithFileInputStream(String type, break; } if (!nodes[i].isEmpty()) { - LOG.info("Adding a node \"" + nodes[i] + "\" to the list of " - + type + " hosts from " + filename); - set.add(nodes[i]); + // look ahead for optional timeout values + Integer timeout = null; + if (i < nodes.length - 1) { + timeout = tryParseInteger(nodes[i+1]); + } + map.put(nodes[i], timeout); + // skip the timeout if exist + if (timeout != null) { + i++; + } } } } } + prettyLogMap(type, map, filename); } finally { if (reader != null) { reader.close(); @@ -104,7 +132,7 @@ public static void readFileToSetWithFileInputStream(String type, public synchronized void refresh() throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); boolean switchIncludes = false; boolean switchExcludes = false; if (!includesFile.isEmpty()) { @@ -112,7 +140,7 @@ public synchronized void refresh() throws IOException { switchIncludes = true; } if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); + readFileToMap("excluded", excludesFile, newExcludes); switchExcludes = true; } @@ -131,7 +159,7 @@ public synchronized void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); 
boolean switchIncludes = false; boolean switchExcludes = false; if (inFileInputStream != null) { @@ -140,7 +168,7 @@ public synchronized void refresh(InputStream inFileInputStream, switchIncludes = true; } if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, + readFileToMapWithFileInputStream("excluded", excludesFile, exFileInputStream, newExcludes); switchExcludes = true; } @@ -159,6 +187,10 @@ public synchronized void refresh(InputStream inFileInputStream, } public synchronized Set getExcludedHosts() { + return excludes.keySet(); + } + + public synchronized Map getExcludedHostsWithTimeout() { return excludes; } @@ -177,4 +209,29 @@ public synchronized void updateFileNames(String includesFile, setIncludesFile(includesFile); setExcludesFile(excludesFile); } + + private static Integer tryParseInteger(String str) { + try{ + int num = Integer.parseInt(str); + return num; + } catch (Exception e) { + return null; + } + } + + private static void prettyLogMap( + String type, Map excludes, String filename) { + if (excludes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + for (Entry n : excludes.entrySet()) { + if (n.getValue() != null) { + sb.append(String.format("%n %s : %d", n.getKey(), n.getValue())); + } else { + sb.append(String.format("%n %s", n.getKey())); + } + } + LOG.info("List of " + type + " hosts from " + filename + sb.toString()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 3000069..3d02b50 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,14 +20,16 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import 
java.util.Map; import org.apache.hadoop.test.GenericTestUtils; import org.junit.*; + import static org.junit.Assert.*; /* * Test for HostsFileReader.java - * + * */ public class TestHostsFileReader { @@ -262,4 +264,37 @@ public void testHostFileReaderWithTabs() throws Exception { assertFalse(hfp.getExcludedHosts().contains("somehost5")); } + + /* + * Test if timeout values are provided in HostFile + */ + @Test + public void testHostFileReaderWithTimeout() throws Exception { + FileWriter efw = new FileWriter(excludesFile); + FileWriter ifw = new FileWriter(includesFile); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write(" \n"); + efw.write(" somehost 123 \t somehost2 \n somehost4 78"); + efw.write(" somehost3 456 \t # somehost5"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write(" \n"); + ifw.write(" somehost \t somehost2 \n somehost4"); + ifw.write(" somehost3 \t # somehost5"); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile); + + int includesLen = hfp.getHosts().size(); + int excludesLen = hfp.getExcludedHosts().size(); + assertEquals(4, includesLen); + assertEquals(4, excludesLen); + + Map excludes = hfp.getExcludedHostsWithTimeout(); + assertTrue(excludes.containsKey("somehost")); + assertTrue(excludes.get("somehost") == 123); + assertTrue(excludes.get("somehost2") == null); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586b..abe83a7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return null; } + + @Override + public Integer getDecommissioningTimeout() { + 
return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb..085656d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -188,4 +188,9 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return node.getNodeUtilization(); } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8acee57..b4198c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -652,6 +652,15 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; + /** + * Timeout in seconds for YARN node graceful decommission. + * This is the maximal time to wait for running containers and applications + * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED. 
+ */ + public static final String DECOMMISSIONING_DEFAULT_TIMEOUT_KEY = + RM_PREFIX + "decommissioning.default.timeout"; + public static final int DEFAULT_DECOMMISSIONING_TIMEOUT = 3600; + //////////////////////////////// // Node Manager Configs //////////////////////////////// @@ -1409,6 +1418,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX + "log-aggregation.policy.parameters"; + /** Time to wait before NodeManager exits on start error. + * This avoids NodeManager restarts and registers to RM too quickly + * after being rejected by RM due to being excluded. + */ + public static final String NM_EXIT_WAIT_MS = NM_PREFIX + "exit-wait.ms"; + public static final long DEFAULT_NM_EXIT_WAIT_MS = 5000; + //////////////////////////////// // Web Proxy Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java index 0333c3b..732d98e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java @@ -43,6 +43,16 @@ public static RefreshNodesRequest newInstance( return request; } + @Private + @Unstable + public static RefreshNodesRequest newInstance( + DecommissionType decommissionType, Integer timeout) { + RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class); + request.setDecommissionType(decommissionType); + request.setDecommissionTimeout(timeout); + return request; + } + /** * Set the DecommissionType * @@ -56,4 +66,18 @@ public static RefreshNodesRequest newInstance( * 
@return decommissionType */ public abstract DecommissionType getDecommissionType(); -} + + /** + * Set the DecommissionTimeout. + * + * @param timeout graceful decommission timeout in seconds + */ + public abstract void setDecommissionTimeout(Integer timeout); + + /** + * Get the DecommissionTimeout. + * + * @return decommissionTimeout + */ + public abstract Integer getDecommissionTimeout(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eaf658f..b9f30db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -37,6 +37,7 @@ message RefreshQueuesResponseProto { message RefreshNodesRequestProto { optional DecommissionTypeProto decommissionType = 1 [default = NORMAL]; + optional int32 decommissionTimeout = 2; } message RefreshNodesResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index d407c20..dbea150 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -303,28 +303,33 @@ private int refreshQueues() throws IOException, YarnException { return 0; } - private int refreshNodes() throws IOException, YarnException { + private int refreshNodes(boolean graceful) throws IOException, YarnException { // Refresh the nodes 
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); - RefreshNodesRequest request = RefreshNodesRequest - .newInstance(DecommissionType.NORMAL); + RefreshNodesRequest request = RefreshNodesRequest.newInstance( + graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL); adminProtocol.refreshNodes(request); return 0; } - private int refreshNodes(long timeout) throws IOException, YarnException { + private int refreshNodes(int timeout) throws IOException, YarnException { // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest - .newInstance(DecommissionType.GRACEFUL); + .newInstance(DecommissionType.GRACEFUL, timeout); adminProtocol.refreshNodes(gracefulRequest); CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory .newRecordInstance(CheckForDecommissioningNodesRequest.class); long waitingTime; boolean nodesDecommissioning = true; + // Additional seconds to wait before forcefully decommission nodes. + // This is usually not needed since RM enforces timeout automatically. 
+ final int gracePeriod = 20; // timeout=-1 means wait for all the nodes to be gracefully // decommissioned - for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) { + for (waitingTime = 0; + timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod); + waitingTime++) { // wait for one second to check nodes decommissioning status try { Thread.sleep(1000); @@ -369,6 +374,10 @@ private int refreshNodesResources() throws IOException, YarnException { return 0; } + private int refreshNodes() throws IOException, YarnException { + return refreshNodes(false); + } + private int refreshUserToGroupsMappings() throws IOException, YarnException { // Refresh the user-to-groups mappings @@ -714,26 +723,12 @@ public int run(String[] args) throws Exception { return exitCode; } } - + try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { - if (args.length == 1) { - exitCode = refreshNodes(); - } else if (args.length == 3) { - // if the graceful timeout specified - if ("-g".equals(args[1])) { - long timeout = validateTimeout(args[2]); - exitCode = refreshNodes(timeout); - } else { - printUsage(cmd, isHAEnabled); - return -1; - } - } else { - printUsage(cmd, isHAEnabled); - return -1; - } + exitCode = handleRefreshNodes(args, isHAEnabled); } else if ("-refreshNodesResources".equals(cmd)) { exitCode = refreshNodesResources(); } else if ("-refreshUserToGroupsMappings".equals(cmd)) { @@ -750,22 +745,7 @@ public int run(String[] args) throws Exception { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); } else if ("-updateNodeResource".equals(cmd)) { - if (args.length < 4 || args.length > 5) { - System.err.println("Number of parameters specified for " + - "updateNodeResource is wrong."); - printUsage(cmd, isHAEnabled); - exitCode = -1; - } else { - String nodeID = args[i++]; - String memSize = args[i++]; - String cores = args[i++]; - int 
overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; - if (i == args.length - 1) { - overCommitTimeout = Integer.parseInt(args[i]); - } - exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize), - Integer.parseInt(cores), overCommitTimeout); - } + exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled); } else if ("-addToClusterNodeLabels".equals(cmd)) { if (i >= args.length) { System.err.println(NO_LABEL_ERR_MSG); @@ -825,10 +805,54 @@ public int run(String[] args) throws Exception { return exitCode; } - private long validateTimeout(String strTimeout) { - long timeout; + // A helper method to reduce the number of lines of run() + private int handleRefreshNodes(String[] args, boolean isHAEnabled) + throws IOException, YarnException { + // In example "yarn rmadmin -refreshNodes -graceful 1000", + // args[1] is -graceful + if (args.length == 1) { + return refreshNodes(); + } else if (args.length >= 2) { + // if the graceful timeout specified + if ("-g".equals(args[1]) || "-graceful".equals(args[1])) { + if (args.length == 3) { + int timeout = validateTimeout(args[2]); + return refreshNodes(timeout); + } else { + return refreshNodes(true); + } + } + } + printUsage("-refreshNodes", isHAEnabled); + return -1; + } + + private int handleUpdateNodeResource( + String[] args, String cmd, boolean isHAEnabled) + throws NumberFormatException, IOException, YarnException { + int i = 1; + if (args.length < 4 || args.length > 5) { + System.err.println("Number of parameters specified for " + + "updateNodeResource is wrong."); + printUsage(cmd, isHAEnabled); + return -1; + } else { + String nodeID = args[i++]; + String memSize = args[i++]; + String cores = args[i++]; + int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; + if (i == args.length - 1) { + overCommitTimeout = Integer.parseInt(args[i]); + } + return updateNodeResource(nodeID, Integer.parseInt(memSize), + Integer.parseInt(cores), overCommitTimeout); + } + } + + private 
int validateTimeout(String strTimeout) { + int timeout; try { - timeout = Long.parseLong(strTimeout); + timeout = Integer.parseInt(strTimeout); } catch (NumberFormatException ex) { throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index 057594d..b56bcb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -266,7 +266,7 @@ public void testRefreshNodesWithGracefulTimeout() throws Exception { CheckForDecommissioningNodesRequest.class))).thenReturn(response); assertEquals(0, rmAdminCLI.run(args)); verify(admin).refreshNodes( - RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1)); // Forceful decommission when timeout occurs String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java index 05f3230..c03a569 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java @@ -31,7 +31,6 @@ @Private @Unstable public class RefreshNodesRequestPBImpl 
extends RefreshNodesRequest { - RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance(); RefreshNodesRequestProto.Builder builder = null; boolean viaProto = false; @@ -108,6 +107,22 @@ public synchronized DecommissionType getDecommissionType() { return convertFromProtoFormat(p.getDecommissionType()); } + @Override + public synchronized void setDecommissionTimeout(Integer timeout) { + maybeInitBuilder(); + if (timeout != null) { + builder.setDecommissionTimeout(timeout); + } else { + builder.clearDecommissionTimeout(); + } + } + + @Override + public synchronized Integer getDecommissionTimeout() { + RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null; + } + private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) { return DecommissionType.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 506cf3d..66480dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2431,6 +2431,26 @@ + + Timeout in seconds for YARN node graceful decommission. + This is the maximal time to wait for running containers and applications to complete + before transition a DECOMMISSIONING node into DECOMMISSIONED. + + yarn.resourcemanager.decommissioning.default.timeout + 3600 + + + + + Time to wait before NodeManager exits on start error. + This avoids NodeManager restarts and registers to RM too quickly + after being rejected by RM due to being excluded. + + yarn.nodemanager.exit-wait.ms + 5000 + + + The Node Label script to run. Script output Line starting with "NODE_PARTITION:" will be considered as Node Label Partition. 
In case of multiple lines have this pattern, then last one will be considered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 07b06fa..2ec3870 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -354,7 +354,7 @@ private static Object genTypeValue(Type type) { return rand.nextBoolean(); } else if (type.equals(byte.class)) { return bytes[rand.nextInt(4)]; - } else if (type.equals(int.class)) { + } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); } else if (type.equals(long.class)) { return Long.valueOf(rand.nextInt(1000000)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 7c104d5..c7308c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -634,6 +634,15 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { this.start(); } catch (Throwable t) { LOG.fatal("Error starting NodeManager", t); + long exitWaitMs = conf.getLong(YarnConfiguration.NM_EXIT_WAIT_MS, + YarnConfiguration.DEFAULT_NM_EXIT_WAIT_MS); + LOG.fatal("Exit in " + exitWaitMs + " 
milliseconds"); + if (exitWaitMs > 0) { + try { + Thread.sleep(exitWaitMs); + } catch (InterruptedException e) { + } + } System.exit(-1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index f75219b..12bbce8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -440,7 +440,8 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) rmContext.getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully(conf); + rmContext.getNodesListManager().refreshNodesGracefully( + conf, request.getDecommissionTimeout()); break; case FORCEFUL: rmContext.getNodesListManager().refreshNodesForcefully(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java new file mode 100644 index 0000000..36fbd21 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +import com.google.common.base.Stopwatch; + +/** + * DecommissioningNodesWatcher is used by ResourceTrackerService to track + * DECOMMISSIONING nodes to decide when, after all running containers on + * the node have completed, it will be transitioned into DECOMMISSIONED state + * (NodeManager will be told to 
shutdown). + * Under MR application, a node, after completing all its containers, + * may still serve its map output data during the duration of the application + * for reducers. A fully graceful mechanism would keep such DECOMMISSIONING + * nodes until all involved applications complete. It could however be + * undesirable under a long-running applications scenario where a bunch + * of "idle" nodes would stay around for a long period of time. + * + * DecommissioningNodesWatcher balances such concerns with a timeout policy --- + * a DECOMMISSIONING node will be DECOMMISSIONED no later than + * DECOMMISSIONING_TIMEOUT regardless of running containers or applications. + * + * To be efficient, DecommissioningNodesWatcher skips tracking application + * containers on a particular node before the node is in DECOMMISSIONING state. + * It only tracks containers once the node is in DECOMMISSIONING state. + * DecommissioningNodesWatcher basically incurs no cost when no node is + * DECOMMISSIONING. This sacrifices the possibility that the node once + * hosted containers of a still-running application. + */ +public class DecommissioningNodesWatcher { + private static final Log LOG = + LogFactory.getLog(DecommissioningNodesWatcher.class); + + private final RMContext rmContext; + + // Default timeout value in millis. + // Negative value indicates no timeout. 0 means immediate. + private long defaultTimeoutMs = + 1000L * YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT; + + // Once an RMNode is observed in DECOMMISSIONING state, + // all its ContainerStatus updates are tracked inside DecomNodeContext. + class DecommissioningNodeContext { + private final NodeId nodeId; + + // Last known NodeState. + private NodeState nodeState; + + // The moment node is observed in DECOMMISSIONING state. + private final long decommissioningStartTime; + + private long lastContainerFinishTime; + + // number of running containers at the moment. 
+ private int numActiveContainers; + + // All applications run on the node at or after decommissioningStartTime. + private Set appIds; + + // First moment the node is observed in DECOMMISSIONED state. + private long decommissionedTime; + + // Timeout in millis for this decommissioning node. + // This value could be dynamically updated with new value from RMNode. + private long timeoutMs; + + private long lastUpdateTime; + + public DecommissioningNodeContext(NodeId nodeId) { + this.nodeId = nodeId; + this.appIds = new HashSet(); + this.decommissioningStartTime = System.currentTimeMillis(); + this.timeoutMs = defaultTimeoutMs; + } + + void updateTimeout(Integer timeoutSec) { + this.timeoutMs = (timeoutSec == null)? + defaultTimeoutMs : (1000L * timeoutSec); + } + } + + // All DECOMMISSIONING nodes to track. + private HashMap decomNodes = + new HashMap(); + + private Stopwatch pollWatch = new Stopwatch().start(); + + private final Set emptyNodeIdSet = new HashSet(); + + public DecommissioningNodesWatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void init(Configuration conf) { + readDecommissioningTimeout(conf); + } + + /** + * Update rmNode decommissioning status based on NodeStatus. + * @param rmNode + * @param remoteNodeStatus + */ + public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { + DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID()); + long now = System.currentTimeMillis(); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + if (context == null) { + return; + } + context.nodeState = rmNode.getState(); + // keep DECOMMISSIONED node for a while for status log. 
+ if (context.decommissionedTime == 0) { + context.decommissionedTime = now; + } else if (now - context.decommissionedTime > 60000L) { + decomNodes.remove(rmNode.getNodeID()); + LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID()); + } + } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (context == null) { + context = new DecommissioningNodeContext(rmNode.getNodeID()); + decomNodes.put(rmNode.getNodeID(), context); + context.nodeState = rmNode.getState(); + context.decommissionedTime = 0; + } + context.updateTimeout(rmNode.getDecommissioningTimeout()); + context.lastUpdateTime = now; + + if (remoteNodeStatus.getKeepAliveApplications() != null) { + context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); + } + + // Count number of active containers. + int numActiveContainers = 0; + for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { + ContainerState newState = cs.getState(); + if (newState == ContainerState.RUNNING || + newState == ContainerState.NEW) { + numActiveContainers++; + } + context.numActiveContainers = numActiveContainers; + ApplicationId aid = cs.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!context.appIds.contains(aid)) { + context.appIds.add(aid); + } + } + + context.numActiveContainers = numActiveContainers; + + // maintain lastContainerFinishTime. + if (context.numActiveContainers == 0 && + context.lastContainerFinishTime == 0) { + context.lastContainerFinishTime = now; + } + } else { + // remove node in other states + if (context != null) { + decomNodes.remove(rmNode.getNodeID()); + } + } + } + + public synchronized void remove(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context != null) { + LOG.info("remove " + nodeId + " in " + context.nodeState); + decomNodes.remove(nodeId); + } + } + + /** + * Status about a specific decommissioning node. 
+ * + */ + public enum DecomNodeStatus { + // Node is not in DECOMMISSIONING state. + NONE, + + // wait for running containers to complete + WAIT_CONTAINER, + + // wait for running application to complete (after all containers complete); + WAIT_APP, + + // Timeout waiting for either containers or applications to complete. + TIMEOUT, + + // nothing to wait, ready to be decommissioned + READY, + + // The node has already been decommissioned + DECOMMISSIONED, + } + + public boolean checkReadyToBeDecommissioned(NodeId nodeId) { + DecomNodeStatus s = getDecommissioningStatus(nodeId); + return (s == DecomNodeStatus.READY || s == DecomNodeStatus.TIMEOUT); + } + + public DecomNodeStatus getDecommissioningStatus(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context == null) { + return DecomNodeStatus.NONE; + } + + if (context.nodeState == NodeState.DECOMMISSIONED) { + return DecomNodeStatus.DECOMMISSIONED; + } + + long waitTime = + System.currentTimeMillis() - context.decommissioningStartTime; + if (context.numActiveContainers > 0) { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecomNodeStatus.WAIT_CONTAINER : DecomNodeStatus.TIMEOUT; + } + + removeCompletedApps(context); + if (context.appIds.size() == 0) { + return DecomNodeStatus.READY; + } else { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecomNodeStatus.WAIT_APP : DecomNodeStatus.TIMEOUT; + } + } + + /** + * Periodically invoked by ResourceTrackerService (RTS) to: + * 1. log status of all DECOMMISSIONING nodes; + * 2. identify and return stale DECOMMISSIONING nodes + * (to be taken care of by RTS). 
+ */ + public Set poll() { + if (decomNodes.size() == 0 || + pollWatch.elapsedTime(TimeUnit.SECONDS) < 20) { + return emptyNodeIdSet; + } + return pollInternal(); + } + + private synchronized Set pollInternal() { + pollWatch.reset().start(); + readDecommissioningTimeout(null); + logDecommissioningNodesStatus(); + long now = System.currentTimeMillis(); + Set output = new HashSet(); + + for (Iterator> it = + decomNodes.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + DecommissioningNodeContext d = e.getValue(); + // Skip node recently updated (NM usually updates every second). + if (now - d.lastUpdateTime < 30000L) { + continue; + } + // Remove stale non-DECOMMISSIONING node + if (d.nodeState != NodeState.DECOMMISSIONING) { + LOG.info("remove " + d.nodeState + " " + d.nodeId); + it.remove(); + continue; + } else if (now - d.lastUpdateTime > 60000L) { + // Node DECOMMISSIONED could become stale, check RMNode state to remove. + RMNode rmNode = getRmNode(d.nodeId); + if (rmNode != null && rmNode.getState() == NodeState.DECOMMISSIONED) { + LOG.info("remove " + rmNode.getState() + " " + d.nodeId); + it.remove(); + continue; + } + } + if (d.timeoutMs >= 0 && d.decommissioningStartTime + d.timeoutMs < now) { + output.add(d.nodeId); + LOG.info("Identified stale and timeout node " + d.nodeId); + } + } + return output; + } + + private RMNode getRmNode(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); + } + return rmNode; + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.appIds.iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.info("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == 
RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.info("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context) { + if (context.nodeState == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.nodeState != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.appIds.size() == 0 && context.numActiveContainers == 0) { + return 0; + } + // negative timeout value means no timeout (infinite timeout). + if (context.timeoutMs < 0) { + return -1; + } + + long now = System.currentTimeMillis(); + long timeout = context.decommissioningStartTime + context.timeoutMs - now; + return Math.max(0, (int)(timeout / 1000)); + } + + private void logDecommissioningNodesStatus() { + if (decomNodes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + long now = System.currentTimeMillis(); + for (DecommissioningNodeContext d : decomNodes.values()) { + DecomNodeStatus s = getDecommissioningStatus(d.nodeId); + sb.append(String.format( + "%n %-34s %4ds fresh:%3ds containers:%2d %14s", + d.nodeId.getHost(), + (now - d.decommissioningStartTime) / 1000, + (now - d.lastUpdateTime) / 1000, + d.numActiveContainers, + s)); + if (s == DecomNodeStatus.WAIT_APP || + s == DecomNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); + } + for (ApplicationId aid : d.appIds) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format( + " %s %9s %5.2f%% %5ds", + rmApp.getState(), + (rmApp.getApplicationType() == null)? + "" : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (System.currentTimeMillis() - rmApp.getStartTime()) / 1000)); + } + } + } + LOG.info("Decommissioning Nodes: " + sb.toString()); + } + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. 
+ // This enables DecommissioningNodesWatcher to pick up new value + // without ResourceManager restart. + private void readDecommissioningTimeout(Configuration conf) { + try { + if (conf == null) { + conf = new YarnConfiguration(); + } + int v = conf.getInt( + YarnConfiguration.DECOMMISSIONING_DEFAULT_TIMEOUT_KEY, + YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT); + if (defaultTimeoutMs != 1000L * v) { + defaultTimeoutMs = 1000L * v; + LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); + } + } catch (Exception e) { + LOG.info("Error readDecommissioningTimeout ", e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 121c418..319eafc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,14 +19,17 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,11 +52,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; 
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; + @SuppressWarnings("unchecked") public class NodesListManager extends CompositeService implements EventHandler { @@ -112,10 +116,11 @@ private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); for (String include : hostsReader.getHosts()) { LOG.debug("include: " + include); @@ -125,30 +130,31 @@ private void printConfiguredHosts() { } } - public void refreshNodes(Configuration yarnConf) throws IOException, - YarnException { - refreshHostsReader(yarnConf); + public void refreshNodes(Configuration yarnConf) + throws IOException, YarnException { + refreshNodes(yarnConf, false); + } - for (NodeId nodeId: rmContext.getRMNodes().keySet()) { - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - } - } + public void refreshNodes(Configuration yarnConf, boolean graceful) + throws IOException, YarnException { + refreshHostsReader(yarnConf, graceful, null); } - private void refreshHostsReader(Configuration yarnConf) throws IOException, - YarnException { + private void refreshHostsReader( + Configuration yarnConf, boolean graceful, Integer timeout) + throws 
IOException, YarnException { synchronized (hostsReader) { if (null == yarnConf) { yarnConf = new YarnConfiguration(); } + includesFile = yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); excludesFile = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + LOG.info("refreshNodes excludesFile " + excludesFile); hostsReader.updateFileNames(includesFile, excludesFile); hostsReader.refresh( includesFile.isEmpty() ? null : this.rmContext @@ -156,6 +162,13 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, this.conf, includesFile), excludesFile.isEmpty() ? null : this.rmContext.getConfigurationProvider() .getConfigurationInputStream(this.conf, excludesFile)); + + LOG.info("hostsReader include:{" + + Joiner.on(',').join(hostsReader.getHosts()) + + "} exclude:{" + + Joiner.on(',').join(hostsReader.getExcludedHosts()) + "}"); + + handleExcludeNodeList(graceful, timeout); printConfiguredHosts(); } } @@ -171,6 +184,75 @@ private void setDecomissionedNMs() { } } + // Handle excluded nodes based on following rules: + // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded; + // Gracefully decommission excluded nodes that are not already + // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes + // that are already DECOMMISSIONED or DECOMMISSIONING. + private void handleExcludeNodeList(boolean graceful, Integer timeout) { + // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. + List nodesToRecom = new ArrayList(); + + // Nodes need to be decommissioned (graceful or forceful); + List nodesToDecom = new ArrayList(); + + Map timeouts = hostsReader.getExcludedHostsWithTimeout(); + + for (RMNode n : this.rmContext.getRMNodes().values()) { + NodeState s = n.getState(); + // An invalid node (either due to explicit exclude or not include) + // should be excluded. 
+ boolean exclude = !isValidNode(n.getHostName()); + String nodeStr = "node " + n.getNodeID() + " with state " + s; + if (!exclude) { + if (s == NodeState.DECOMMISSIONED || s == NodeState.DECOMMISSIONING) { + LOG.info("Recommission " + nodeStr); + nodesToRecom.add(n); + } + // Otherwise no-action needed. + } else { + // exclude is true. + if (graceful) { + // Use per node timeout if exist otherwise the request timeout. + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? + timeouts.get(n.getHostName()) : timeout; + if (s != NodeState.DECOMMISSIONED && + s != NodeState.DECOMMISSIONING) { + LOG.info("Gracefully decommission " + nodeStr); + nodesToDecom.add(n); + } else if (s == NodeState.DECOMMISSIONING && + !Objects.equals(n.getDecommissioningTimeout(), + timeoutToUse)) { + LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse); + nodesToDecom.add(n); + } else { + LOG.info("No action for " + nodeStr); + } + } else { + if (s != NodeState.DECOMMISSIONED) { + LOG.info("Forcefully decommission " + nodeStr); + nodesToDecom.add(n); + } + } + } + } + + for (RMNode n : nodesToRecom) { + RMNodeEvent e = new RMNodeEvent( + n.getNodeID(), RMNodeEventType.RECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecom) { + RMNodeEventType eventType = graceful? + RMNodeEventType.GRACEFUL_DECOMMISSION : RMNodeEventType.DECOMMISSION; + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? + timeouts.get(n.getHostName()) : timeout; + RMNodeEvent e = new RMNodeEvent(n.getNodeID(), eventType, timeoutToUse); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + } + @VisibleForTesting public Resolver getResolver() { return resolver; @@ -378,25 +460,13 @@ private HostsFileReader createHostsFileReader(String includesFile, * Refresh the nodes gracefully * * @param conf + * @param timeout decommission timeout, null means default timeout. 
* @throws IOException * @throws YarnException */ - public void refreshNodesGracefully(Configuration conf) throws IOException, - YarnException { - refreshHostsReader(conf); - for (Entry entry:rmContext.getRMNodes().entrySet()) { - NodeId nodeId = entry.getKey(); - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); - } else { - // Recommissioning the nodes - if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION)); - } - } - } + public void refreshNodesGracefully(Configuration yarnConf, Integer timeout) + throws IOException, YarnException { + refreshHostsReader(yarnConf, true, timeout); } /** @@ -489,4 +559,4 @@ public void setHost(String hst) { this.host = hst; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index e19d55e..d130e0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -70,11 +70,12 @@ public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { - // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY + // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. 
ArrayList results = new ArrayList(); if (acceptedStates.contains(NodeState.NEW) || acceptedStates.contains(NodeState.RUNNING) || - acceptedStates.contains(NodeState.UNHEALTHY)) { + acceptedStates.contains(NodeState.UNHEALTHY) || + acceptedStates.contains(NodeState.DECOMMISSIONING)) { for (RMNode rmNode : context.getRMNodes().values()) { if (acceptedStates.contains(rmNode.getState())) { results.add(rmNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..67324a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -105,6 +105,8 @@ private int minAllocMb; private int minAllocVcores; + private DecommissioningNodesWatcher decomWatcher; + private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private volatile DynamicResourceConfiguration drConf; @@ -120,7 +122,7 @@ public ResourceTrackerService(RMContext rmContext, this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + this.decomWatcher = new DecommissioningNodesWatcher(rmContext); } @Override @@ -161,6 +163,7 @@ protected void serviceInit(Configuration conf) throws Exception { loadDynamicResourceConfiguration(conf); + decomWatcher.init(conf); super.serviceInit(conf); } @@ -474,6 +477,8 @@ public NodeHeartbeatResponse 
nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); + this.decomWatcher.update(rmNode, remoteNodeStatus); + // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse @@ -495,6 +500,20 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) message); } + // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + String message = "DECOMMISSIONING " + nodeId + + " is ready to be decommissioned"; + LOG.info(message); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + return YarnServerBuilderUtils.newNodeHeartbeatResponse( + NodeAction.SHUTDOWN, message); + } + + pollDecommissioningNodesWatcher(); + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. @@ -539,6 +558,26 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + // Poll DecommissioningNodesWatcher to log status and take care of + // DECOMMISSIONING node that didn't update itself + // (already terminated for example). 
+ private void pollDecommissioningNodesWatcher() { + Set nodes = decomWatcher.poll(); + for (NodeId nodeId : nodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + decomWatcher.remove(nodeId); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + LOG.info("DECOMMISSIONING " + nodeId + " timeout"); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + /** * Check if node in decommissioning state. * @param nodeId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f1..1ea7a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -168,4 +168,11 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + /* + * Optional decommissioning timeout in second + * (null indicates default timeout). + * @return the decommissioning timeout in second. 
+ */ + Integer getDecommissioningTimeout(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java index 9ecb366..f7c492f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java @@ -24,13 +24,28 @@ public class RMNodeEvent extends AbstractEvent { private final NodeId nodeId; + // Optional decommissioning timeout in second. + private final Integer decommissioningTimeout; public RMNodeEvent(NodeId nodeId, RMNodeEventType type) { super(type); this.nodeId = nodeId; + this.decommissioningTimeout = null; + } + + // Create instance with optional timeout + // (timeout could be null which means use default). 
+ public RMNodeEvent(NodeId nodeId, RMNodeEventType type, Integer timeout) { + super(type); + this.nodeId = nodeId; + this.decommissioningTimeout = timeout; } public NodeId getNodeId() { return this.nodeId; } + + public Integer getDecommissioningTimeout() { + return this.decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9b80716..edad663 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -119,6 +120,7 @@ private String healthReport; private long lastHealthReportTime; private String nodeManagerVersion; + private Integer decommissioningTimeout; /* Aggregated resource utilization for the containers. 
*/ private ResourceUtilization containersUtilization; @@ -170,7 +172,6 @@ NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -232,6 +233,16 @@ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.RUNNING, + RMNodeEventType.RECOMMISSION, + new RecommissionNodeTransition(NodeState.RUNNING)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.LOST, + RMNodeEventType.EXPIRE, new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + //Transitions from DECOMMISSIONING state .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, @@ -256,6 +267,9 @@ .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, RMNodeEventType.REBOOTING, new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -620,7 +634,7 @@ public void handle(RMNodeEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on Node " + this.nodeId); + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " @@ -653,6 +667,9 @@ private void updateMetricsForRejoinedNode(NodeState 
previousNodeState) { case SHUTDOWN: metrics.decrNumShutdownNMs(); break; + case DECOMMISSIONING: + metrics.decrDecommissioningNMs(); + break; default: LOG.debug("Unexpected previous node state"); } @@ -699,6 +716,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, case DECOMMISSIONING: metrics.decrDecommissioningNMs(); break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; @@ -1071,9 +1091,23 @@ public DecommissioningNodeTransition(NodeState initState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + // Pick up possible updates on decommissioningTimeout. + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (!Objects.equals(rmNode.getDecommissioningTimeout(), + event.getDecommissioningTimeout())) { + LOG.info("Update " + rmNode.getNodeID() + + " DecommissioningTimeout to be " + + event.getDecommissioningTimeout()); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); + } else { + LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); + } + return; + } LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING."); // Update NM metrics during graceful decommissioning. rmNode.updateMetricsForGracefulDecommission(initState, finalState); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); @@ -1140,24 +1174,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.UNHEALTHY; } } - if (isNodeDecommissioning) { - List runningApps = rmNode.getRunningApps(); - - List keepAliveApps = statusEvent.getKeepAliveAppIds(); - - // no running (and keeping alive) app on this node, get it - // decommissioned. - // TODO may need to check no container is being scheduled on this node - // as well. 
- if ((runningApps == null || runningApps.size() == 0) - && (keepAliveApps == null || keepAliveApps.size() == 0)) { - RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) - } rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleReportedIncreasedContainers( @@ -1383,4 +1399,9 @@ private void handleLogAggregationStatus( public Resource getOriginalTotalCapability() { return this.originalTotalCapability; } - } + + @Override + public Integer getDecommissioningTimeout() { + return decommissioningTimeout; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 3012d0d..1789e09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -97,7 +97,7 @@ public ClusterMetricsInfo(final ResourceManager rm) { this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.shutdownNodes = clusterMetrics.getNumShutdownNMs(); this.totalNodes = activeNodes + lostNodes + decommissionedNodes - + rebootedNodes + unhealthyNodes + shutdownNodes; + + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; } public int getAppsSubmitted() { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..6b72f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -260,6 +260,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 25c558f..6ab7064 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -638,17 +638,25 @@ public void sendNodeLost(MockNM nm) throws Exception { node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { + RMNodeImpl node = (RMNodeImpl) 
getRMContext().getRMNodes().get(nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), event)); + } + public void NMwaitForState(NodeId nodeid, NodeState finalState) throws Exception { RMNode node = getRMContext().getRMNodes().get(nodeid); + if (node == null) { + node = getRMContext().getInactiveRMNodes().get(nodeid); + } Assert.assertNotNull("node shouldn't be null", node); int timeoutSecs = 0; while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) { - System.out.println("Node State is : " + node.getState() + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState() + " Waiting for state : " + finalState); Thread.sleep(500); } - System.out.println("Node State is : " + node.getState()); + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java new file mode 100644 index 0000000..1039908 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecomNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestDecommissioningNodesWatcher { + private MockRM rm; + + @Test + public void testDecommissioningNodesWatcher() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.DECOMMISSIONING_DEFAULT_TIMEOUT_KEY, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = + new 
DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.NMwaitForState(id1, NodeState.RUNNING); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. + rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + + // Update status with decreasing number of running containers until 0. + watcher.update(node1, createNodeStatus(id1, app, 12)); + watcher.update(node1, createNodeStatus(id1, app, 11)); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 1)); + Assert.assertEquals(DecomNodeStatus.WAIT_CONTAINER, + watcher.getDecommissioningStatus(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 0)); + Assert.assertEquals(DecomNodeStatus.WAIT_APP, + watcher.getDecommissioningStatus(id1)); + + // Set app to be FINISHED and verified DecommissioningNodeStatus is READY. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(DecomNodeStatus.READY, + watcher.getDecommissioningStatus(id1)); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } + + private NodeStatus createNodeStatus( + NodeId nodeId, RMApp app, int numRunningContainers) { + return NodeStatus.newInstance( + nodeId, 0, getContainerStatuses(app, numRunningContainers), + new ArrayList(), + NodeHealthStatus.newInstance( + true, "", System.currentTimeMillis() - 1000), + null, null, null); + } + + // Get mocked ContainerStatus for bunch of containers, + // where numRunningContainers are RUNNING. 
+ private List getContainerStatuses( + RMApp app, int numRunningContainers) { + // Total 12 containers + final int total = 12; + numRunningContainers = Math.min(total, numRunningContainers); + List output = new ArrayList(); + for (int i = 0; i < total; i++) { + ContainerState cstate = (i >= numRunningContainers)? + ContainerState.COMPLETE : ContainerState.RUNNING; + output.add(ContainerStatus.newInstance( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), + cstate, "Dummy", 0)); + } + return output; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 7c03574..c71a698 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -255,17 +255,6 @@ public void testStatusUpdateOnDecommissioningNode() { cm.getNumDecommissioningNMs()); Assert.assertEquals("Decommissioned Nodes", initialDecommissioned, cm.getNumDecommisionedNMs()); - - // Verify node in DECOMMISSIONING will be changed by status update - // without running apps - statusEvent = getMockRMNodeStatusEventWithoutRunningApps(); - node.handle(statusEvent); - Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); - Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); - Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1, - cm.getNumDecommissioningNMs()); - 
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1, - cm.getNumDecommisionedNMs()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4259e6b..327c945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -215,6 +216,107 @@ public void testDecommissionWithExcludeHosts() throws Exception { } /** + * Graceful decommission node with no running application. 
+ */ + @Test + public void testGracefulDecommissionNoApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("host3:4433", 5120); + + int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host2 and host3. + writeToHostsFile("host2", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + checkDecommissionedNMCount(rm, metricCount + 2); + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); + } + + /** + * Graceful decommission node with running application. 
+ */ + @Test + public void testGracefulDecommissionWithApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:5678", 20480); + MockNM nm3 = rm.registerNode("host3:4433", 10240); + NodeId id1 = nm1.getNodeId(); + NodeId id3 = nm3.getNodeId(); + rm.NMwaitForState(id1, NodeState.RUNNING); + rm.NMwaitForState(id3, NodeState.RUNNING); + + // Create an app and launch two containers on host1. + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + + // Graceful decommission host1 and host3 + writeToHostsFile("host1", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + rm.NMwaitForState(id3, NodeState.DECOMMISSIONING); + + // host1 should be DECOMMISSIONING due to running containers. + // host3 should become DECOMMISSIONED. + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + rm.NMwaitForState(id3, NodeState.DECOMMISSIONED); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + + // Complete containers on host1. Since the app is still RUNNING, expect NodeAction.NORMAL. + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction()); + + // Finish the app and verified DECOMMISSIONED. 
+ MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction()); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONED); + } + + /** * Decommissioning using a post-configured include hosts file */ @Test @@ -1280,7 +1382,7 @@ public void testIncorrectRecommission() throws Exception { excludeHostFile.getAbsolutePath()); writeToHostsFile(hostFile, "host1", "host2"); writeToHostsFile(excludeHostFile, "host1"); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); nm1.nodeHeartbeat(true); rm.drainEvents(); @@ -1289,7 +1391,7 @@ public void testIncorrectRecommission() throws Exception { .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState .DECOMMISSIONED); writeToHostsFile(excludeHostFile, ""); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); Assert.assertTrue("Node " + nm1.getNodeId().getHost() + " should be Decommissioned", rm.getRMContext() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md new file mode 100644 index 0000000..5254f94 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/GracefulDecommission.md @@ -0,0 +1,106 @@ + + +Graceful Decommission of Yarn Nodes +=============== + +* [Overview](#overview) +* [Features](#features) + * [NodesListManager detects and handles include and exclude list changes](#nodeslistmanager-detects-and-handles-include-and-exclude-list-changes) + * [RMNode handles decommission events](#rmnode-handles-decommission-events) + * [Automatic and asynchronous tracking of decommissioning 
nodes status](#automatic-and-asynchronous-tracking-of-decommissioning-nodes-status) + * [Per-Node decommission timeout support](#per-node-decommission-timeout-support) +* [Configuration](#configuration) + +Overview +-------- + +Graceful Decommission of Yarn Nodes is the mechanism to decommission NMs while minimizing the impact on running applications. Once a node is in DECOMMISSIONING state, RM won't schedule new containers on it and will wait for running containers and applications to complete (or until the timeout is exceeded) before marking the node as DECOMMISSIONED. + +Features +-------- + +###NodesListManager detects and handles include and exclude list changes + +"yarn rmadmin -refreshNodes -g [timeout in seconds]" notifies NodesListManager to detect and handle include and exclude list changes. NodesListManager loads excluded hosts from the exclude file as specified through the "yarn.resourcemanager.nodes.exclude-path" configuration in yarn-site.xml. + +NodesListManager inspects and compares status of RMNodes in resource manager and the exclude list, and handles excluded nodes based on the following rules: + +* Recommission DECOMMISSIONED or DECOMMISSIONING nodes that are no longer excluded; +* Gracefully decommission excluded nodes that are not already in DECOMMISSIONED nor +DECOMMISSIONING state; +* Forcefully decommission excluded nodes that are not already in DECOMMISSIONED state if the -g flag is not specified. + +Accordingly, RECOMMISSION, GRACEFUL_DECOMMISSION or DECOMMISSION RMNodeEvent will be sent to the RMNode. + +###RMNode handles decommission events + +Upon receiving GRACEFUL_DECOMMISSION event, the RMNode will save the decommissioning timeout if specified, update metrics for graceful decommission and preserve its original total capacity, and transition into DECOMMISSIONING state. + +Resources will be dynamically and periodically updated on DECOMMISSIONING RMNode so that scheduler won't be scheduling new containers on them due to no available resources.
+ +###Automatic and asynchronous tracking of decommissioning nodes status + +**DecommissioningNodesWatcher** inside ResourceTrackingService tracks DECOMMISSIONING nodes +status automatically and asynchronously after client/admin made the graceful decommission +request. ResourceTrackerService handles node registration and frequent heartbeats with the latest +container status so it is a natural place to hook up automatic decommissioning status tracking logic. +ResourceTrackerService tracks heartbeat updates on all DECOMMISSIONING nodes to decide when, +after all running containers on the node have completed, the node will be transitioned into DECOMMISSIONED state +after which NodeManager will be told to shutdown. + +Under MR application, a node, after completing all its containers, may still serve its map output data +during the duration of the application for reducers. A fully graceful mechanism would keep such +DECOMMISSIONING nodes until all involved applications complete. It could be however +undesirable under long-running applications scenario where a bunch of "idle" nodes would stay +around for long period of time. DecommissioningNodesWatcher balances such concerns with a +timeout policy --- a DECOMMISSIONING node will be DECOMMISSIONED no later than +DECOMMISSIONING_TIMEOUT regardless of running containers or applications. If running +containers finished earlier, it continues to wait up to DECOMMISSIONING_TIMEOUT for the +applications to finish. Following are the sub-statuses of a decommissioning node: + +* NONE --- Node is not in DECOMMISSIONING state. +* WAIT_CONTAINER --- wait for running containers to complete.
+* WAIT_APP --- wait for running application to complete (after all containers complete) +* TIMEOUT --- Timeout waiting for either containers or applications to complete +* READY --- Nothing to wait, ready to be decommissioned +* DECOMMISSIONED --- The node has already been decommissioned + +Status of all decommissioning nodes are logged periodically (every 20 seconds) in human friendly +format to resource manager logs. +To be efficient, DecommissioningNodesWatcher skips tracking application containers on a particular +node before the node is in DECOMMISSIONING state. It only tracks containers once the node is in +DECOMMISSIONING state. DecommissioningNodesWatcher basically incurs no cost when no node is +under DECOMMISSIONING. This sacrifices the possibility that an idle node once hosted containers of a +still running application. + +When a decommissioning node reaches TIMEOUT, it will be decommissioned regardless. Proper events will +be sent to ensure the node is deactivated and owning tasks will be rescheduled as necessary. + +###Per-Node decommission timeout support + +To support flexible graceful decommission of nodes using different timeouts through +single or multiple refreshNodes requests, HostsFileReader now supports an optional timeout value +after each hostname (or ip) in the exclude host file. These timeouts will be read and associated +with the corresponding nodes. Further, the timeout could be dynamically adjusted while the node +is DECOMMISSIONING. NodesListManager will detect and update decommission timeout +on individual RMNode as necessary. DecommissioningNodesWatcher evaluates timeout +based on the dynamic decommission timeout on individual RMNode. + +Configuration +-------- + +Property | Value +----- | ------ +yarn.resourcemanager.decommissioning.default.timeout | Timeout in seconds for YARN node graceful decommission.
This is the maximal time to wait for running containers and applications to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED. The default value is 3600 seconds. A negative value (like -1) is handled as an infinite timeout. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md index 40704f0..29d5838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md @@ -243,7 +243,7 @@ Usage: | COMMAND\_OPTIONS | Description | |:---- |:---- | | -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. | -| -refreshNodes | Refresh the hosts information at the ResourceManager. | +| -refreshNodes [-g [timeout in seconds]] | Refresh the hosts information at the ResourceManager. The -g option indicates graceful decommission of excluded hosts, in which case the optional timeout indicates the maximal time in seconds ResourceManager should wait before forcefully marking the node as decommissioned. | | -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. | | -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. | | -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |