diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..5097eee 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -19,8 +19,11 @@ package org.apache.hadoop.util; import java.io.*; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.Map.Entry; import org.apache.commons.io.Charsets; import org.apache.commons.logging.LogFactory; @@ -35,7 +38,9 @@ @InterfaceStability.Unstable public class HostsFileReader { private Set includes; - private Set excludes; + // exclude host list with optional timeout. + // If the value is null, it indicates default timeout. + private Map excludes; private String includesFile; private String excludesFile; @@ -44,7 +49,7 @@ public HostsFileReader(String inFile, String exFile) throws IOException { includes = new HashSet(); - excludes = new HashSet(); + excludes = new HashMap(); includesFile = inFile; excludesFile = exFile; refresh(); @@ -54,7 +59,7 @@ public HostsFileReader(String inFile, public HostsFileReader(String includesFile, InputStream inFileInputStream, String excludesFile, InputStream exFileInputStream) throws IOException { includes = new HashSet(); - excludes = new HashSet(); + excludes = new HashMap(); this.includesFile = includesFile; this.excludesFile = excludesFile; refresh(inFileInputStream, exFileInputStream); @@ -71,6 +76,21 @@ public static void readFileToSet(String type, public static void readFileToSetWithFileInputStream(String type, String filename, InputStream fileInputStream, Set set) throws IOException { + Map map = new HashMap(); + readFileToMapWithFileInputStream(type, filename, fileInputStream, map); + set.addAll(map.keySet()); + } + + 
public static void readFileToMap(String type, + String filename, Map map) throws IOException { + File file = new File(filename); + FileInputStream fis = new FileInputStream(file); + readFileToMapWithFileInputStream(type, filename, fis, map); + } + + public static void readFileToMapWithFileInputStream(String type, + String filename, InputStream fileInputStream, Map map) + throws IOException { BufferedReader reader = null; try { reader = new BufferedReader( @@ -86,13 +106,21 @@ public static void readFileToSetWithFileInputStream(String type, break; } if (!nodes[i].isEmpty()) { - LOG.info("Adding a node \"" + nodes[i] + "\" to the list of " - + type + " hosts from " + filename); - set.add(nodes[i]); + // look ahead for optional timeout values + Integer timeout = null; + if (i < nodes.length - 1) { + timeout = tryParseInteger(nodes[i+1]); + } + map.put(nodes[i], timeout); + // skip the timeout if exist + if (timeout != null) { + i++; + } } } } } + prettyLogMap(type, map, filename); } finally { if (reader != null) { reader.close(); @@ -104,7 +132,7 @@ public static void readFileToSetWithFileInputStream(String type, public synchronized void refresh() throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); boolean switchIncludes = false; boolean switchExcludes = false; if (!includesFile.isEmpty()) { @@ -112,7 +140,7 @@ public synchronized void refresh() throws IOException { switchIncludes = true; } if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); + readFileToMap("excluded", excludesFile, newExcludes); switchExcludes = true; } @@ -131,7 +159,7 @@ public synchronized void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); 
boolean switchIncludes = false; boolean switchExcludes = false; if (inFileInputStream != null) { @@ -140,7 +168,7 @@ public synchronized void refresh(InputStream inFileInputStream, switchIncludes = true; } if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, + readFileToMapWithFileInputStream("excluded", excludesFile, exFileInputStream, newExcludes); switchExcludes = true; } @@ -159,6 +187,10 @@ public synchronized void refresh(InputStream inFileInputStream, } public synchronized Set getExcludedHosts() { + return excludes.keySet(); + } + + public synchronized Map getExcludedHostsWithTimeout() { return excludes; } @@ -177,4 +209,29 @@ public synchronized void updateFileNames(String includesFile, setIncludesFile(includesFile); setExcludesFile(excludesFile); } + + private static Integer tryParseInteger(String str) { + try{ + int num = Integer.parseInt(str); + return num; + } catch (Exception e) { + return null; + } + } + + private static void prettyLogMap( + String type, Map excludes, String filename) { + if (excludes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + for (Entry n : excludes.entrySet()) { + if (n.getValue() != null) { + sb.append(String.format("%n %s : %d", n.getKey(), n.getValue())); + } else { + sb.append(String.format("%n %s", n.getKey())); + } + } + LOG.info("List of " + type + " hosts from " + filename + sb.toString()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 3000069..3d02b50 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,14 +20,16 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import 
java.util.Map; import org.apache.hadoop.test.GenericTestUtils; import org.junit.*; + import static org.junit.Assert.*; /* * Test for HostsFileReader.java - * + * */ public class TestHostsFileReader { @@ -262,4 +264,37 @@ public void testHostFileReaderWithTabs() throws Exception { assertFalse(hfp.getExcludedHosts().contains("somehost5")); } + + /* + * Test if timeout values are provided in HostFile + */ + @Test + public void testHostFileReaderWithTimeout() throws Exception { + FileWriter efw = new FileWriter(excludesFile); + FileWriter ifw = new FileWriter(includesFile); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write(" \n"); + efw.write(" somehost 123 \t somehost2 \n somehost4 78"); + efw.write(" somehost3 456 \t # somehost5"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write(" \n"); + ifw.write(" somehost \t somehost2 \n somehost4"); + ifw.write(" somehost3 \t # somehost5"); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile); + + int includesLen = hfp.getHosts().size(); + int excludesLen = hfp.getExcludedHosts().size(); + assertEquals(4, includesLen); + assertEquals(4, excludesLen); + + Map excludes = hfp.getExcludedHostsWithTimeout(); + assertTrue(excludes.containsKey("somehost")); + assertTrue(excludes.get("somehost") == 123); + assertTrue(excludes.get("somehost2") == null); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586b..abe83a7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return null; } + + @Override + public Integer getDecommissioningTimeout() { + 
return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb..085656d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -188,4 +188,9 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return node.getNodeUtilization(); } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8acee57..b4198c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -652,6 +652,15 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; + /** + * Timeout in seconds for YARN node graceful decommission. + * This is the maximal time to wait for running containers and applications + * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED. 
+ */ + public static final String DECOMMISSIONING_DEFAULT_TIMEOUT_KEY = + RM_PREFIX + "decommissioning.default.timeout"; + public static final int DEFAULT_DECOMMISSIONING_TIMEOUT = 3600; + //////////////////////////////// // Node Manager Configs //////////////////////////////// @@ -1409,6 +1418,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX + "log-aggregation.policy.parameters"; + /** Time to wait before NodeManager exits on start error. + * This avoids NodeManager restarts and registers to RM too quickly + * after being rejected by RM due to being excluded. + */ + public static final String NM_EXIT_WAIT_MS = NM_PREFIX + "exit-wait.ms"; + public static final long DEFAULT_NM_EXIT_WAIT_MS = 5000; + //////////////////////////////// // Web Proxy Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java index 0333c3b..42c6b79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java @@ -56,4 +56,18 @@ public static RefreshNodesRequest newInstance( * @return decommissionType */ public abstract DecommissionType getDecommissionType(); -} + + /** + * Set the DecommissionTimeout + * + * @param timeout + */ + public abstract void setDecommissionTimeout(Integer timeout); + + /** + * Get the DecommissionTimeout + * + * @return decommissionTimeout + */ + public abstract Integer getDecommissionTimeout(); +} \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eaf658f..b9f30db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -37,6 +37,7 @@ message RefreshQueuesResponseProto { message RefreshNodesRequestProto { optional DecommissionTypeProto decommissionType = 1 [default = NORMAL]; + optional int32 decommissionTimeout = 2; } message RefreshNodesResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index d407c20..8d91ec4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -303,28 +303,34 @@ private int refreshQueues() throws IOException, YarnException { return 0; } - private int refreshNodes() throws IOException, YarnException { + private int refreshNodes(boolean graceful) throws IOException, YarnException { // Refresh the nodes ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); - RefreshNodesRequest request = RefreshNodesRequest - .newInstance(DecommissionType.NORMAL); + RefreshNodesRequest request = RefreshNodesRequest.newInstance( + graceful? 
DecommissionType.GRACEFUL : DecommissionType.NORMAL); adminProtocol.refreshNodes(request); return 0; } - private int refreshNodes(long timeout) throws IOException, YarnException { + private int refreshNodes(int timeout) throws IOException, YarnException { // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest .newInstance(DecommissionType.GRACEFUL); + gracefulRequest.setDecommissionTimeout(timeout); adminProtocol.refreshNodes(gracefulRequest); CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory .newRecordInstance(CheckForDecommissioningNodesRequest.class); long waitingTime; boolean nodesDecommissioning = true; + // Additional seconds to wait before forcefully decommission nodes. + // This is usually not needed since RM enforces timeout automatically. + final int forcefulDelay = 20; // timeout=-1 means wait for all the nodes to be gracefully // decommissioned - for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) { + for (waitingTime = 0; + timeout == -1 || (timeout >= 0 && waitingTime < timeout + forcefulDelay); + waitingTime++) { // wait for one second to check nodes decommissioning status try { Thread.sleep(1000); @@ -369,6 +375,10 @@ private int refreshNodesResources() throws IOException, YarnException { return 0; } + private int refreshNodes() throws IOException, YarnException { + return refreshNodes(false); + } + private int refreshUserToGroupsMappings() throws IOException, YarnException { // Refresh the user-to-groups mappings @@ -714,26 +724,12 @@ public int run(String[] args) throws Exception { return exitCode; } } - + try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { - if (args.length == 1) { - exitCode = refreshNodes(); - } else if (args.length == 3) { - // if the graceful timeout specified - if 
("-g".equals(args[1])) { - long timeout = validateTimeout(args[2]); - exitCode = refreshNodes(timeout); - } else { - printUsage(cmd, isHAEnabled); - return -1; - } - } else { - printUsage(cmd, isHAEnabled); - return -1; - } + exitCode = handleRefreshNodes(args, isHAEnabled); } else if ("-refreshNodesResources".equals(cmd)) { exitCode = refreshNodesResources(); } else if ("-refreshUserToGroupsMappings".equals(cmd)) { @@ -750,22 +746,7 @@ public int run(String[] args) throws Exception { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); } else if ("-updateNodeResource".equals(cmd)) { - if (args.length < 4 || args.length > 5) { - System.err.println("Number of parameters specified for " + - "updateNodeResource is wrong."); - printUsage(cmd, isHAEnabled); - exitCode = -1; - } else { - String nodeID = args[i++]; - String memSize = args[i++]; - String cores = args[i++]; - int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; - if (i == args.length - 1) { - overCommitTimeout = Integer.parseInt(args[i]); - } - exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize), - Integer.parseInt(cores), overCommitTimeout); - } + exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled); } else if ("-addToClusterNodeLabels".equals(cmd)) { if (i >= args.length) { System.err.println(NO_LABEL_ERR_MSG); @@ -825,10 +806,54 @@ public int run(String[] args) throws Exception { return exitCode; } - private long validateTimeout(String strTimeout) { - long timeout; + // A helper method to reduce the number of lines of run() + private int handleRefreshNodes(String[] args, boolean isHAEnabled) + throws IOException, YarnException { + // In example "yarn rmadmin -refreshNodes -graceful 1000", + // args[1] is -graceful + if (args.length == 1) { + return refreshNodes(); + } else if (args.length >= 2) { + // if the graceful timeout specified + if ("-g".equals(args[1]) || "-graceful".equals(args[1])) { + if (args.length == 3) { + 
int timeout = validateTimeout(args[2]); + return refreshNodes(timeout); + } else { + return refreshNodes(true); + } + } + } + printUsage("-refreshNodes", isHAEnabled); + return -1; + } + + private int handleUpdateNodeResource( + String[] args, String cmd, boolean isHAEnabled) + throws NumberFormatException, IOException, YarnException { + int i = 1; + if (args.length < 4 || args.length > 5) { + System.err.println("Number of parameters specified for " + + "updateNodeResource is wrong."); + printUsage(cmd, isHAEnabled); + return -1; + } else { + String nodeID = args[i++]; + String memSize = args[i++]; + String cores = args[i++]; + int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; + if (i == args.length - 1) { + overCommitTimeout = Integer.parseInt(args[i]); + } + return updateNodeResource(nodeID, Integer.parseInt(memSize), + Integer.parseInt(cores), overCommitTimeout); + } + } + + private int validateTimeout(String strTimeout) { + int timeout; try { - timeout = Long.parseLong(strTimeout); + timeout = Integer.parseInt(strTimeout); } catch (NumberFormatException ex) { throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java index 05f3230..585613e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java @@ -36,6 +36,7 @@ RefreshNodesRequestProto.Builder builder = null; boolean viaProto = false; private DecommissionType decommissionType; + private 
Integer decommissionTimeout; public RefreshNodesRequestPBImpl() { builder = RefreshNodesRequestProto.newBuilder(); @@ -65,6 +66,11 @@ private void mergeLocalToBuilder() { if (this.decommissionType != null) { builder.setDecommissionType(convertToProtoFormat(this.decommissionType)); } + if (this.decommissionTimeout != null) { + builder.setDecommissionTimeout(decommissionTimeout); + } else { + builder.clearDecommissionTimeout(); + } } private synchronized void maybeInitBuilder() { @@ -108,6 +114,19 @@ public synchronized DecommissionType getDecommissionType() { return convertFromProtoFormat(p.getDecommissionType()); } + @Override + public synchronized void setDecommissionTimeout(Integer timeout) { + maybeInitBuilder(); + this.decommissionTimeout = timeout; + mergeLocalToBuilder(); + } + + @Override + public synchronized Integer getDecommissionTimeout() { + RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null; + } + private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) { return DecommissionType.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 506cf3d..66480dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2431,6 +2431,26 @@ + + Timeout in seconds for YARN node graceful decommission.exit-wait.ms + This is the maximal time to wait for running containers and applications to complete + before transition a DECOMMISSIONING node into DECOMMISSIONED. + + yarn.resourcemanager.decommissioning.default.timeout + 3600 + + + + + Time to wait before NodeManager exits on start error. 
+ This avoids NodeManager restarts and registers to RM too quickly + after being rejected by RM due to being excluded. + + yarn.nodemanager.exit-wait.ms + 5000 + + + The Node Label script to run. Script output Line starting with "NODE_PARTITION:" will be considered as Node Label Partition. In case of multiple lines have this pattern, then last one will be considered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 7c104d5..c7308c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -634,6 +634,15 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { this.start(); } catch (Throwable t) { LOG.fatal("Error starting NodeManager", t); + long exitWaitMs = conf.getLong(YarnConfiguration.NM_EXIT_WAIT_MS, + YarnConfiguration.DEFAULT_NM_EXIT_WAIT_MS); + LOG.fatal("Exit in " + exitWaitMs + " milliseconds"); + if (exitWaitMs > 0) { + try { + Thread.sleep(exitWaitMs); + } catch (InterruptedException e) { + } + } System.exit(-1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index f75219b..12bbce8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -440,7 +440,8 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) rmContext.getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully(conf); + rmContext.getNodesListManager().refreshNodesGracefully( + conf, request.getDecommissionTimeout()); break; case FORCEFUL: rmContext.getNodesListManager().refreshNodesForcefully(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java new file mode 100644 index 0000000..36fbd21 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" 
BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +import com.google.common.base.Stopwatch; + +/** + * DecommissioningNodesWatcher is used by ResourceTrackerService to track + * DECOMMISSIONING nodes to decide when, after all running containers on + * the node have completed, will be transitioned into DECOMMISSIONED state + * (NodeManager will be told to shutdown). + * Under MR application, a node, after completes all its containers, + * may still serve it map output data during the duration of the application + * for reducers. A fully graceful mechanism would keep such DECOMMISSIONING + * nodes until all involved applications complete. It could be however + * undesirable under long-running applications scenario where a bunch + * of "idle" nodes would stay around for long period of time. 
+ * + * DecommissioningNodesWatcher balance such concern with a timeout policy --- + * a DECOMMISSIONING node will be DECOMMISSIONED no later than + * DECOMMISSIONING_TIMEOUT regardless of running containers or applications. + * + * To be efficient, DecommissioningNodesWatcher skip tracking application + * containers on a particular node before the node is in DECOMMISSIONING state. + * It only tracks containers once the node is in DECOMMISSIONING state. + * DecommissioningNodesWatcher basically is no cost when no node is + * DECOMMISSIONING. This sacrifices the possibility that that node once + * host containers of an still running application. + */ +public class DecommissioningNodesWatcher { + private static final Log LOG = + LogFactory.getLog(DecommissioningNodesWatcher.class); + + private final RMContext rmContext; + + // Default timeout value in mills. + // Negative value indicates no timeout. 0 means immediate. + private long defaultTimeoutMs = + 1000L * YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT; + + // Once a RMNode is observed in DECOMMISSIONING state, + // All its ContainerStatus update are tracked inside DecomNodeContext. + class DecommissioningNodeContext { + private final NodeId nodeId; + + // Last known NodeState. + private NodeState nodeState; + + // The moment node is observed in DECOMMISSIONING state. + private final long decommissioningStartTime; + + private long lastContainerFinishTime; + + // number of running containers at the moment. + private int numActiveContainers; + + // All applications run on the node at or after decommissioningStartTime. + private Set appIds; + + // First moment the node is observed in DECOMMISSIONED state. + private long decommissionedTime; + + // Timeout in millis for this decommissioning node. + // This value could be dynamically updated with new value from RMNode. 
+ private long timeoutMs; + + private long lastUpdateTime; + + public DecommissioningNodeContext(NodeId nodeId) { + this.nodeId = nodeId; + this.appIds = new HashSet(); + this.decommissioningStartTime = System.currentTimeMillis(); + this.timeoutMs = defaultTimeoutMs; + } + + void updateTimeout(Integer timeoutSec) { + this.timeoutMs = (timeoutSec == null)? + defaultTimeoutMs : (1000L * timeoutSec); + } + } + + // All DECOMMISSIONING nodes to track. + private HashMap decomNodes = + new HashMap(); + + private Stopwatch pollWatch = new Stopwatch().start(); + + private final Set emptyNodeIdSet = new HashSet(); + + public DecommissioningNodesWatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void init(Configuration conf) { + readDecommissioningTimeout(conf); + } + + /** + * Update rmNode decommissioning status based on NodeStatus. + * @param rmNode + * @param remoteNodeStatus + */ + public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { + DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID()); + long now = System.currentTimeMillis(); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + if (context == null) { + return; + } + context.nodeState = rmNode.getState(); + // keep DECOMMISSIONED node for a while for status log. 
+ if (context.decommissionedTime == 0) { + context.decommissionedTime = now; + } else if (now - context.decommissionedTime > 60000L) { + decomNodes.remove(rmNode.getNodeID()); + LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID()); + } + } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (context == null) { + context = new DecommissioningNodeContext(rmNode.getNodeID()); + decomNodes.put(rmNode.getNodeID(), context); + context.nodeState = rmNode.getState(); + context.decommissionedTime = 0; + } + context.updateTimeout(rmNode.getDecommissioningTimeout()); + context.lastUpdateTime = now; + + if (remoteNodeStatus.getKeepAliveApplications() != null) { + context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); + } + + // Count number of active containers. + int numActiveContainers = 0; + for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { + ContainerState newState = cs.getState(); + if (newState == ContainerState.RUNNING || + newState == ContainerState.NEW) { + numActiveContainers++; + } + context.numActiveContainers = numActiveContainers; + ApplicationId aid = cs.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!context.appIds.contains(aid)) { + context.appIds.add(aid); + } + } + + context.numActiveContainers = numActiveContainers; + + // maintain lastContainerFinishTime. + if (context.numActiveContainers == 0 && + context.lastContainerFinishTime == 0) { + context.lastContainerFinishTime = now; + } + } else { + // remove node in other states + if (context != null) { + decomNodes.remove(rmNode.getNodeID()); + } + } + } + + public synchronized void remove(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context != null) { + LOG.info("remove " + nodeId + " in " + context.nodeState); + decomNodes.remove(nodeId); + } + } + + /** + * Status about a specific decommissioning node. 
+ * + */ + public enum DecomNodeStatus { + // Node is not in DECOMMISSIONING state. + NONE, + + // wait for running containers to complete + WAIT_CONTAINER, + + // wait for running application to complete (after all containers complete); + WAIT_APP, + + // Timeout waiting for either containers or applications to complete. + TIMEOUT, + + // nothing to wait, ready to be decommissioned + READY, + + // The node has already been decommissioned + DECOMMISSIONED, + } + + public boolean checkReadyToBeDecommissioned(NodeId nodeId) { + DecomNodeStatus s = getDecommissioningStatus(nodeId); + return (s == DecomNodeStatus.READY || s == DecomNodeStatus.TIMEOUT); + } + + public DecomNodeStatus getDecommissioningStatus(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context == null) { + return DecomNodeStatus.NONE; + } + + if (context.nodeState == NodeState.DECOMMISSIONED) { + return DecomNodeStatus.DECOMMISSIONED; + } + + long waitTime = + System.currentTimeMillis() - context.decommissioningStartTime; + if (context.numActiveContainers > 0) { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecomNodeStatus.WAIT_CONTAINER : DecomNodeStatus.TIMEOUT; + } + + removeCompletedApps(context); + if (context.appIds.size() == 0) { + return DecomNodeStatus.READY; + } else { + return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? + DecomNodeStatus.WAIT_APP : DecomNodeStatus.TIMEOUT; + } + } + + /** + * Periodically invoked by ResourceTrackerService (RTS) to: + * 1. log status of all DECOMMISSIONING nodes; + * 2. identify and return stale DECOMMISSIONING nodes + * (to be taken care of by RTS). 
+ */ + public Set poll() { + if (decomNodes.size() == 0 || + pollWatch.elapsedTime(TimeUnit.SECONDS) < 20) { + return emptyNodeIdSet; + } + return pollInternal(); + } + + private synchronized Set pollInternal() { + pollWatch.reset().start(); + readDecommissioningTimeout(null); + logDecommissioningNodesStatus(); + long now = System.currentTimeMillis(); + Set output = new HashSet(); + + for (Iterator> it = + decomNodes.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + DecommissioningNodeContext d = e.getValue(); + // Skip node recently updated (NM usually updates every second). + if (now - d.lastUpdateTime < 30000L) { + continue; + } + // Remove stale non-DECOMMISSIONING node + if (d.nodeState != NodeState.DECOMMISSIONING) { + LOG.info("remove " + d.nodeState + " " + d.nodeId); + it.remove(); + continue; + } else if (now - d.lastUpdateTime > 60000L) { + // Node DECOMMISSIONED could become stale, check RMNode state to remove. + RMNode rmNode = getRmNode(d.nodeId); + if (rmNode != null && rmNode.getState() == NodeState.DECOMMISSIONED) { + LOG.info("remove " + rmNode.getState() + " " + d.nodeId); + it.remove(); + continue; + } + } + if (d.timeoutMs >= 0 && d.decommissioningStartTime + d.timeoutMs < now) { + output.add(d.nodeId); + LOG.info("Identified stale and timeout node " + d.nodeId); + } + } + return output; + } + + private RMNode getRmNode(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); + } + return rmNode; + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.appIds.iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.info("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == 
RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.info("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context) { + if (context.nodeState == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.nodeState != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.appIds.size() == 0 && context.numActiveContainers == 0) { + return 0; + } + // negative timeout value means no timeout (infinite timeout). + if (context.timeoutMs < 0) { + return -1; + } + + long now = System.currentTimeMillis(); + long timeout = context.decommissioningStartTime + context.timeoutMs - now; + return Math.max(0, (int)(timeout / 1000)); + } + + private void logDecommissioningNodesStatus() { + if (decomNodes.size() == 0) { + return; + } + StringBuilder sb = new StringBuilder(); + long now = System.currentTimeMillis(); + for (DecommissioningNodeContext d : decomNodes.values()) { + DecomNodeStatus s = getDecommissioningStatus(d.nodeId); + sb.append(String.format( + "%n %-34s %4ds fresh:%3ds containers:%2d %14s", + d.nodeId.getHost(), + (now - d.decommissioningStartTime) / 1000, + (now - d.lastUpdateTime) / 1000, + d.numActiveContainers, + s)); + if (s == DecomNodeStatus.WAIT_APP || + s == DecomNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); + } + for (ApplicationId aid : d.appIds) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format( + " %s %9s %5.2f%% %5ds", + rmApp.getState(), + (rmApp.getApplicationType() == null)? + "" : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (System.currentTimeMillis() - rmApp.getStartTime()) / 1000)); + } + } + } + LOG.info("Decommissioning Nodes: " + sb.toString()); + } + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. 
+ // This enables DecommissioningNodesWatcher to pick up new value + // without ResourceManager restart. + private void readDecommissioningTimeout(Configuration conf) { + try { + if (conf == null) { + conf = new YarnConfiguration(); + } + int v = conf.getInt( + YarnConfiguration.DECOMMISSIONING_DEFAULT_TIMEOUT_KEY, + YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT); + if (defaultTimeoutMs != 1000L * v) { + defaultTimeoutMs = 1000L * v; + LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); + } + } catch (Exception e) { + LOG.info("Error readDecommissioningTimeout ", e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index ec2708e..24ae208 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,14 +19,17 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,11 +52,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; 
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; + @SuppressWarnings("unchecked") public class NodesListManager extends CompositeService implements EventHandler { @@ -112,10 +116,11 @@ private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); for (String include : hostsReader.getHosts()) { LOG.debug("include: " + include); @@ -125,30 +130,31 @@ private void printConfiguredHosts() { } } - public void refreshNodes(Configuration yarnConf) throws IOException, - YarnException { - refreshHostsReader(yarnConf); + public void refreshNodes(Configuration yarnConf) + throws IOException, YarnException { + refreshNodes(yarnConf, false); + } - for (NodeId nodeId: rmContext.getRMNodes().keySet()) { - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - } - } + public void refreshNodes(Configuration yarnConf, boolean graceful) + throws IOException, YarnException { + refreshHostsReader(yarnConf, graceful, null); } - private void refreshHostsReader(Configuration yarnConf) throws IOException, - YarnException { + private void refreshHostsReader( + Configuration yarnConf, boolean graceful, Integer timeout) + throws 
IOException, YarnException { synchronized (hostsReader) { if (null == yarnConf) { yarnConf = new YarnConfiguration(); } + includesFile = yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); excludesFile = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + LOG.info("refreshNodes excludesFile " + excludesFile); hostsReader.updateFileNames(includesFile, excludesFile); hostsReader.refresh( includesFile.isEmpty() ? null : this.rmContext @@ -156,6 +162,13 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, this.conf, includesFile), excludesFile.isEmpty() ? null : this.rmContext.getConfigurationProvider() .getConfigurationInputStream(this.conf, excludesFile)); + + LOG.info("hostsReader include:{" + + Joiner.on(',').join(hostsReader.getHosts()) + + "} exclude:{" + + Joiner.on(',').join(hostsReader.getExcludedHosts()) + "}"); + + handleExcludeNodeList(graceful, timeout); printConfiguredHosts(); } } @@ -171,6 +184,77 @@ private void setDecomissionedNMs() { } } + // Handle excluded nodes based on following rules: + // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded; + // Gracefully decommission excluded nodes that are not already + // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes + // that are already DECOMMISSIONED or DECOMMISSIONING. + private void handleExcludeNodeList(boolean graceful, Integer timeout) { + // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. + List nodesToRecom = new ArrayList(); + + // Nodes need to be decommissioned (graceful or forceful); + List nodesToDecom = new ArrayList(); + + Map timeouts = hostsReader.getExcludedHostsWithTimeout(); + + for (RMNode n : this.rmContext.getRMNodes().values()) { + NodeState s = n.getState(); + // An invalid node (either due to explicit exclude or not include) + // should be excluded. 
+ boolean exclude = !isValidNode(n.getHostName()); + if (!exclude) { + if (s == NodeState.DECOMMISSIONED || s == NodeState.DECOMMISSIONING) { + LOG.info("Recommission node " + n.getNodeID() + " with state " + s); + nodesToRecom.add(n); + } + // Otherwise no-action needed. + } else { + // exclude is true. + if (graceful) { + // Use per node timeout if exist otherwise the request timeout. + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? + timeouts.get(n.getHostName()) : timeout; + if (s != NodeState.DECOMMISSIONED && + s != NodeState.DECOMMISSIONING) { + LOG.info("Gracefully decommission node " + n.getNodeID() + + " with state " + s); + nodesToDecom.add(n); + } else if (s == NodeState.DECOMMISSIONING && + !Objects.equals(n.getDecommissioningTimeout(), + timeoutToUse)) { + LOG.info("Update timeout on decommissioning node " + + n.getNodeID() + " to be " + timeoutToUse); + nodesToDecom.add(n); + } else { + LOG.info("No action for node " + n.getNodeID() + " with state " + s); + } + } else { + if (s != NodeState.DECOMMISSIONED) { + LOG.info("Forcefully decommission node " + n.getNodeID() + + " with state " + s); + nodesToDecom.add(n); + } + } + } + } + + for (RMNode n : nodesToRecom) { + RMNodeEvent e = new RMNodeEvent( + n.getNodeID(), RMNodeEventType.RECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecom) { + RMNodeEventType eventType = graceful? + RMNodeEventType.GRACEFUL_DECOMMISSION : RMNodeEventType.DECOMMISSION; + Integer timeoutToUse = (timeouts.get(n.getHostName()) != null)? 
+ timeouts.get(n.getHostName()) : timeout; + RMNodeEvent e = new RMNodeEvent(n.getNodeID(), eventType, timeoutToUse); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + } + @VisibleForTesting public Resolver getResolver() { return resolver; @@ -378,25 +462,13 @@ private HostsFileReader createHostsFileReader(String includesFile, * Refresh the nodes gracefully * * @param conf + * @param timeout decommission timeout, null means default timeout. * @throws IOException * @throws YarnException */ - public void refreshNodesGracefully(Configuration conf) throws IOException, - YarnException { - refreshHostsReader(conf); - for (Entry entry:rmContext.getRMNodes().entrySet()) { - NodeId nodeId = entry.getKey(); - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); - } else { - // Recommissioning the nodes - if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION)); - } - } - } + public void refreshNodesGracefully(Configuration conf, Integer timeout) + throws IOException, YarnException { + refreshHostsReader(conf, true, timeout); } /** @@ -519,4 +591,4 @@ public void setHost(String hst) { this.host = hst; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index e19d55e..d130e0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -70,11 +70,12 @@ public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { - // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY + // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. ArrayList results = new ArrayList(); if (acceptedStates.contains(NodeState.NEW) || acceptedStates.contains(NodeState.RUNNING) || - acceptedStates.contains(NodeState.UNHEALTHY)) { + acceptedStates.contains(NodeState.UNHEALTHY) || + acceptedStates.contains(NodeState.DECOMMISSIONING)) { for (RMNode rmNode : context.getRMNodes().values()) { if (acceptedStates.contains(rmNode.getState())) { results.add(rmNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..67324a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -105,6 +105,8 @@ private int minAllocMb; private int minAllocVcores; + private DecommissioningNodesWatcher decomWatcher; + private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private volatile DynamicResourceConfiguration drConf; @@ -120,7 +122,7 @@ public ResourceTrackerService(RMContext rmContext, this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = 
containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + this.decomWatcher = new DecommissioningNodesWatcher(rmContext); } @Override @@ -161,6 +163,7 @@ protected void serviceInit(Configuration conf) throws Exception { loadDynamicResourceConfiguration(conf); + decomWatcher.init(conf); super.serviceInit(conf); } @@ -474,6 +477,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); + this.decomWatcher.update(rmNode, remoteNodeStatus); + // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse @@ -495,6 +500,20 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) message); } + // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + String message = "DECOMMISSIONING " + nodeId + + " is ready to be decommissioned"; + LOG.info(message); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + return YarnServerBuilderUtils.newNodeHeartbeatResponse( + NodeAction.SHUTDOWN, message); + } + + pollDecommissioningNodesWatcher(); + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. @@ -539,6 +558,26 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + // Poll DecommissioningNodesWatcher to log status and take care of + // DECOMMISSIONING node that didn't update itself + // (already terminated for example). 
+ private void pollDecommissioningNodesWatcher() { + Set nodes = decomWatcher.poll(); + for (NodeId nodeId : nodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + decomWatcher.remove(nodeId); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + LOG.info("DECOMMISSIONING " + nodeId + " timeout"); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + /** * Check if node in decommissioning state. * @param nodeId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f1..1ea7a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -168,4 +168,11 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + /* + * Optional decommissioning timeout in second + * (null indicates default timeout). + * @return the decommissioning timeout in second. 
+ */ + Integer getDecommissioningTimeout(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java index 9ecb366..f7c492f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java @@ -24,13 +24,28 @@ public class RMNodeEvent extends AbstractEvent { private final NodeId nodeId; + // Optional decommissioning timeout in second. + private final Integer decommissioningTimeout; public RMNodeEvent(NodeId nodeId, RMNodeEventType type) { super(type); this.nodeId = nodeId; + this.decommissioningTimeout = null; + } + + // Create instance with optional timeout + // (timeout could be null which means use default). 
+ public RMNodeEvent(NodeId nodeId, RMNodeEventType type, Integer timeout) { + super(type); + this.nodeId = nodeId; + this.decommissioningTimeout = timeout; } public NodeId getNodeId() { return this.nodeId; } + + public Integer getDecommissioningTimeout() { + return this.decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 5f8317e..2269dd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -119,6 +120,7 @@ private String healthReport; private long lastHealthReportTime; private String nodeManagerVersion; + private Integer decommissioningTimeout; /* Aggregated resource utilization for the containers. 
*/ private ResourceUtilization containersUtilization; @@ -170,7 +172,6 @@ NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -232,6 +233,16 @@ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.RUNNING, + RMNodeEventType.RECOMMISSION, + new RecommissionNodeTransition(NodeState.RUNNING)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.LOST, + RMNodeEventType.EXPIRE, new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + //Transitions from DECOMMISSIONING state .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, @@ -256,6 +267,9 @@ .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, RMNodeEventType.REBOOTING, new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -620,7 +634,7 @@ public void handle(RMNodeEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on Node " + this.nodeId); + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " @@ -653,6 +667,9 @@ private void updateMetricsForRejoinedNode(NodeState 
previousNodeState) { case SHUTDOWN: metrics.decrNumShutdownNMs(); break; + case DECOMMISSIONING: + metrics.decrDecommissioningNMs(); + break; default: LOG.debug("Unexpected previous node state"); } @@ -699,6 +716,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, case DECOMMISSIONING: metrics.decrDecommissioningNMs(); break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; @@ -1071,9 +1091,23 @@ public DecommissioningNodeTransition(NodeState initState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + // Pick up possible updates on decommissioningTimeout. + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (!Objects.equals(rmNode.getDecommissioningTimeout(), + event.getDecommissioningTimeout())) { + LOG.info("Update " + rmNode.getNodeID() + + " DecommissioningTimeout to be " + + event.getDecommissioningTimeout()); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); + } else { + LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); + } + return; + } LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING."); // Update NM metrics during graceful decommissioning. rmNode.updateMetricsForGracefulDecommission(initState, finalState); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); @@ -1140,24 +1174,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.UNHEALTHY; } } - if (isNodeDecommissioning) { - List runningApps = rmNode.getRunningApps(); - - List keepAliveApps = statusEvent.getKeepAliveAppIds(); - - // no running (and keeping alive) app on this node, get it - // decommissioned. - // TODO may need to check no container is being scheduled on this node - // as well. 
- if ((runningApps == null || runningApps.size() == 0) - && (keepAliveApps == null || keepAliveApps.size() == 0)) { - RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) - } rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleReportedIncreasedContainers( @@ -1383,4 +1399,9 @@ private void handleLogAggregationStatus( public Resource getOriginalTotalCapability() { return this.originalTotalCapability; } - } + + @Override + public Integer getDecommissioningTimeout() { + return decommissioningTimeout; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 3012d0d..1789e09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -97,7 +97,7 @@ public ClusterMetricsInfo(final ResourceManager rm) { this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.shutdownNodes = clusterMetrics.getNumShutdownNMs(); this.totalNodes = activeNodes + lostNodes + decommissionedNodes - + rebootedNodes + unhealthyNodes + shutdownNodes; + + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; } public int getAppsSubmitted() { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..6b72f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -260,6 +260,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 25c558f..6ab7064 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -638,17 +638,25 @@ public void sendNodeLost(MockNM nm) throws Exception { node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { + RMNodeImpl node = (RMNodeImpl) 
getRMContext().getRMNodes().get(nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), event)); + } + public void NMwaitForState(NodeId nodeid, NodeState finalState) throws Exception { RMNode node = getRMContext().getRMNodes().get(nodeid); + if (node == null) { + node = getRMContext().getInactiveRMNodes().get(nodeid); + } Assert.assertNotNull("node shouldn't be null", node); int timeoutSecs = 0; while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) { - System.out.println("Node State is : " + node.getState() + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState() + " Waiting for state : " + finalState); Thread.sleep(500); } - System.out.println("Node State is : " + node.getState()); + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java new file mode 100644 index 0000000..a554b19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecomNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestDecommissioningNodesWatcher { + private MockRM rm; + + @Test + public void testDecommissioningNodesWatcher() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.DECOMMISSIONING_DEFAULT_TIMEOUT_KEY, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = new 
DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.NMwaitForState(id1, NodeState.RUNNING); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher to track it. + rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + + // Update status with decreasing number of running containers until 0. + watcher.update(node1, createNodeStatus(id1, app, 12)); + watcher.update(node1, createNodeStatus(id1, app, 11)); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 1)); + Assert.assertEquals(DecomNodeStatus.WAIT_CONTAINER, + watcher.getDecommissioningStatus(id1)); + + watcher.update(node1, createNodeStatus(id1, app, 0)); + Assert.assertEquals(DecomNodeStatus.WAIT_APP, + watcher.getDecommissioningStatus(id1)); + + // Set the app to be FINISHED and verify DecommissioningNodeStatus is READY. + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(DecomNodeStatus.READY, + watcher.getDecommissioningStatus(id1)); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } + + private NodeStatus createNodeStatus(NodeId nodeId, RMApp app, int numRunningContainers) { + return NodeStatus.newInstance( + nodeId, 0, getContainerStatuses(app, numRunningContainers), + new ArrayList(), + NodeHealthStatus.newInstance(true, "", System.currentTimeMillis() - 1000), + null, null, null); + } + + // Get mocked ContainerStatus for a bunch of containers, where numRunningContainers are RUNNING. 
+ private List getContainerStatuses(RMApp app, int numRunningContainers) { + // Total 12 containers + final int total = 12; + numRunningContainers = Math.min(total, numRunningContainers); + List output = new ArrayList(); + for (int i = 0; i < total; i++) { + output.add(ContainerStatus.newInstance( + ContainerId.newInstance(ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), + i >= numRunningContainers? ContainerState.COMPLETE : ContainerState.RUNNING, + "Dummy", 0)); + } + return output; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 6ba360b..24b568c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -255,17 +255,6 @@ public void testStatusUpdateOnDecommissioningNode() { cm.getNumDecommissioningNMs()); Assert.assertEquals("Decommissioned Nodes", initialDecommissioned, cm.getNumDecommisionedNMs()); - - // Verify node in DECOMMISSIONING will be changed by status update - // without running apps - statusEvent = getMockRMNodeStatusEventWithoutRunningApps(); - node.handle(statusEvent); - Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); - Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); - Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1, - cm.getNumDecommissioningNMs()); - Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1, - 
cm.getNumDecommisionedNMs()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4259e6b..327c945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -215,6 +216,107 @@ public void testDecommissionWithExcludeHosts() throws Exception { } /** + * Graceful decommission node with no running application. 
+ */ + @Test + public void testGracefulDecommissionNoApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("host3:4433", 5120); + + int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host2 and host3. + writeToHostsFile("host2", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + checkDecommissionedNMCount(rm, metricCount + 2); + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); + } + + /** + * Graceful decommission node with running application. 
+ */ + @Test + public void testGracefulDecommissionWithApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:5678", 20480); + MockNM nm3 = rm.registerNode("host3:4433", 10240); + NodeId id1 = nm1.getNodeId(); + NodeId id3 = nm3.getNodeId(); + rm.NMwaitForState(id1, NodeState.RUNNING); + rm.NMwaitForState(id3, NodeState.RUNNING); + + // Create an app and launch two containers on host1. + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + + // Graceful decommission host1 and host3 + writeToHostsFile("host1", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + rm.NMwaitForState(id3, NodeState.DECOMMISSIONING); + + // host1 should be DECOMMISSIONING due to running containers. + // host3 should become DECOMMISSIONED. + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + rm.NMwaitForState(id3, NodeState.DECOMMISSIONED); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + + // Complete containers on host1. Since the app is still RUNNING, expect NodeAction.NORMAL. + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction()); + + // Finish the app and verify DECOMMISSIONED. 
+ MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction()); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONED); + } + + /** * Decommissioning using a post-configured include hosts file */ @Test @@ -1280,7 +1382,7 @@ public void testIncorrectRecommission() throws Exception { excludeHostFile.getAbsolutePath()); writeToHostsFile(hostFile, "host1", "host2"); writeToHostsFile(excludeHostFile, "host1"); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); nm1.nodeHeartbeat(true); rm.drainEvents(); @@ -1289,7 +1391,7 @@ public void testIncorrectRecommission() throws Exception { .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState .DECOMMISSIONED); writeToHostsFile(excludeHostFile, ""); - rm.getNodesListManager().refreshNodesGracefully(conf); + rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); Assert.assertTrue("Node " + nm1.getNodeId().getHost() + " should be Decommissioned", rm.getRMContext()