diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..198acb2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -19,8 +19,11 @@ package org.apache.hadoop.util; import java.io.*; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.Map.Entry; import org.apache.commons.io.Charsets; import org.apache.commons.logging.LogFactory; @@ -35,7 +38,8 @@ @InterfaceStability.Unstable public class HostsFileReader { private Set<String> includes; - private Set<String> excludes; + // Excluded hosts, each with an optional timeout in seconds. A null value indicates the default timeout. + private Map<String, Integer> excludes; private String includesFile; private String excludesFile; @@ -44,7 +48,7 @@ public HostsFileReader(String inFile, String exFile) throws IOException { includes = new HashSet<String>(); - excludes = new HashSet<String>(); + excludes = new HashMap<String, Integer>(); includesFile = inFile; excludesFile = exFile; refresh(); @@ -54,7 +58,7 @@ public HostsFileReader(String includesFile, InputStream inFileInputStream, String excludesFile, InputStream exFileInputStream) throws IOException { includes = new HashSet<String>(); - excludes = new HashSet<String>(); + excludes = new HashMap<String, Integer>(); this.includesFile = includesFile; this.excludesFile = excludesFile; refresh(inFileInputStream, exFileInputStream); @@ -71,6 +75,21 @@ public static void readFileToSet(String type, public static void readFileToSetWithFileInputStream(String type, String filename, InputStream fileInputStream, Set<String> set) throws IOException { + Map<String, Integer> map = new HashMap<String, Integer>(); + readFileToMapWithFileInputStream(type, filename, fileInputStream, map); + set.addAll(map.keySet()); + } + + public static void readFileToMap(String type, + String filename, Map<String, Integer> map) throws IOException { + File file = new File(filename); + FileInputStream fis = new FileInputStream(file); + readFileToMapWithFileInputStream(type, filename, fis, map); + } + + public static void readFileToMapWithFileInputStream(String type, + String filename, InputStream fileInputStream, Map<String, Integer> map) + throws IOException { BufferedReader reader = null; try { reader = new BufferedReader( @@ -86,13 +105,19 @@ public static void readFileToSetWithFileInputStream(String type, break; } if (!nodes[i].isEmpty()) { - LOG.info("Adding a node \"" + nodes[i] + "\" to the list of " - + type + " hosts from " + filename); - set.add(nodes[i]); + // look ahead for an optional timeout value + Integer timeout = null; + if (i < nodes.length - 1) { + timeout = tryParseInteger(nodes[i + 1]); + } + map.put(nodes[i], timeout); + // skip the timeout token if it exists + if (timeout != null) i++; } } } } + prettyLogMap(type, map, filename); } finally { if (reader != null) { reader.close(); @@ -104,7 +129,7 @@ public static void readFileToSetWithFileInputStream(String type, public synchronized void refresh() throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set<String> newIncludes = new HashSet<String>(); - Set<String> newExcludes = new HashSet<String>(); + Map<String, Integer> newExcludes = new HashMap<String, Integer>(); boolean switchIncludes = false; boolean switchExcludes = false; if (!includesFile.isEmpty()) { @@ -112,7 +137,7 @@ public synchronized void refresh() throws IOException { switchIncludes = true; } if (!excludesFile.isEmpty()) { - 
readFileToSet("excluded", excludesFile, newExcludes); + readFileToMap("excluded", excludesFile, newExcludes); switchExcludes = true; } @@ -131,7 +156,7 @@ public synchronized void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); + Map newExcludes = new HashMap(); boolean switchIncludes = false; boolean switchExcludes = false; if (inFileInputStream != null) { @@ -140,7 +165,7 @@ public synchronized void refresh(InputStream inFileInputStream, switchIncludes = true; } if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, + readFileToMapWithFileInputStream("excluded", excludesFile, exFileInputStream, newExcludes); switchExcludes = true; } @@ -159,6 +184,10 @@ public synchronized void refresh(InputStream inFileInputStream, } public synchronized Set getExcludedHosts() { + return excludes.keySet(); + } + + public synchronized Map getExcludedHostsWithTimeout() { return excludes; } @@ -177,4 +206,26 @@ public synchronized void updateFileNames(String includesFile, setIncludesFile(includesFile); setExcludesFile(excludesFile); } + + private static Integer tryParseInteger(String str) { + try{ + int num = Integer.parseInt(str); + return num; + } catch (Exception e) { + return null; + } + } + + private static void prettyLogMap(String type, Map excludes, String filename) { + if (excludes.size() == 0) return; + StringBuilder sb = new StringBuilder(); + for (Entry n : excludes.entrySet()) { + if (n.getValue() != null) { + sb.append(String.format("%n %s : %d", n.getKey(), n.getValue())); + } else { + sb.append(String.format("%n %s", n.getKey())); + } + } + LOG.info("List of " + type + " hosts from " + filename + sb.toString()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 7de0be8..7398f3c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,13 +20,15 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import java.util.Map; import org.junit.*; + import static org.junit.Assert.*; /* * Test for HostsFileReader.java - * + * */ public class TestHostsFileReader { @@ -262,4 +264,37 @@ public void testHostFileReaderWithTabs() throws Exception { assertFalse(hfp.getExcludedHosts().contains("somehost5")); } + + /* + * Test if timeout values are provided in HostFile + */ + @Test + public void testHostFileReaderWithTimeout() throws Exception { + FileWriter efw = new FileWriter(excludesFile); + FileWriter ifw = new FileWriter(includesFile); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write(" \n"); + efw.write(" somehost 123 \t somehost2 \n somehost4 78"); + efw.write(" somehost3 456 \t # somehost5"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write(" \n"); + ifw.write(" somehost \t somehost2 \n somehost4"); + ifw.write(" somehost3 \t # somehost5"); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile); + + int includesLen = hfp.getHosts().size(); + int excludesLen = hfp.getExcludedHosts().size(); + assertEquals(4, includesLen); + assertEquals(4, excludesLen); + + Map excludes = 
hfp.getExcludedHostsWithTimeout(); + assertTrue(excludes.containsKey("somehost")); + assertEquals(123, excludes.get("somehost").intValue()); + assertNull(excludes.get("somehost2")); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586b..abe83a7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return null; } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb..085656d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -188,4 +188,9 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return node.getNodeUtilization(); } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3845987..405e2d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -652,6 +652,15 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; + /** + * Timeout in seconds for YARN node graceful decommission. + * This is the maximum time to wait for running containers and applications + * to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED. 
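+ * An optional per-host value in the excludes file read by HostsFileReader overrides + * this default: a host name may be followed by a timeout in seconds, e.g. a line of + * the form "host1.example.com 1800" (the host name here is illustrative), while a + * bare host name uses this default.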
+ */ + public static final String DECOMMISSIONING_TIMEOUT_KEY = + RM_PREFIX + "decommissioning.timeout"; + public static final int DEFAULT_DECOMMISSIONING_TIMEOUT = 3600; + //////////////////////////////// // Node Manager Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 5c3b1d8..456d065 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.io.OutputStreamWriter; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -29,6 +30,10 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -77,7 +82,7 @@ @Private @Unstable public class RMAdminCLI extends HAAdmin { - + static final Log LOG = LogFactory.getLog(RMAdminCLI.class); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private boolean directlyAccessNodeLabelStore = false; @@ -303,11 +308,11 @@ private int refreshQueues() throws IOException, YarnException { return 0; } - private int refreshNodes() throws IOException, YarnException { + private int refreshNodes(boolean graceful) throws IOException, YarnException { // Refresh the nodes ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest request = RefreshNodesRequest - .newInstance(DecommissionType.NORMAL); + .newInstance(graceful ? 
DecommissionType.GRACEFUL : DecommissionType.NORMAL); adminProtocol.refreshNodes(request); return 0; } @@ -369,6 +374,10 @@ private int refreshNodesResources() throws IOException, YarnException { return 0; } + private int refreshNodes() throws IOException, YarnException { + return refreshNodes(false); + } + private int refreshUserToGroupsMappings() throws IOException, YarnException { // Refresh the user-to-groups mappings @@ -711,18 +720,31 @@ public int run(String[] args) throws Exception { return exitCode; } } - + + // -refreshNodes may take a -graceful argument + if ("-refreshNodes".equals(cmd) && args.length > 3) { + printUsage(cmd, isHAEnabled); + return exitCode; + } + try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { + boolean graceful = false; + // e.g. for "yarn rmadmin -refreshNodes -graceful 1000", args[1] is "-graceful" if (args.length == 1) { exitCode = refreshNodes(); - } else if (args.length == 3) { + } else if (args.length >= 2) { // if the graceful timeout is specified - if ("-g".equals(args[1])) { - long timeout = validateTimeout(args[2]); - exitCode = refreshNodes(timeout); + if ("-g".equals(args[1]) || "-graceful".equals(args[1])) { + graceful = true; + if (args.length == 3) { + long timeout = validateTimeout(args[2]); + exitCode = refreshNodes(timeout); + } else { + exitCode = refreshNodes(graceful); + } } else { printUsage(cmd, isHAEnabled); return -1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1d410f1..ed35d6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2367,6 +2367,16 @@ + <property> + <description> + Timeout in seconds for YARN node graceful decommission. + This is the maximum time to wait for running containers and applications + to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED. + </description> + <name>yarn.resourcemanager.decommissioning.timeout</name> + <value>3600</value> + </property> + <property> <description> The Node Label script to run. Script output Line starting with "NODE_PARTITION:" will be considered as Node Label Partition. In case of multiple lines have this pattern, then last one will be considered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a9a5411..b786dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -624,6 +624,12 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { this.start(); } catch (Throwable t) { LOG.fatal("Error starting NodeManager", t); + LOG.fatal("Exit in 5 seconds"); + // Delay 5 seconds before exiting so a failing NodeManager is not restarted too frequently. 
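+ // (Best-effort pause: an interrupt simply ends the wait early and the exit proceeds.)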
+ try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // Restore the interrupt status; the process is exiting anyway. + Thread.currentThread().interrupt(); + } System.exit(-1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index 5917b99..93eefcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -190,4 +190,4 @@ public void addAMRegisterDelay(long delay) { aMRegisterDelay.add(delay); } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java new file mode 100644 index 0000000..b2b3afa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +import com.google.common.base.Stopwatch; + +// DecommissioningNodesWatcher is used by ResourceTrackerService to track DECOMMISSIONING nodes and to decide when, after all running containers on a node have completed, the node will be transitioned into the DECOMMISSIONED state (at which point the NodeManager is told to shut down). For an MR application, a node that has completed all of its containers may still need to serve its map output to reducers for the remaining duration of the application. A fully graceful mechanism would keep such DECOMMISSIONING nodes around until all involved applications complete, but that is undesirable for long-running applications, where a bunch of "idle" nodes could stay around for a long period of time. + +// DecommissioningNodesWatcher balances these concerns with a timeout policy: a DECOMMISSIONING node is DECOMMISSIONED no later than DECOMMISSIONING_TIMEOUT, regardless of running containers or applications. If the running containers finish earlier, it keeps waiting for the involved applications to finish, but still honors the overall DECOMMISSIONING_TIMEOUT. + +// To be efficient, DecommissioningNodesWatcher does not track the application containers on a node until the node enters the DECOMMISSIONING state, so it costs essentially nothing while no node is DECOMMISSIONING. The trade-off is that it can miss applications whose containers ran on the node only before decommissioning started. + +public class DecommissioningNodesWatcher { + private static final Log LOG = LogFactory.getLog(DecommissioningNodesWatcher.class); + + private final RMContext rmContext; + + // Default timeout value in millis. + // Negative value indicates no timeout. 0 means immediate. + private long defaultTimeoutMs = 1000L * YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT; + + // Once an RMNode is observed in the DECOMMISSIONING state, + // all of its ContainerStatus updates are tracked inside its DecommissioningNodeContext. + class DecommissioningNodeContext { + public final NodeId nodeId; + + // Last known NodeState. + public NodeState nodeState; + + // The moment the node was first observed in the DECOMMISSIONING state. + public final long decommissioningStartTime; + + public long lastContainerFinishTime; + + // Number of running containers at the most recent heartbeat. + public int numActiveContainers; + + // All applications seen running on the node at or after decommissioningStartTime. + public Set<ApplicationId> appIds; + + // First moment the node is observed in the DECOMMISSIONED state. 
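+ // (Zero while the node is still DECOMMISSIONING; update() keeps the entry for about + // a minute after this point so the final status is logged before removal.)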
+ public long decommissionedTime; + + // Timeout in millis for this decommissioning node. + // This value could be dynamically updated with a new value from the RMNode. + public long timeoutMs; + + public long lastUpdateTime; + + public DecommissioningNodeContext(NodeId nodeId) { + this.nodeId = nodeId; + this.appIds = new HashSet<ApplicationId>(); + this.decommissioningStartTime = System.currentTimeMillis(); + this.timeoutMs = defaultTimeoutMs; + } + + void updateTimeout(Integer timeoutSec) { + this.timeoutMs = (timeoutSec == null) ? defaultTimeoutMs : (1000L * timeoutSec); + } + } + + // All DECOMMISSIONING nodes to track. + HashMap<NodeId, DecommissioningNodeContext> decomNodes = + new HashMap<NodeId, DecommissioningNodeContext>(); + + Stopwatch pollWatch = new Stopwatch().start(); + + Set<NodeId> emptyNodeIdSet = new HashSet<NodeId>(); + + public DecommissioningNodesWatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void init(Configuration conf) { + int timeoutSecs = conf.getInt(YarnConfiguration.DECOMMISSIONING_TIMEOUT_KEY, + YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT); + defaultTimeoutMs = 1000L * timeoutSecs; + LOG.info("decommissioningTimeoutMs: " + defaultTimeoutMs); + } + + public void updateNoThrow(RMNode rmNode, NodeStatus remoteNodeStatus) { + try { + update(rmNode, remoteNodeStatus); + } catch (Exception e) { + LOG.warn("Exception in DecommissioningNodesWatcher update: " + e.getMessage()); + } + } + + // Update rmNode decommissioning status based on NodeStatus. + public void update(RMNode rmNode, NodeStatus remoteNodeStatus) { + DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID()); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + if (context == null) return; + context.nodeState = rmNode.getState(); + // Keep the DECOMMISSIONED node around for a while so its final status is logged. + if (context.decommissionedTime == 0) { + context.decommissionedTime = System.currentTimeMillis(); + } else if (System.currentTimeMillis() - context.decommissionedTime > 60000L) { + decomNodes.remove(rmNode.getNodeID()); + LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID()); + } + } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (context == null) { + context = new DecommissioningNodeContext(rmNode.getNodeID()); + decomNodes.put(rmNode.getNodeID(), context); + context.nodeState = rmNode.getState(); + context.decommissionedTime = 0; + } + context.updateTimeout(rmNode.getDecommissioningTimeout()); + context.lastUpdateTime = System.currentTimeMillis(); + + if (remoteNodeStatus.getKeepAliveApplications() != null) { + context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); + } + + // Count the active containers and collect the application of each container. + int numActiveContainers = 0; + for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { + ContainerState newState = cs.getState(); + if (newState == ContainerState.RUNNING || newState == ContainerState.NEW) { + numActiveContainers++; + } + ApplicationId aid = cs.getContainerId().getApplicationAttemptId().getApplicationId(); + context.appIds.add(aid); + } + + context.numActiveContainers = numActiveContainers; + + // Maintain lastContainerFinishTime. 
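+ // (Records the first heartbeat at which the node reported no active containers.)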
+ if (context.numActiveContainers == 0 && context.lastContainerFinishTime == 0) { + context.lastContainerFinishTime = System.currentTimeMillis(); + } + } else { + // remove node in other states + if (context == null) return; + decomNodes.remove(rmNode.getNodeID()); + } + } + + public void remove(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context != null) { + LOG.info("remove " + nodeId + " in " + context.nodeState); + decomNodes.remove(nodeId); + } + } + + // Status about a specific decommissioning node. + public enum DecommissioningNodeStatus { + // Node is not in DECOMMISSIONING state. + NONE, + + // wait for running containers to complete + WAIT_CONTAINER, + + // wait for running application to complete (after all containers complete); + WAIT_APP, + + // Timeout waiting for either containers or applications to complete. + TIMEOUT, + + // nothing to wait, ready to be decommissioned + READY, + + // The node has already been decommissioned + DECOMMISSIONED, + } + + public boolean checkReadyToBeDecommissioned(NodeId nodeId) { + DecommissioningNodeStatus s = getDecommissioningStatus(nodeId); + return (s == DecommissioningNodeStatus.READY || s == DecommissioningNodeStatus.TIMEOUT); + } + + public DecommissioningNodeStatus getDecommissioningStatus(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context == null) { + return DecommissioningNodeStatus.NONE; + } + + if (context.nodeState == NodeState.DECOMMISSIONED) { + return DecommissioningNodeStatus.DECOMMISSIONED; + } + + long waitTime = System.currentTimeMillis() - context.decommissioningStartTime; + if (context.numActiveContainers > 0) { + return (context.timeoutMs >= 0 && waitTime < context.timeoutMs)? + DecommissioningNodeStatus.WAIT_CONTAINER : DecommissioningNodeStatus.TIMEOUT; + } + + removeCompletedApps(context); + if (context.appIds.size() == 0) { + return DecommissioningNodeStatus.READY; + } else { + return (context.timeoutMs >= 0 && waitTime < context.timeoutMs)? + DecommissioningNodeStatus.WAIT_APP : DecommissioningNodeStatus.TIMEOUT; + } + } + + // Periodically polling from ResourceTrackerService (RMS). + // This method primarily does two things: + // 1. log status of all decommissioning nodes; + // 2. identify and return stale decommissioning nodes to be taken care of by RMS. + public Set poll() { + if (decomNodes.size() == 0) return emptyNodeIdSet; + if (pollWatch.elapsedTime(TimeUnit.SECONDS) < 20) return emptyNodeIdSet; + pollWatch.reset().start(); + readDecommissioningTimeout(); + logDecommissioningNodesStatus(); + long now = System.currentTimeMillis(); + Set output = new HashSet(); + + for (Iterator> it = + decomNodes.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + DecommissioningNodeContext d = e.getValue(); + // Skip node updated in last 30 seconds (NodeManager usually updates every second). 
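+ // (A node heartbeating normally is already handled by update(); poll() only needs + // to catch nodes that have gone quiet, e.g. already-terminated NodeManagers.)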
+ if (now - d.lastUpdateTime < 30000L) { + continue; + } + // Remove stale non-DECOMMISSIONING node + if (d.nodeState != NodeState.DECOMMISSIONING) { + LOG.info("remove " + d.nodeState + " " + d.nodeId); + it.remove(); + continue; + } + if (d.decommissioningStartTime + d.timeoutMs < now) { + output.add(d.nodeId); + LOG.info("Identified stale and timeout node " + d.nodeId); + } + } + return output; + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.appIds.iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.info("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.info("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context) { + if (context.nodeState == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.nodeState != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.appIds.size() == 0 && context.numActiveContainers == 0) { + return 0; + } + // negative timeout value means no timeout. + if (context.timeoutMs < 0) return -1; + + long now = System.currentTimeMillis(); + long timeToWait = context.decommissioningStartTime + context.timeoutMs - now; + return Math.max(0, (int)(timeToWait / 1000)); + } + + private void logDecommissioningNodesStatus() { + if (decomNodes.size() == 0) return; + StringBuilder sb = new StringBuilder(); + long now = System.currentTimeMillis(); + for (DecommissioningNodeContext d : decomNodes.values()) { + DecommissioningNodeStatus s = getDecommissioningStatus(d.nodeId); + sb.append(String.format( + "\n %-34s %4ds fresh:%3ds containers:%2d %14s", + d.nodeId.getHost(), + (now - d.decommissioningStartTime) / 1000, + (now - d.lastUpdateTime) / 1000, + d.numActiveContainers, + s)); + if (s == DecommissioningNodeStatus.WAIT_APP || + s == DecommissioningNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); + } + for (ApplicationId aid : d.appIds) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format( + " %s %9s %5.2f%% %5ds", + rmApp.getState(), + (rmApp.getApplicationType() == null)? "" : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (System.currentTimeMillis() - rmApp.getStartTime()) / 1000)); + } + } + } + LOG.info("Decommissioning Nodes: " + sb.toString()); + } + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. + // This enables DecommissioningNodesWatcher to pick up new value without ResourceManager restart. + private void readDecommissioningTimeout() { + try { + Configuration conf = new YarnConfiguration(); + int v = conf.getInt(YarnConfiguration.DECOMMISSIONING_TIMEOUT_KEY, + YarnConfiguration.DEFAULT_DECOMMISSIONING_TIMEOUT); + if (defaultTimeoutMs != 1000L * v) { + defaultTimeoutMs = 1000L * v; + LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); + // Here we only read the new value but leave regular update logic to start using it. 
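+ // (Nodes without an explicit per-host timeout pick the new default up on their next + // heartbeat, when update() calls DecommissioningNodeContext.updateTimeout().)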
+ } + } catch (Exception e) { + LOG.info("Error readDecommissioningTimeout " + e.getMessage()); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index e6251fe..41f9015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -19,14 +19,17 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map; -import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,11 +52,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; + @SuppressWarnings("unchecked") public class NodesListManager extends CompositeService implements EventHandler { @@ -112,10 +116,10 @@ private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; } - - LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + + LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + - conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); for (String include : hostsReader.getHosts()) { LOG.debug("include: " + include); @@ -125,30 +129,29 @@ private void printConfiguredHosts() { } } - public void refreshNodes(Configuration yarnConf) throws IOException, - YarnException { - refreshHostsReader(yarnConf); + public void refreshNodes(Configuration yarnConf) throws IOException, YarnException { + refreshNodes(yarnConf, false); + } - for (NodeId nodeId: rmContext.getRMNodes().keySet()) { - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - } - } + public void refreshNodes(Configuration yarnConf, boolean graceful) throws IOException, + YarnException { + refreshHostsReader(yarnConf, graceful); } - private void refreshHostsReader(Configuration yarnConf) throws IOException, + private void refreshHostsReader(Configuration yarnConf, boolean graceful) throws IOException, YarnException { synchronized (hostsReader) { if (null == yarnConf) { yarnConf = new 
YarnConfiguration(); } + includesFile = yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); excludesFile = yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + LOG.info("refreshNodes excludesFile " + excludesFile); hostsReader.updateFileNames(includesFile, excludesFile); hostsReader.refresh( includesFile.isEmpty() ? null : this.rmContext @@ -156,6 +159,11 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, this.conf, includesFile), excludesFile.isEmpty() ? null : this.rmContext.getConfigurationProvider() .getConfigurationInputStream(this.conf, excludesFile)); + + LOG.info("hostsReader include:{" + Joiner.on(',').join(hostsReader.getHosts()) + + "} exclude:{" + Joiner.on(',').join(hostsReader.getExcludedHosts()) + "}"); + + handleExcludeNodeList(graceful); printConfiguredHosts(); } } @@ -180,6 +188,71 @@ private void setDecomissionedNMs() { } } + // Handle excluded nodes based on following rules: + // Recommission DECOMMISSIONED or DECOMMISSIONING nodes that are no longer excluded; + // Gracefully decommission excluded nodes that are not already in + // DECOMMISSIONED nor DECOMMISSIONING state; Take no action for excluded nodes + // that are already in DECOMMISSIONED or DECOMMISSIONING state. + private void handleExcludeNodeList(boolean graceful) { + // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. + List nodesToRecom = new ArrayList(); + + // Nodes need to be gracefully decommissioned (only when graceful is true) + List nodesToDecomGraceful = new ArrayList(); + + // Nodes need to be forcefully decommissioned (only when graceful is false) + List nodesToDecomForceful = new ArrayList(); + + Map timeouts = hostsReader.getExcludedHostsWithTimeout(); + + for (RMNode n : this.rmContext.getRMNodes().values()) { + NodeState s = n.getState(); + // an invalid node (either due to explicit exclude or not include) should be excluded. 
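+ // (isValidNode() consults both the include and the exclude host lists from hostsReader.)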
+ boolean exclude = !isValidNode(n.getHostName()); + if (graceful) { + if ((s == NodeState.DECOMMISSIONED || s == NodeState.DECOMMISSIONING) && !exclude) { + LOG.info("Recommission node " + n.getNodeID() + " with state " + s); + nodesToRecom.add(n); + } else if ((s != NodeState.DECOMMISSIONED && s != NodeState.DECOMMISSIONING) && exclude) { + LOG.info("Gracefully decommission node " + n.getNodeID() + " with state " + s); + nodesToDecomGraceful.add(n); + } else if (s == NodeState.DECOMMISSIONING && + exclude && + !Objects.equals(n.getDecommissioningTimeout(), timeouts.get(n.getHostName()))) { + LOG.info("Update timeout on decommissioning node " + + n.getNodeID() + " to be " + timeouts.get(n.getHostName())); + nodesToDecomGraceful.add(n); + } else if (exclude) { + LOG.info("No action for node " + n.getNodeID() + " with state " + s); + } + } else { + if ((s == NodeState.DECOMMISSIONED || s == NodeState.DECOMMISSIONING) && !exclude) { + LOG.info("Recommission node " + n.getNodeID() + " with state " + s); + nodesToRecom.add(n); + } else if ((s != NodeState.DECOMMISSIONED) && exclude) { + LOG.info("Forcefully decommission node " + n.getNodeID() + " with state " + s); + nodesToDecomForceful.add(n); + } + } + } + + for (RMNode n : nodesToRecom) { + RMNodeEvent e = new RMNodeEvent(n.getNodeID(), RMNodeEventType.RECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecomGraceful) { + RMNodeEvent e = new RMNodeEvent( + n.getNodeID(), RMNodeEventType.GRACEFUL_DECOMMISSION, timeouts.get(n.getHostName())); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + + for (RMNode n : nodesToDecomForceful) { + RMNodeEvent e = new RMNodeEvent(n.getNodeID(), RMNodeEventType.DECOMMISSION); + this.rmContext.getDispatcher().getEventHandler().handle(e); + } + } + @VisibleForTesting public Resolver getResolver() { return resolver; @@ -293,7 +366,7 @@ public void run() { } public boolean isValidNode(String hostName) { - String ip = resolver.resolve(hostName); + String ip = NetUtils.normalizeHostName(hostName); synchronized (hostsReader) { Set hostsList = hostsReader.getHosts(); Set excludeList = hostsReader.getExcludedHosts(); @@ -392,21 +465,7 @@ private HostsFileReader createHostsFileReader(String includesFile, */ public void refreshNodesGracefully(Configuration conf) throws IOException, YarnException { - refreshHostsReader(conf); - for (Entry entry:rmContext.getRMNodes().entrySet()) { - NodeId nodeId = entry.getKey(); - if (!isValidNode(nodeId.getHost())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); - } else { - // Recommissioning the nodes - if (entry.getValue().getState() == NodeState.DECOMMISSIONING - || entry.getValue().getState() == NodeState.DECOMMISSIONED) { - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION)); - } - } - } + refreshHostsReader(conf, true); } /** @@ -529,4 +588,4 @@ public void setHost(String hst) { this.host = hst; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index e19d55e..84a0a8d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -74,7 +74,8 @@ ArrayList results = new ArrayList(); if (acceptedStates.contains(NodeState.NEW) || acceptedStates.contains(NodeState.RUNNING) || - acceptedStates.contains(NodeState.UNHEALTHY)) { + acceptedStates.contains(NodeState.UNHEALTHY) || + acceptedStates.contains(NodeState.DECOMMISSIONING)) { for (RMNode rmNode : context.getRMNodes().values()) { if (acceptedStates.contains(rmNode.getState())) { results.add(rmNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 902244b..3d4e42e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -103,6 +103,8 @@ private int minAllocMb; private int minAllocVcores; + DecommissioningNodesWatcher decomWatcher; + private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; @@ -117,7 +119,7 @@ public ResourceTrackerService(RMContext rmContext, this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + this.decomWatcher = new DecommissioningNodesWatcher(rmContext); } @Override @@ -156,6 +158,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); } + decomWatcher.init(conf); super.serviceInit(conf); } @@ -425,6 +428,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); + + this.decomWatcher.updateNoThrow(rmNode, remoteNodeStatus); // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); @@ -447,6 +452,18 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) message); } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + String message = "DECOMMISSIONING " + nodeId + " is ready to be decommissioned"; + LOG.info(message); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + return YarnServerBuilderUtils.newNodeHeartbeatResponse( + NodeAction.SHUTDOWN, message); + } + + pollDecommissioningNodesWatcher(); + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. 
@@ -491,6 +508,26 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + // Poll DecommissioningNodesWatcher to log status and take care of decommissioning + // node that didn't update itself (already terminated for example). + private void pollDecommissioningNodesWatcher() { + Set nodes = decomWatcher.poll(); + if (nodes.size() == 0) return; + for (NodeId nodeId : nodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + decomWatcher.remove(nodeId); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING && + decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) { + LOG.info("DECOMMISSIONING " + nodeId + " timeout"); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + /** * Check if node in decommissioning state. * @param nodeId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f1..56bee08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -168,4 +168,10 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + /* + * Optional decommissioning timeout in second (null indicates default timeout). + * @return the decommissioning timeout in second. + */ + public Integer getDecommissioningTimeout(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java index 9ecb366..446d448 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEvent.java @@ -24,13 +24,27 @@ public class RMNodeEvent extends AbstractEvent { private final NodeId nodeId; + // Optional decommissioning timeout in second. + private final Integer decommissioningTimeout; public RMNodeEvent(NodeId nodeId, RMNodeEventType type) { super(type); this.nodeId = nodeId; + this.decommissioningTimeout = null; + } + + // Create instance with optional timeout (timeout could be null which means use default). 
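+ // (NodesListManager uses this to attach the per-host timeout parsed from the + // excludes file to a GRACEFUL_DECOMMISSION event.)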
+ public RMNodeEvent(NodeId nodeId, RMNodeEventType type, Integer timeout) { + super(type); + this.nodeId = nodeId; + this.decommissioningTimeout = timeout; } public NodeId getNodeId() { return this.nodeId; } + + public Integer getDecommissioningTimeout() { + return this.decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index f4e483b..b229c1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -70,6 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeDecomSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRecomSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -81,6 +84,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; /** * This class is used to keep track of all the applications/containers @@ -116,6 +120,7 @@ private String healthReport; private long lastHealthReportTime; private String nodeManagerVersion; + private Integer decommissioningTimeout; /* Aggregated resource utilization for the containers. 
*/ private ResourceUtilization containersUtilization; @@ -167,7 +172,6 @@ NodeState, RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -229,6 +233,21 @@ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.RECONNECTED, new DecommissionTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.REBOOTING, new DecommissionTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.GRACEFUL_DECOMMISSION, new DecommissionTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.RUNNING, + RMNodeEventType.RECOMMISSION, new RecommissionTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.LOST, + RMNodeEventType.EXPIRE, new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + //Transitions from DECOMMISSIONING state .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, @@ -250,6 +269,9 @@ .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, RMNodeEventType.REBOOTING, new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -614,7 +636,7 @@ public void handle(RMNodeEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); LOG.error("Invalid event " + event.getType() + - " on Node " + this.nodeId); + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " @@ -646,6 +668,8 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) { break; case SHUTDOWN: metrics.decrNumShutdownNMs(); + break; + case DECOMMISSIONING: + metrics.decrDecommissioningNMs(); break; default: LOG.debug("Unexpected previous node state"); @@ -684,6 +708,7 @@ private void updateMetricsForGracefulDecommission(NodeState initialState, private void updateMetricsForDeactivatedNode(NodeState initialState, NodeState finalState) { + // A DECOMMISSIONED node was already removed from the metrics; avoid double-counting. + if (initialState == NodeState.DECOMMISSIONED) return; ClusterMetrics metrics = ClusterMetrics.getMetrics(); switch (initialState) { @@ -765,6 +790,23 @@ private static NodeHealthStatus updateRMNodeFromStatusEvents( return remoteNodeHealthStatus; } + + // Update metrics for a node entering the DECOMMISSIONING state. + private static void updateMetricsForNodeIntoDecommissioning(NodeState initialState) { + if (initialState == NodeState.DECOMMISSIONING) return; + + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + metrics.incrDecommissioningNMs(); + + // Only the RUNNING and DECOMMISSIONING states can transition into DECOMMISSIONING. 
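+ // (A node already in DECOMMISSIONING returned early above, so only RUNNING reaches this switch.)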
+ switch (initialState) { + case RUNNING: + metrics.decrNumActiveNodes(); + break; + default: + break; + } + } + public static class AddNodeTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @@ -1114,25 +1156,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.UNHEALTHY; } } - if (isNodeDecommissioning) { - List<ApplicationId> runningApps = rmNode.getRunningApps(); - - List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds(); - - // no running (and keeping alive) app on this node, get it - // decommissioned. - // TODO may need to check no container is being scheduled on this node - // as well. - if ((runningApps == null || runningApps.size() == 0) - && (keepAliveApps == null || keepAliveApps.size() == 0)) { - RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) - } - + rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleReportedIncreasedContainers( statusEvent.getNMReportedIncreasedContainers()); @@ -1199,6 +1223,87 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + + // Handler of the GRACEFUL_DECOMMISSION event that transitions a node into the DECOMMISSIONING state. + public static class DecommissioningTransition implements + SingleArcTransition<RMNodeImpl, RMNodeEvent> { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + NodeState prevState = rmNode.getState(); + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (!Objects.equals(rmNode.getDecommissioningTimeout(), + event.getDecommissioningTimeout())) { + LOG.info("Update " + rmNode.getNodeID() + " DecommissioningTimeout to be " + + event.getDecommissioningTimeout()); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); + } else { + LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); + } + return; + } + LOG.info("DecommissioningTransition " + rmNode.hostName + + " with timeout " + event.getDecommissioningTimeout()); + rmNode.decommissioningTimeout = event.getDecommissioningTimeout(); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeDecomSchedulerEvent(rmNode)); + updateMetricsForNodeIntoDecommissioning(prevState); + } + } + + // Handler of the DECOMMISSION event that transitions a node into the DECOMMISSIONED state. + public static class DecommissionTransition implements + SingleArcTransition<RMNodeImpl, RMNodeEvent> { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + NodeState prevState = rmNode.getState(); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + return; + } + LOG.info("DecommissionTransition " + rmNode.hostName); + + // Purge pending node updates and inform the scheduler. + rmNode.nodeUpdateQueue.clear(); + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeRemovedSchedulerEvent(rmNode)); + + // Notify NodesListManager to notify all RMApp so that MRAppMaster + // could take care of tasks on the node. + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_UNUSABLE, rmNode)); + + rmNode.updateMetricsForDeactivatedNode(prevState, NodeState.DECOMMISSIONED); + } + } + + // Handler for the RECOMMISSION event. 
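+ // (Returns a DECOMMISSIONING or DECOMMISSIONED node to service: the scheduler adds + // its resource back via NodeRecomSchedulerEvent, and a formerly DECOMMISSIONED node + // is also reported usable again to the NodesListManager.)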
+ public static class RecommissionTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + NodeState prevState = rmNode.getState(); + if (prevState != NodeState.DECOMMISSIONING && + prevState != NodeState.DECOMMISSIONED) { + LOG.info(rmNode.getNodeID() + " is neither DECOMMISSIONING nor DECOMMISSIONED"); + return; + } + LOG.info("RecommissionTransition " + rmNode.hostName); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeRecomSchedulerEvent(rmNode)); + + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + } + rmNode.updateMetricsForRejoinedNode(prevState); + } + } + @Override public List pullContainerUpdates() { List latestContainerInfoList = @@ -1353,4 +1458,9 @@ private void handleLogAggregationStatus( writeLock.unlock(); } } + + @Override + public Integer getDecommissioningTimeout() { + return decommissioningTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 03edd40..7ecc3b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; @@ -44,7 +47,6 @@ import com.google.common.collect.ImmutableSet; - /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. */ @@ -200,6 +202,16 @@ public synchronized void decreaseContainer(ContainerId containerId, * @return available resources on the node */ public synchronized Resource getAvailableResource() { + // resources on DECOMMISSIONING/DECOMMISSIONED nodes are logically not available. 
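+ // (Reporting zero keeps the scheduler from placing new containers on a draining node; + // getAvailableResourceIncludeDecom() below still exposes the raw value.)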
+    if (rmNode.getState() == NodeState.DECOMMISSIONING ||
+        rmNode.getState() == NodeState.DECOMMISSIONED) {
+      return Resource.newInstance(0, 0);
+    }
+    return this.availableResource;
+  }
+
+  // Available resource on the node, regardless of DECOMMISSIONING state.
+  public synchronized Resource getAvailableResourceIncludeDecom() {
     return this.availableResource;
   }
@@ -209,7 +221,7 @@ public synchronized Resource getAvailableResource() {
    * @return used resources on the node
    */
   public synchronized Resource getUsedResource() {
-    return this.usedResource;
+    return this.usedResource;
   }
 
   /**
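The two getters split "schedulable" headroom from "physical" headroom: allocation paths that call getAvailableResource() now see zero on a draining node, while bookkeeping that needs the raw figure can use the new method. A standalone model of the pattern; the types and names below are illustrative, not YARN code:

```java
// Illustrative model only; not SchedulerNode.
enum DrainState { RUNNING, DECOMMISSIONING, DECOMMISSIONED }

final class DrainAwareNode {
  private final DrainState state;
  private final long availableMb;

  DrainAwareNode(DrainState state, long availableMb) {
    this.state = state;
    this.availableMb = availableMb;
  }

  // What the allocator sees: a draining node reports no headroom,
  // so no new containers are placed on it.
  long schedulableMb() {
    return state == DrainState.RUNNING ? availableMb : 0;
  }

  // What bookkeeping sees: the physical headroom with drain state
  // ignored (the analogue of getAvailableResourceIncludeDecom()).
  long physicalMb() {
    return availableMb;
  }
}
```

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ee3a3f9..9fdcc85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -119,6 +120,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeDecomSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRecomSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -1037,9 +1040,7 @@ private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     }
-    Resource releaseResources = Resource.newInstance(0, 0);
-
     FiCaSchedulerNode node = getNode(nm.getNodeID());
 
     List containerInfoList = nm.pullContainerUpdates();
@@ -1308,10 +1309,22 @@ public void handle(SchedulerEvent event) {
         nodeAddedEvent.getAddedRMNode());
     }
     break;
+    case NODE_DECOMMISSIONING:
+    {
+      NodeDecomSchedulerEvent nodeDecomEvent = (NodeDecomSchedulerEvent)event;
+      decomNode(nodeDecomEvent.getRMNode());
+    }
+    break;
+    case NODE_RECOMMISSION:
+    {
+      NodeRecomSchedulerEvent nodeRecomEvent = (NodeRecomSchedulerEvent)event;
+      recomNode(nodeRecomEvent.getRMNode());
+    }
+    break;
     case NODE_REMOVED:
     {
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
-      removeNode(nodeRemovedEvent.getRemovedRMNode());
+      removeNode(nodeRemovedEvent.getRemovedRMNode(),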
nodeRemovedEvent.getPrevNodeState()); } break; case NODE_RESOURCE_UPDATE: @@ -1458,17 +1471,39 @@ private synchronized void addNode(RMNode nodeManager) { } } - private synchronized void removeNode(RMNode nodeInfo) { + private synchronized void decomNode(RMNode nodeManager) { + Resources.subtractFrom(clusterResource, nodeManager.getTotalCapability()); + root.updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); + LOG.info("Decom node " + nodeManager.getNodeAddress() + + " clusterResource: " + clusterResource); + } + + private synchronized void recomNode(RMNode nodeManager) { + Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + root.updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); + LOG.info("Recom node " + nodeManager.getNodeAddress() + + " clusterResource: " + clusterResource); + } + + private synchronized void removeNode(RMNode nodeInfo, NodeState prevState) { // update this node to node label manager if (labelManager != null) { labelManager.deactivateNode(nodeInfo.getNodeID()); } - + FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } - Resources.subtractFrom(clusterResource, node.getTotalResource()); + // Subtract node resource from cluster resource if not already. + // When the previous state is DECOMMISSIONING or DECOMMISSIONED, + // resource has already been subtracted earlier by decomNode(). + // For example, during the DECOMMISSIONING -> LOST phase of RUNNING -> DECOMMISSIONING -> LOST. + LOG.info("removeNode " + nodeInfo.getNodeAddress() + + " prevState:" + prevState + " state:" + nodeInfo.getState()); + if (prevState != NodeState.DECOMMISSIONING && prevState != NodeState.DECOMMISSIONED) { + Resources.subtractFrom(clusterResource, node.getTotalResource()); + } root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = numNodeManagers.decrementAndGet(); @@ -1500,7 +1535,7 @@ private synchronized void removeNode(RMNode nodeInfo) { this.nodes.remove(nodeInfo.getNodeID()); updateMaximumAllocation(node, false); - LOG.info("Removed node " + nodeInfo.getNodeAddress() + + LOG.info("Removed " + nodeInfo.getState() + " node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeDecomSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeDecomSchedulerEvent.java new file mode 100644 index 0000000..875d6ac --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeDecomSchedulerEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +public class NodeDecomSchedulerEvent extends SchedulerEvent { + + private final RMNode rmNode; + + public NodeDecomSchedulerEvent(RMNode rmNode) { + super(SchedulerEventType.NODE_DECOMMISSIONING); + this.rmNode = rmNode; + } + + public RMNode getRMNode() { + return rmNode; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRecomSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRecomSchedulerEvent.java new file mode 100644 index 0000000..c36a053 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRecomSchedulerEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +public class NodeRecomSchedulerEvent extends SchedulerEvent { + + private final RMNode rmNode; + + public NodeRecomSchedulerEvent(RMNode rmNode) { + super(SchedulerEventType.NODE_RECOMMISSION); + this.rmNode = rmNode; + } + + public RMNode getRMNode() { + return rmNode; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java index 5fe541e..13d19e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java @@ -18,19 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeRemovedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + + // Current NodeState (which will become previous node state after state transition). + private final NodeState prevNodeState; public NodeRemovedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_REMOVED); this.rmNode = rmNode; + this.prevNodeState = rmNode.getState(); } public RMNode getRemovedRMNode() { return rmNode; } + public NodeState getPrevNodeState() { + return prevNodeState; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 9cf09e9..6354e0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -42,6 +42,9 @@ KILL_RESERVED_CONTAINER, MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption // in the near future - KILL_PREEMPTED_CONTAINER // Kill a container previously marked for - // preemption + KILL_PREEMPTED_CONTAINER, // Kill a container previously marked for + // preemption + + NODE_DECOMMISSIONING, + NODE_RECOMMISSION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7e013e0..cf9aec1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -86,6 +88,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeDecomSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRecomSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -900,14 +904,38 @@ private synchronized void addNode(List containerReports, recoverContainersOnNode(containerReports, node); updateRootQueueMetrics(); } + + private synchronized void decomNode(RMNode nodeManager) { + Resources.subtractFrom(clusterResource, nodeManager.getTotalCapability()); + updateRootQueueMetrics(); + LOG.info("Decom node " + nodeManager.getNodeAddress() + + " clusterResource: " + clusterResource); + } + + private synchronized void recomNode(RMNode nodeManager) { + Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + updateRootQueueMetrics(); + LOG.info("Recom node " + nodeManager.getNodeAddress() + + " clusterResource: " + clusterResource); + } - private synchronized void removeNode(RMNode rmNode) { + private synchronized void removeNode(RMNode rmNode, NodeState prevState) { FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); // This can occur when an UNHEALTHY node reconnects if (node == null) { return; } - Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); + + // Subtract node resource from cluster resource if not already. + // When the previous state is DECOMMISSIONING or DECOMMISSIONED, + // resource has already been subtracted earlier by decomNode(). + // For example, during the DECOMMISSIONING -> LOST phase of RUNNING -> DECOMMISSIONING -> LOST. 
+ LOG.info("removeNode " + rmNode.getNodeAddress() + + " prevState:" + prevState + " state:" + rmNode.getState()); + if (prevState != NodeState.DECOMMISSIONING && prevState != NodeState.DECOMMISSIONED) { + Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); + } + updateRootQueueMetrics(); triggerUpdate(); @@ -944,7 +972,7 @@ private synchronized void removeNode(RMNode rmNode) { queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); - LOG.info("Removed node " + rmNode.getNodeAddress() + + LOG.info("Removed " + rmNode.getState() + " node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -1223,12 +1251,20 @@ public void handle(SchedulerEvent event) { addNode(nodeAddedEvent.getContainerReports(), nodeAddedEvent.getAddedRMNode()); break; + case NODE_DECOMMISSIONING: + NodeDecomSchedulerEvent nodeDecomEvent = (NodeDecomSchedulerEvent)event; + decomNode(nodeDecomEvent.getRMNode()); + break; + case NODE_RECOMMISSION: + NodeRecomSchedulerEvent nodeRecomEvent = (NodeRecomSchedulerEvent)event; + recomNode(nodeRecomEvent.getRMNode()); + break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; - removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeNode(nodeRemovedEvent.getRemovedRMNode(), nodeRemovedEvent.getPrevNodeState()); break; case NODE_UPDATE: if (!(event instanceof NodeUpdateSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 5787ba6..f5070e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -87,6 +88,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeDecomSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRecomSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -792,10 +795,22 @@ public void handle(SchedulerEvent event) { } break; + case NODE_DECOMMISSIONING: + { + NodeDecomSchedulerEvent nodeDecomEvent = (NodeDecomSchedulerEvent)event; + decomNode(nodeDecomEvent.getRMNode()); + } + break; + case NODE_RECOMMISSION: + { + NodeRecomSchedulerEvent nodeRecomEvent = (NodeRecomSchedulerEvent)event; + recomNode(nodeRecomEvent.getRMNode()); + } + break; case NODE_REMOVED: { NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; - removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeNode(nodeRemovedEvent.getRemovedRMNode(), nodeRemovedEvent.getPrevNodeState()); } break; case NODE_RESOURCE_UPDATE: @@ -912,7 +927,7 @@ protected synchronized void completedContainerInternal( private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private synchronized void removeNode(RMNode nodeInfo) { + private synchronized void removeNode(RMNode nodeInfo, NodeState prevState) { FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); if (node == null) { return; @@ -930,8 +945,15 @@ private synchronized void removeNode(RMNode nodeInfo) { this.nodes.remove(nodeInfo.getNodeID()); updateMaximumAllocation(node, false); - // Update cluster metrics - Resources.subtractFrom(clusterResource, node.getTotalResource()); + // Subtract node resource from cluster resource if not already. + // When the previous state is DECOMMISSIONING or DECOMMISSIONED, + // resource has already been subtracted earlier by decomNode(). + // For example, during the DECOMMISSIONING -> LOST phase of RUNNING -> DECOMMISSIONING -> LOST. + LOG.info("removeNode " + nodeInfo.getNodeAddress() + + " prevState:" + prevState + " state:" + nodeInfo.getState()); + if (prevState != NodeState.DECOMMISSIONING && prevState != NodeState.DECOMMISSIONED) { + Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); + } } @Override @@ -958,6 +980,14 @@ private synchronized void addNode(RMNode nodeManager) { updateMaximumAllocation(schedulerNode, true); } + private synchronized void decomNode(RMNode nodeManager) { + Resources.subtractFrom(clusterResource, nodeManager.getTotalCapability()); + } + + private synchronized void recomNode(RMNode nodeManager) { + Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + } + @Override public void recover(RMState state) { // NOT IMPLEMENTED diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 3012d0d..b639ac7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -97,7 +97,7 @@ public ClusterMetricsInfo(final ResourceManager rm) { this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.shutdownNodes = clusterMetrics.getNumShutdownNMs(); this.totalNodes = activeNodes + lostNodes + decommissionedNodes - + rebootedNodes + unhealthyNodes + shutdownNodes; + + 
rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; } public int getAppsSubmitted() { @@ -199,5 +199,4 @@ public int getDecommissionedNodes() { public int getShutdownNodes() { return this.shutdownNodes; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..6b72f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -260,6 +260,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public Integer getDecommissioningTimeout() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d5b64c1..d61e5cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -635,17 +635,25 @@ public void sendNodeLost(MockNM nm) throws Exception { node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { + RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), event)); + } + public void NMwaitForState(NodeId nodeid, NodeState finalState) throws Exception { RMNode node = getRMContext().getRMNodes().get(nodeid); + if (node == null) { + node = getRMContext().getInactiveRMNodes().get(nodeid); + } Assert.assertNotNull("node shouldn't be null", node); int timeoutSecs = 0; while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) { - System.out.println("Node State is : " + node.getState() + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState() + " Waiting for state : " + finalState); Thread.sleep(500); } - System.out.println("Node State is : " + node.getState()); + System.out.println("Node " + nodeid.getHost() + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java new file mode 100644 index 0000000..ca1d406 --- /dev/null 
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestDecommissioningNodesWatcher { + private MockRM rm; + + @Test + public void testDecommissioningNodesWatcher() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.DECOMMISSIONING_TIMEOUT_KEY, "40"); + + rm = new MockRM(conf); + rm.start(); + + DecommissioningNodesWatcher watcher = new DecommissioningNodesWatcher(rm.getRMContext()); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + NodeId id1 = nm1.getNodeId(); + + rm.NMwaitForState(id1, NodeState.RUNNING); + Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + + RMApp app = rm.submitApp(2000); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + + // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher to track it. + rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); + rm.NMwaitForState(id1, NodeState.DECOMMISSIONING); + + // Update status with decreasing number of running containers until 0. 
+    watcher.update(node1, createNodeStatus(id1, app, 12));
+    watcher.update(node1, createNodeStatus(id1, app, 11));
+    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+
+    watcher.update(node1, createNodeStatus(id1, app, 1));
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
+        watcher.getDecommissioningStatus(id1));
+
+    watcher.update(node1, createNodeStatus(id1, app, 0));
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
+        watcher.getDecommissioningStatus(id1));
+
+    // Finish the app and verify that the DecommissioningNodeStatus becomes READY.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    Assert.assertEquals(DecommissioningNodeStatus.READY,
+        watcher.getDecommissioningStatus(id1));
+  }
+
+  @After
+  public void tearDown() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  private NodeStatus createNodeStatus(NodeId nodeId, RMApp app, int numRunningContainers) {
+    return NodeStatus.newInstance(
+        nodeId, 0, getContainerStatuses(app, numRunningContainers),
+        new ArrayList(),
+        NodeHealthStatus.newInstance(true, "", System.currentTimeMillis() - 1000),
+        null, null, null);
+  }
+
+  // Get mocked ContainerStatus for a batch of containers, of which
+  // numRunningContainers are RUNNING.
+  private List getContainerStatuses(RMApp app, int numRunningContainers) {
+    // Total 12 containers
+    final int total = 12;
+    numRunningContainers = Math.min(total, numRunningContainers);
+    List output = new ArrayList();
+    for (int i = 0; i < total; i++) {
+      output.add(ContainerStatus.newInstance(
+          ContainerId.newInstance(ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1),
+          i >= numRunningContainers ? ContainerState.COMPLETE : ContainerState.RUNNING,
+          "Dummy", 0));
+    }
+    return output;
+  }
+}
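The progression the test walks through (WAIT_CONTAINER while containers run, WAIT_APP once they finish, READY when the apps do) can be summarized as a small decision rule. A hedged sketch, assuming the watcher derives its status only from container and app counts; the names here are illustrative, not the real DecommissioningNodesWatcher internals:

```java
// Illustrative decision rule only; the actual watcher also tracks timeouts.
enum DecomStatus { WAIT_CONTAINER, WAIT_APP, READY }

final class DecomStatusRule {
  static DecomStatus of(int runningContainers, int unfinishedApps) {
    if (runningContainers > 0) {
      return DecomStatus.WAIT_CONTAINER; // containers still running
    }
    if (unfinishedApps > 0) {
      return DecomStatus.WAIT_APP;       // containers done, apps not yet finished
    }
    return DecomStatus.READY;            // safe to flip to DECOMMISSIONED
  }
}
```

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 78aa139..7cea9eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -255,17 +255,6 @@ public void testStatusUpdateOnDecommissioningNode() {
         cm.getNumDecommissioningNMs());
     Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
         cm.getNumDecommisionedNMs());
-
-    // Verify node in DECOMMISSIONING will be changed by status update
-    // without running apps
-    statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
-    node.handle(statusEvent);
-    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
-    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
-    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
-        cm.getNumDecommissioningNMs());
-    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
-        cm.getNumDecommisionedNMs());
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java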
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e0fd9ab..97d36e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -217,6 +218,107 @@ public void testDecommissionWithExcludeHosts() throws Exception { } /** + * Graceful decommission node with no running application. + */ + @Test + public void testGracefulDecommissionNoApp() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("host3:4433", 5120); + + int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host2 and host3. + writeToHostsFile("host2", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + checkDecommissionedNMCount(rm, metricCount + 2); + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + rm.NMwaitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); + Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); + } + + /** + * Graceful decommission node with running application. 
+   */
+  @Test
+  public void testGracefulDecommissionWithApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:5678", 20480);
+    MockNM nm3 = rm.registerNode("host3:4433", 10240);
+    NodeId id1 = nm1.getNodeId();
+    NodeId id3 = nm3.getNodeId();
+    rm.NMwaitForState(id1, NodeState.RUNNING);
+    rm.NMwaitForState(id3, NodeState.RUNNING);
+
+    // Create an app and launch two containers on host1.
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
+
+    // Graceful decommission host1 and host3
+    writeToHostsFile("host1", "host3");
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.NMwaitForState(id1, NodeState.DECOMMISSIONING);
+    rm.NMwaitForState(id3, NodeState.DECOMMISSIONING);
+
+    // host1 should stay DECOMMISSIONING due to its running containers;
+    // host3 should become DECOMMISSIONED.
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+    nm3.nodeHeartbeat(true);
+    rm.NMwaitForState(id1, NodeState.DECOMMISSIONING);
+    rm.NMwaitForState(id3, NodeState.DECOMMISSIONED);
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+
+    // Complete containers on host1. Since the app is still RUNNING, expect NodeAction.NORMAL.
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+
+    // Finish the app and verify that the node becomes DECOMMISSIONED.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
+    rm.NMwaitForState(id1, NodeState.DECOMMISSIONED);
+  }
+
+  /**
    * Decommissioning using a post-configured include hosts file
    */
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e139df6..bc8e61c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -874,9 +874,7 @@ public void testResourceOverCommit() throws Exception {
     as.updateNodeResource(request);
 
     // Now, the used resource is still 4 GB, and available resource is minus value.
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
-    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory());
+    waitForNodeResource(rm, nm1, 4 * GB, -2 * GB);
 
     // Check container can complete successfully in case of resource over-commitment.
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
@@ -913,7 +911,24 @@
         0, alloc1Response.getAllocatedContainers().size());
     rm.stop();
   }
-
+
+  // Waits briefly for the expected node resource values, since node resource
+  // updates are applied asynchronously.
+  void waitForNodeResource(MockRM rm, MockNM nm, int usedMemory, int availMemory)
+      throws InterruptedException {
+    SchedulerNodeReport report_nm = rm.getResourceScheduler().getNodeReport(nm.getNodeId());
+    int retry = 0;
+    while (++retry < 10) {
+      if (usedMemory == report_nm.getUsedResource().getMemory() &&
+          availMemory == report_nm.getAvailableResource().getMemory()) {
+        break;
+      }
+      Thread.sleep(100);
+      report_nm = rm.getResourceScheduler().getNodeReport(nm.getNodeId());
+    }
+    Assert.assertEquals(usedMemory, report_nm.getUsedResource().getMemory());
+    Assert.assertEquals(availMemory, report_nm.getAvailableResource().getMemory());
+  }
+
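waitForNodeResource above is an instance of a bounded-poll idiom that is worth keeping generic. A minimal standalone form (not part of the patch; names are illustrative):

```java
import java.util.function.BooleanSupplier;

// Poll a condition a bounded number of times, sleeping between attempts.
// Returns true as soon as the condition holds, false if attempts run out.
final class Await {
  static boolean until(BooleanSupplier cond, int attempts, long sleepMs)
      throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      if (cond.getAsBoolean()) {
        return true;
      }
      Thread.sleep(sleepMs);
    }
    return cond.getAsBoolean(); // one final check after the last sleep
  }
}
```

   @Test
   public void testGetAppsInQueue() throws Exception {
     Application application_0 = new Application("user_0", "a1", resourceManager);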