diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java index 2700cf296b9..3531b9b91ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java @@ -49,7 +49,10 @@ DECOMMISSIONING, /** Node has shutdown gracefully. */ - SHUTDOWN; + SHUTDOWN, + + /** Node decommissioning state recovered **/ + DECOMMISSIONING_RECOVERED; public boolean isUnusable() { return (this == UNHEALTHY || this == DECOMMISSIONED diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 4573859d384..1dc872c665c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -332,6 +332,7 @@ enum NodeStateProto { NS_REBOOTED = 6; NS_DECOMMISSIONING = 7; NS_SHUTDOWN = 8; + NS_DECOMMISSIONING_RECOVERED = 9; } message NodeIdProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 474ae788235..c6b6fa73eea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -727,7 +728,8 @@ public CustomedResourceTrackerService(RMContext rmContext, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager) { super(rmContext, nodesListManager, nmLivelinessMonitor, - containerTokenSecretManager, nmTokenSecretManager); + containerTokenSecretManager, nmTokenSecretManager, + new DecommissioningNodesWatcher(rmContext)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 17d570f30c3..3b6dbdbf6c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2361,7 +2361,7 @@ private String createNodeCLIHelpMessage() throws IOException { pw.println(" comma-separated list of node states. 
The valid node"); pw.println(" state can be one of the following:"); pw.println(" NEW,RUNNING,UNHEALTHY,DECOMMISSIONED,LOST,REBOOTED,DEC"); - pw.println(" OMMISSIONING,SHUTDOWN."); + pw.println(" OMMISSIONING,SHUTDOWN,DECOMMISSIONING_RECOVERED."); pw.println(" -status Prints the status report of the node."); pw.close(); String nodesHelpStr = baos.toString("UTF-8"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java deleted file mode 100644 index ca3eb798414..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java +++ /dev/null @@ -1,413 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; -import org.apache.hadoop.yarn.util.MonotonicClock; - -/** - * DecommissioningNodesWatcher is used by ResourceTrackerService to track - * DECOMMISSIONING nodes to decide when, after all running containers on - * the node have completed, will be transitioned into DECOMMISSIONED state - * (NodeManager will be told to shutdown). - * Under MR application, a node, after completes all its containers, - * may still serve it map output data during the duration of the application - * for reducers. 
A fully graceful mechanism would keep such DECOMMISSIONING - * nodes until all involved applications complete. It could be however - * undesirable under long-running applications scenario where a bunch - * of "idle" nodes would stay around for long period of time. - * - * DecommissioningNodesWatcher balance such concern with a timeout policy --- - * a DECOMMISSIONING node will be DECOMMISSIONED no later than - * DECOMMISSIONING_TIMEOUT regardless of running containers or applications. - * - * To be efficient, DecommissioningNodesWatcher skip tracking application - * containers on a particular node before the node is in DECOMMISSIONING state. - * It only tracks containers once the node is in DECOMMISSIONING state. - * DecommissioningNodesWatcher basically is no cost when no node is - * DECOMMISSIONING. This sacrifices the possibility that the node once - * host containers of an application that is still running - * (the affected map tasks will be rescheduled). - */ -public class DecommissioningNodesWatcher { - private static final Log LOG = - LogFactory.getLog(DecommissioningNodesWatcher.class); - - private final RMContext rmContext; - - // Once a RMNode is observed in DECOMMISSIONING state, - // All its ContainerStatus update are tracked inside DecomNodeContext. - class DecommissioningNodeContext { - private final NodeId nodeId; - - // Last known NodeState. - private NodeState nodeState; - - // The moment node is observed in DECOMMISSIONING state. - private final long decommissioningStartTime; - - private long lastContainerFinishTime; - - // number of running containers at the moment. - private int numActiveContainers; - - // All applications run on the node at or after decommissioningStartTime. - private Set appIds; - - // First moment the node is observed in DECOMMISSIONED state. - private long decommissionedTime; - - // Timeout in millis for this decommissioning node. - // This value could be dynamically updated with new value from RMNode. - private long timeoutMs; - - private long lastUpdateTime; - - public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { - this.nodeId = nodeId; - this.appIds = new HashSet(); - this.decommissioningStartTime = mclock.getTime(); - this.timeoutMs = 1000L * timeoutSec; - } - - void updateTimeout(int timeoutSec) { - this.timeoutMs = 1000L * timeoutSec; - } - } - - // All DECOMMISSIONING nodes to track. - private HashMap decomNodes = - new HashMap(); - - private Timer pollTimer; - private MonotonicClock mclock; - - public DecommissioningNodesWatcher(RMContext rmContext) { - this.rmContext = rmContext; - pollTimer = new Timer(true); - mclock = new MonotonicClock(); - } - - public void init(Configuration conf) { - int v = conf.getInt( - YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL, - YarnConfiguration - .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL); - pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v)); - } - - /** - * Update rmNode decommissioning status based on NodeStatus. - * @param rmNode The node - * @param remoteNodeStatus latest NodeStatus - */ - public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { - DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID()); - long now = mclock.getTime(); - if (rmNode.getState() == NodeState.DECOMMISSIONED) { - if (context == null) { - return; - } - context.nodeState = rmNode.getState(); - // keep DECOMMISSIONED node for a while for status log, so that such - // host will appear as DECOMMISSIONED instead of quietly disappears. 
- if (context.decommissionedTime == 0) { - context.decommissionedTime = now; - } else if (now - context.decommissionedTime > 60000L) { - decomNodes.remove(rmNode.getNodeID()); - } - } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { - if (context == null) { - context = new DecommissioningNodeContext(rmNode.getNodeID(), - rmNode.getDecommissioningTimeout()); - decomNodes.put(rmNode.getNodeID(), context); - context.nodeState = rmNode.getState(); - context.decommissionedTime = 0; - } - context.updateTimeout(rmNode.getDecommissioningTimeout()); - context.lastUpdateTime = now; - - if (remoteNodeStatus.getKeepAliveApplications() != null) { - context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications()); - } - - // Count number of active containers. - int numActiveContainers = 0; - for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { - ContainerState newState = cs.getState(); - if (newState == ContainerState.RUNNING || - newState == ContainerState.NEW) { - numActiveContainers++; - } - context.numActiveContainers = numActiveContainers; - ApplicationId aid = cs.getContainerId() - .getApplicationAttemptId().getApplicationId(); - if (!context.appIds.contains(aid)) { - context.appIds.add(aid); - } - } - - context.numActiveContainers = numActiveContainers; - - // maintain lastContainerFinishTime. - if (context.numActiveContainers == 0 && - context.lastContainerFinishTime == 0) { - context.lastContainerFinishTime = now; - } - } else { - // remove node in other states - if (context != null) { - decomNodes.remove(rmNode.getNodeID()); - } - } - } - - public synchronized void remove(NodeId nodeId) { - DecommissioningNodeContext context = decomNodes.get(nodeId); - if (context != null) { - LOG.info("remove " + nodeId + " in " + context.nodeState); - decomNodes.remove(nodeId); - } - } - - /** - * Status about a specific decommissioning node. - * - */ - public enum DecommissioningNodeStatus { - // Node is not in DECOMMISSIONING state. - NONE, - - // wait for running containers to complete - WAIT_CONTAINER, - - // wait for running application to complete (after all containers complete); - WAIT_APP, - - // Timeout waiting for either containers or applications to complete. - TIMEOUT, - - // nothing to wait, ready to be decommissioned - READY, - - // The node has already been decommissioned - DECOMMISSIONED, - } - - public boolean checkReadyToBeDecommissioned(NodeId nodeId) { - DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId); - return (s == DecommissioningNodeStatus.READY || - s == DecommissioningNodeStatus.TIMEOUT); - } - - public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) { - DecommissioningNodeContext context = decomNodes.get(nodeId); - if (context == null) { - return DecommissioningNodeStatus.NONE; - } - - if (context.nodeState == NodeState.DECOMMISSIONED) { - return DecommissioningNodeStatus.DECOMMISSIONED; - } - - long waitTime = mclock.getTime() - context.decommissioningStartTime; - if (context.numActiveContainers > 0) { - return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? - DecommissioningNodeStatus.WAIT_CONTAINER : - DecommissioningNodeStatus.TIMEOUT; - } - - removeCompletedApps(context); - if (context.appIds.size() == 0) { - return DecommissioningNodeStatus.READY; - } else { - return (context.timeoutMs < 0 || waitTime < context.timeoutMs)? - DecommissioningNodeStatus.WAIT_APP : - DecommissioningNodeStatus.TIMEOUT; - } - } - - /** - * PollTimerTask periodically: - * 1. log status of all DECOMMISSIONING nodes; - * 2. 
identify and taken care of stale DECOMMISSIONING nodes - * (for example, node already terminated). - */ - class PollTimerTask extends TimerTask { - private final RMContext rmContext; - - public PollTimerTask(RMContext rmContext) { - this.rmContext = rmContext; - } - - public void run() { - logDecommissioningNodesStatus(); - long now = mclock.getTime(); - Set staleNodes = new HashSet(); - - for (Iterator> it = - decomNodes.entrySet().iterator(); it.hasNext();) { - Map.Entry e = it.next(); - DecommissioningNodeContext d = e.getValue(); - // Skip node recently updated (NM usually updates every second). - if (now - d.lastUpdateTime < 5000L) { - continue; - } - // Remove stale non-DECOMMISSIONING node - if (d.nodeState != NodeState.DECOMMISSIONING) { - LOG.debug("remove " + d.nodeState + " " + d.nodeId); - it.remove(); - continue; - } else if (now - d.lastUpdateTime > 60000L) { - // Node DECOMMISSIONED could become stale, remove as necessary. - RMNode rmNode = getRmNode(d.nodeId); - if (rmNode != null && - rmNode.getState() == NodeState.DECOMMISSIONED) { - LOG.debug("remove " + rmNode.getState() + " " + d.nodeId); - it.remove(); - continue; - } - } - if (d.timeoutMs >= 0 && - d.decommissioningStartTime + d.timeoutMs < now) { - staleNodes.add(d.nodeId); - LOG.debug("Identified stale and timeout node " + d.nodeId); - } - } - - for (NodeId nodeId : staleNodes) { - RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); - if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { - remove(nodeId); - continue; - } - if (rmNode.getState() == NodeState.DECOMMISSIONING && - checkReadyToBeDecommissioned(rmNode.getNodeID())) { - LOG.info("DECOMMISSIONING " + nodeId + " timeout"); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - } - } - } - } - - private RMNode getRmNode(NodeId nodeId) { - RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); - if (rmNode == null) { - rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); - } - return rmNode; - } - - private void removeCompletedApps(DecommissioningNodeContext context) { - Iterator it = context.appIds.iterator(); - while (it.hasNext()) { - ApplicationId appId = it.next(); - RMApp rmApp = rmContext.getRMApps().get(appId); - if (rmApp == null) { - LOG.debug("Consider non-existing app " + appId + " as completed"); - it.remove(); - continue; - } - if (rmApp.getState() == RMAppState.FINISHED || - rmApp.getState() == RMAppState.FAILED || - rmApp.getState() == RMAppState.KILLED) { - LOG.debug("Remove " + rmApp.getState() + " app " + appId); - it.remove(); - } - } - } - - // Time in second to be decommissioned. - private int getTimeoutInSec(DecommissioningNodeContext context) { - if (context.nodeState == NodeState.DECOMMISSIONED) { - return 0; - } else if (context.nodeState != NodeState.DECOMMISSIONING) { - return -1; - } - if (context.appIds.size() == 0 && context.numActiveContainers == 0) { - return 0; - } - // negative timeout value means no timeout (infinite timeout). 
- if (context.timeoutMs < 0) { - return -1; - } - - long now = mclock.getTime(); - long timeout = context.decommissioningStartTime + context.timeoutMs - now; - return Math.max(0, (int)(timeout / 1000)); - } - - private void logDecommissioningNodesStatus() { - if (!LOG.isDebugEnabled() || decomNodes.size() == 0) { - return; - } - long now = mclock.getTime(); - for (DecommissioningNodeContext d : decomNodes.values()) { - StringBuilder sb = new StringBuilder(); - DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId); - sb.append(String.format( - "%n %-34s %4ds fresh:%3ds containers:%2d %14s", - d.nodeId.getHost(), - (now - d.decommissioningStartTime) / 1000, - (now - d.lastUpdateTime) / 1000, - d.numActiveContainers, - s)); - if (s == DecommissioningNodeStatus.WAIT_APP || - s == DecommissioningNodeStatus.WAIT_CONTAINER) { - sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d))); - } - for (ApplicationId aid : d.appIds) { - sb.append("\n " + aid); - RMApp rmApp = rmContext.getRMApps().get(aid); - if (rmApp != null) { - sb.append(String.format( - " %s %9s %5.2f%% %5ds", - rmApp.getState(), - (rmApp.getApplicationType() == null)? - "" : rmApp.getApplicationType(), - 100.0 * rmApp.getProgress(), - (mclock.getTime() - rmApp.getStartTime()) / 1000)); - } - } - LOG.debug("Decommissioning node: " + sb.toString()); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index a73a842f52d..d13f6eca903 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -31,8 +31,6 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.Node; @@ -40,8 +38,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader.HostDetails; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -59,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -66,7 +66,7 @@ public class NodesListManager extends CompositeService implements EventHandler { - private static final Log LOG = LogFactory.getLog(NodesListManager.class); + private static final Logger LOG = LoggerFactory.getLogger(NodesListManager.class); private HostsFileReader hostsReader; private Configuration conf; @@ -106,15 +106,13 @@ protected void serviceInit(Configuration conf) throws Exception { } // 
Read the hosts/exclude files to restrict access to the RM + this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); + this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); try { - this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecommissionedNMs(); - printConfiguredHosts(); } catch (YarnException ex) { disableHostsFileReader(ex); } catch (IOException ioe) { @@ -161,6 +159,13 @@ public void run() { super.serviceInit(conf); } + @Override + protected void serviceStart() throws Exception { + setDecommissionedNMs(); + printConfiguredHosts(); + super.serviceStart(); + } + private void decrInactiveNMMetrics(RMNode rmNode) { ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); switch (rmNode.getState()) { @@ -234,7 +239,11 @@ private void refreshHostsReader( yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); LOG.info("refreshNodes excludesFile " + excludesFile); - hostsReader.refresh(includesFile, excludesFile); + if(hostsReader != null) { + hostsReader.refresh(includesFile, excludesFile); + } else { + hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); + } printConfiguredHosts(); LOG.info("hostsReader include:{" + @@ -245,13 +254,19 @@ private void refreshHostsReader( handleExcludeNodeList(graceful, timeout); } + private void setDecommissionedNMs() { Set excludeList = hostsReader.getExcludedHosts(); for (final String host : excludeList) { + if (rmContext.getResourceManager().decommissioningNodesWatcher + .isNodeRecovered(host)) { + LOG.info("Node {} is on exclude list but it is just " + + "recovered from graceful decommissioning.", host); + continue; + } NodeId nodeId = createUnknownNodeId(host); - RMNodeImpl rmNode = new RMNodeImpl(nodeId, - rmContext, host, -1, -1, new UnknownNode(host), - Resource.newInstance(0, 0), "unknown"); + RMNodeImpl rmNode = new RMNodeImpl(nodeId, rmContext, host, -1, -1, + new UnknownNode(host), Resource.newInstance(0, 0), "unknown"); rmContext.getInactiveRMNodes().put(nodeId, rmNode); rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); } @@ -320,21 +335,33 @@ private void handleExcludeNodeList(boolean graceful, int timeout) { this.rmContext.getDispatcher().getEventHandler().handle(e); } + triggerDecommissionEvents(graceful, timeout, nodesToDecom, excludes); + + updateInactiveNodes(); + } + + private void triggerDecommissionEvents(boolean graceful, + int timeout, + List nodesToDecom, + Map excludes) { for (RMNode n : nodesToDecom) { - RMNodeEvent e; - if (graceful) { - Integer timeoutToUse = (excludes.get(n.getHostName()) != null)? - excludes.get(n.getHostName()) : timeout; - e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); - } else { - RMNodeEventType eventType = isUntrackedNode(n.getHostName())? 
- RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; - e = new RMNodeEvent(n.getNodeID(), eventType); + // ignore nodes that the DecommissioningNodeWatcher already knows about + if (!rmContext.getResourceManager().decommissioningNodesWatcher + .isNodeDecommissioning(n.getNodeID())) { + RMNodeEvent e; + if (graceful) { + Integer timeoutFromFile = excludes.get(n.getHostName()); + Integer timeoutToUse = + timeoutFromFile != null ? timeoutFromFile : timeout; + e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); + } else { + RMNodeEventType eventType = isUntrackedNode(n.getHostName()) ? + RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; + e = new RMNodeEvent(n.getNodeID(), eventType); + } + this.rmContext.getDispatcher().getEventHandler().handle(e); } - this.rmContext.getDispatcher().getEventHandler().handle(e); } - - updateInactiveNodes(); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 69d50f217e8..306290746be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; @@ -206,6 +207,7 @@ private ProxyCAManager proxyCAManager; private WebApp webApp; private AppReportFetcher fetcher = null; + protected DecommissioningNodesWatcher decommissioningNodesWatcher; protected ResourceTrackerService resourceTracker; private JvmMetrics jvmMetrics; private boolean curatorEnabled = false; @@ -266,6 +268,8 @@ protected void serviceInit(Configuration conf) throws Exception { ConfigurationProviderFactory.getConfigurationProvider(conf); this.configurationProvider.init(this.conf); rmContext.setConfigurationProvider(configurationProvider); + this.decommissioningNodesWatcher = + new DecommissioningNodesWatcher(rmContext); // load core-site.xml loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE); @@ -1403,7 +1407,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.decommissioningNodesWatcher); } protected ClientRMService createClientRMService() { @@ -1511,12 +1516,9 @@ WebApp getWebapp() { @Override public void recover(RMState state) throws Exception { - // recover 
RMdelegationTokenSecretManager rmContext.getRMDelegationTokenSecretManager().recover(state); - - // recover AMRMTokenSecretManager rmContext.getAMRMTokenSecretManager().recover(state); - + decommissioningNodesWatcher.recover(state); // recover reservations if (reservationSystem != null) { reservationSystem.recover(state); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9365096fa63..69b8ff3c034 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -120,7 +121,7 @@ private int minAllocMb; private int minAllocVcores; - private DecommissioningNodesWatcher decommissioningWatcher; + private DecommissioningNodesWatcher decommissioningNodesWatcher; private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; @@ -134,7 +135,8 @@ public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + DecommissioningNodesWatcher decommissioningNodesWatcher) { super(ResourceTrackerService.class.getName()); this.rmContext = rmContext; this.nodesListManager = nodesListManager; @@ -144,7 +146,7 @@ public ResourceTrackerService(RMContext rmContext, ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); - this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext); + this.decommissioningNodesWatcher = decommissioningNodesWatcher; } @Override @@ -189,7 +191,7 @@ protected void serviceInit(Configuration conf) throws Exception { } loadDynamicResourceConfiguration(conf); - decommissioningWatcher.init(conf); + decommissioningNodesWatcher.init(conf); super.serviceInit(conf); } @@ -376,8 +378,7 @@ public RegisterNodeManagerResponse registerNodeManager( } // Check if this node is a 'valid' node - if (!this.nodesListManager.isValidNode(host) && - !isNodeInDecommissioning(nodeId)) { + if (nodeNeedsToBeShutdown(nodeId)) { String message = "Disallowed NodeManager from " + host + ", Sending SHUTDOWN signal to the NodeManager."; @@ -546,6 +547,12 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } + private boolean nodeNeedsToBeShutdown(NodeId nodeId) { + return 
!decommissioningNodesWatcher.isNodeRecovered(nodeId) && + !this.nodesListManager.isValidNode(nodeId.getHost()) && + !isNodeInDecommissioning(nodeId); + } + @SuppressWarnings("unchecked") @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) @@ -564,8 +571,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is // in decommissioning. - if (!this.nodesListManager.isValidNode(nodeId.getHost()) - && !isNodeInDecommissioning(nodeId)) { + if (nodeNeedsToBeShutdown(nodeId)) { String message = "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + nodeId.getHost(); @@ -586,7 +592,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); - this.decommissioningWatcher.update(rmNode, remoteNodeStatus); + this.decommissioningNodesWatcher.update(rmNode, remoteNodeStatus); // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); @@ -612,7 +618,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. if (rmNode.getState() == NodeState.DECOMMISSIONING && - decommissioningWatcher.checkReadyToBeDecommissioned( + decommissioningNodesWatcher.isNodeReadyToBeDecommissioned( rmNode.getNodeID())) { String message = "DECOMMISSIONING " + nodeId + " is ready to be decommissioned"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeContext.java new file mode 100644 index 00000000000..cacffd5a99b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeContext.java @@ -0,0 +1,156 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +// Once a RMNode is observed in DECOMMISSIONING state, +// All its ContainerStatus update are tracked inside DecomNodeContext. +public class DecommissioningNodeContext { + private final NodeId nodeId; + + // Last known NodeState. + private NodeState nodeState; + + // The moment node is observed in DECOMMISSIONING state. + private final long decommissioningStartTime; + + private long lastContainerFinishTime; + + // number of running containers at the moment. + private int numActiveContainers; + + // All applications run on the node at or after decommissioningStartTime. + private Set appIds; + + // First moment the node is observed in DECOMMISSIONED state. + private long decommissionedTime; + + // Timeout in millis for this decommissioning node. + // This value could be dynamically updated with new value from RMNode. + private long timeoutMs; + + private long lastUpdateTime; + + public DecommissioningNodeContext(NodeId nodeId, long timeoutMs, + long decommissioningStartTimeInMs) { + this.nodeId = nodeId; + this.appIds = new HashSet(); + this.decommissioningStartTime = decommissioningStartTimeInMs; + this.timeoutMs = timeoutMs; + } + + public NodeId getNodeId() { + return nodeId; + } + + public NodeState getNodeState() { + return nodeState; + } + + public void setNodeState(NodeState nodeState) { + this.nodeState = nodeState; + } + + public long getDecommissioningStartTime() { + return decommissioningStartTime; + } + + public long getLastContainerFinishTime() { + return lastContainerFinishTime; + } + + public void setLastContainerFinishTime(long lastContainerFinishTime) { + this.lastContainerFinishTime = lastContainerFinishTime; + } + + public int getNumActiveContainers() { + return numActiveContainers; + } + + public void setNumActiveContainers(int numActiveContainers) { + this.numActiveContainers = numActiveContainers; + } + + public Set getAppIds() { + return appIds; + } + + public void setAppIds(Set appIds) { + this.appIds = appIds; + } + + public long getDecommissionedTime() { + return decommissionedTime; + } + + public void setDecommissionedTime(long decommissionedTime) { + this.decommissionedTime = decommissionedTime; + } + + public long getTimeoutMs() { + return timeoutMs; + } + + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + void updateTimeout(int timeoutSec) { + this.timeoutMs = 1000L * timeoutSec; + } + + public void addAppIds(List keepAliveApplications) { + appIds.addAll(keepAliveApplications); + } + + public boolean containsAppId(ApplicationId aid) { + return appIds.contains(aid); + } + + public int getAppIdsSize() { + return appIds.size(); + } + + public void addAppId(ApplicationId aid) { + appIds.add(aid); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, + ToStringStyle.SHORT_PREFIX_STYLE); + } +} \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatus.java new file mode 100644 index 00000000000..ea218436f27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatus.java @@ -0,0 +1,53 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +/** + * Status about a specific decommissioning node. + * + */ +public enum DecommissioningNodeStatus { + // Node is not in DECOMMISSIONING state. + NONE, + + // wait for running containers to complete + WAIT_CONTAINER, + + // wait for running application to complete (after all containers complete); + WAIT_APP, + + // Timeout waiting for either containers or applications to complete. + TIMEOUT, + + // nothing to wait, ready to be decommissioned + READY, + + // The node has already been decommissioned + DECOMMISSIONED; + + public boolean isDecommissioning() { + return !(this == DecommissioningNodeStatus.NONE + || this == DecommissioningNodeStatus.DECOMMISSIONED + || this == DecommissioningNodeStatus.READY); + } + + public boolean isReadyToBeDecommissioned() { + return (this == DecommissioningNodeStatus.READY + || this == DecommissioningNodeStatus.TIMEOUT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatusProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatusProvider.java new file mode 100644 index 00000000000..a25e0d400e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodeStatusProvider.java @@ -0,0 +1,89 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. 
The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +public class DecommissioningNodeStatusProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(DecommissioningNodeStatusProvider.class); + private RMContext rmContext; + + public DecommissioningNodeStatusProvider(RMContext rmContext) { + this.rmContext = rmContext; + } + + public DecommissioningNodeStatus getDecommissioningStatus( + DecommissioningNodeContext context, long currentTime) { + if (context == null) { + return DecommissioningNodeStatus.NONE; + } + + if (context.getNodeState() == NodeState.DECOMMISSIONED) { + return DecommissioningNodeStatus.DECOMMISSIONED; + } + + if(context.getNodeState() == NodeState.DECOMMISSIONING_RECOVERED) { + return DecommissioningNodeStatus.NONE; + } + + long waitTime = currentTime - context.getDecommissioningStartTime(); + if (context.getNumActiveContainers() > 0) { + return (context.getTimeoutMs() < 0 || waitTime < context.getTimeoutMs()) + ? DecommissioningNodeStatus.WAIT_CONTAINER + : DecommissioningNodeStatus.TIMEOUT; + } + removeCompletedApps(context); + if (context.getAppIdsSize() == 0) { + return DecommissioningNodeStatus.READY; + } else { + return (context.getTimeoutMs() < 0 || waitTime < context.getTimeoutMs()) + ? 
DecommissioningNodeStatus.WAIT_APP + : DecommissioningNodeStatus.TIMEOUT; + } + } + + private void removeCompletedApps(DecommissioningNodeContext context) { + Iterator it = context.getAppIds().iterator(); + while (it.hasNext()) { + ApplicationId appId = it.next(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.debug("Consider non-existing app " + appId + " as completed"); + it.remove(); + continue; + } + if (rmApp.getState() == RMAppState.FINISHED || + rmApp.getState() == RMAppState.FAILED || + rmApp.getState() == RMAppState.KILLED) { + LOG.debug("Remove " + rmApp.getState() + " app " + appId); + it.remove(); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesCleanup.java new file mode 100644 index 00000000000..a90aca909dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesCleanup.java @@ -0,0 +1,133 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +public class DecommissioningNodesCleanup { + + private static final Logger LOG = + LoggerFactory.getLogger(DecommissioningNodesCleanup.class); + + public static final long MINIMUM_UPDATE_DELAY_FOR_CLEANUP_MS = 5000; + public static final long UPDATE_DELAY_FOR_DECOMMISSIONED_NODES_TO_REMOVE_MS = 60000; + + private RMContext rmContext; + private DecommissioningNodeStatusProvider decommissioningNodeStatusProvider; + + public DecommissioningNodesCleanup(RMContext rmContext, + DecommissioningNodeStatusProvider decommissioningNodeStatusProvider) { + this.rmContext = rmContext; + this.decommissioningNodeStatusProvider = decommissioningNodeStatusProvider; + } + + public void cleanUp(Map decomNodes, + long currentTimeInMillis) { + Set staleNodes = new HashSet(); + + for (Iterator> it = + decomNodes.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + DecommissioningNodeContext context = e.getValue(); + long updateDelay = currentTimeInMillis - context.getLastUpdateTime(); + // Skip node recently updated (NM usually updates every second). + if (updateDelay < MINIMUM_UPDATE_DELAY_FOR_CLEANUP_MS) { + continue; + } + // Remove stale non-DECOMMISSIONING node + if (context.getNodeState() != NodeState.DECOMMISSIONING) { + removeContextByIterator(it, context); + continue; + } else if (updateDelay > UPDATE_DELAY_FOR_DECOMMISSIONED_NODES_TO_REMOVE_MS) { + // Node DECOMMISSIONED could become stale, remove as necessary.
+ RMNode rmNode = getRmNode(context.getNodeId()); + if (rmNode != null && rmNode.getState() == NodeState.DECOMMISSIONED) { + removeContextByIterator(it, context); + continue; + } + } + if (context.getTimeoutMs() >= 0 && context.getDecommissioningStartTime() + + context.getTimeoutMs() < currentTimeInMillis) { + staleNodes.add(context.getNodeId()); + LOG.debug("Identified stale and timeout node {}", context.getNodeId()); + } + } + + for (NodeId nodeId : staleNodes) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) { + removeDecommissioningNodeContext(nodeId, decomNodes); + continue; + } + if (rmNode.getState() == NodeState.DECOMMISSIONING + && isNodeReadyToBeDecommissioned(nodeId, decomNodes, + currentTimeInMillis)) { + LOG.info("DECOMMISSIONING timeout id: {}", nodeId); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + } + + private boolean isNodeReadyToBeDecommissioned(NodeId nodeId, + Map decomNodes, + long currentTimeInMs) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + DecommissioningNodeStatus status = decommissioningNodeStatusProvider + .getDecommissioningStatus(context, currentTimeInMs); + return status.isReadyToBeDecommissioned(); + } + + private RMNode getRmNode(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + rmNode = this.rmContext.getInactiveRMNodes().get(nodeId); + } + return rmNode; + } + + private void removeContextByIterator( + Iterator> iterator, + DecommissioningNodeContext context) { + LOG.info("Remove node id: {} state: {}", context.getNodeId(), + context.getNodeState()); + iterator.remove(); + rmContext.getStateStore().removeDecommissioningNode(context); + } + + private synchronized void removeDecommissioningNodeContext(NodeId nodeId, + Map decomNodes) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + LOG.info("Remove node id: {} state: {}", nodeId, context.getNodeState()); + decomNodes.remove(nodeId); + rmContext.getStateStore().removeDecommissioningNode(context); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesLogger.java new file mode 100644 index 00000000000..56b8a8c4e7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesLogger.java @@ -0,0 +1,100 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. 
You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class DecommissioningNodesLogger { + + private static final Logger LOG = + LoggerFactory.getLogger(DecommissioningNodesLogger.class); + + private RMContext rmContext; + private DecommissioningNodeStatusProvider decommissioningNodeStatusProvider; + + public DecommissioningNodesLogger(RMContext rmContext, + DecommissioningNodeStatusProvider decommissioningNodeStatusProvider){ + this.rmContext = rmContext; + this.decommissioningNodeStatusProvider = decommissioningNodeStatusProvider; + } + + public void logDecommissioningNodesStatus( + Map decomNodes, + long currentTimeInMillis) { + if (!LOG.isDebugEnabled() || decomNodes.size() == 0) { + return; + } + for (DecommissioningNodeContext context : decomNodes.values()) { + StringBuilder sb = new StringBuilder(); + DecommissioningNodeStatus s = decommissioningNodeStatusProvider + .getDecommissioningStatus(context, currentTimeInMillis); + sb.append(String.format("%n %-34s %4ds fresh:%3ds containers:%2d %14s", + context.getNodeId().getHost(), + (currentTimeInMillis - context.getDecommissioningStartTime()) / 1000, + (currentTimeInMillis - context.getLastUpdateTime()) / 1000, + context.getNumActiveContainers(), s)); + if (s == DecommissioningNodeStatus.WAIT_APP + || s == DecommissioningNodeStatus.WAIT_CONTAINER) { + sb.append(String.format(" timeout:%4ds", + getTimeoutInSec(context, currentTimeInMillis))); + } + for (ApplicationId aid : context.getAppIds()) { + sb.append("\n " + aid); + RMApp rmApp = rmContext.getRMApps().get(aid); + if (rmApp != null) { + sb.append(String.format(" %s %9s %5.2f%% %5ds", rmApp.getState(), + (rmApp.getApplicationType() == null) ? "" + : rmApp.getApplicationType(), + 100.0 * rmApp.getProgress(), + (currentTimeInMillis - rmApp.getStartTime()) / 1000)); + } + } + LOG.debug("Decommissioning node: " + sb.toString()); + } + } + + // Time in second to be decommissioned. + private int getTimeoutInSec(DecommissioningNodeContext context, + long currentTimeInMillis) { + if (context.getNodeState() == NodeState.DECOMMISSIONED) { + return 0; + } else if (context.getNodeState() != NodeState.DECOMMISSIONING) { + return -1; + } + if (context.getAppIdsSize() == 0 && context.getNumActiveContainers() == 0) { + return 0; + } + // negative timeout value means no timeout (infinite timeout). 
+ if (context.getTimeoutMs() < 0) { + return -1; + } + + long timeout = context.getDecommissioningStartTime() + + context.getTimeoutMs() - currentTimeInMillis; + return Math.max(0, (int) (timeout / 1000)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesWatcher.java new file mode 100644 index 00000000000..d055207ad33 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/DecommissioningNodesWatcher.java @@ -0,0 +1,284 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DecommissioningNodesWatcher is used by ResourceTrackerService to track + * DECOMMISSIONING nodes to decide when, after all running containers on + * the node have completed, will be transitioned into DECOMMISSIONED state + * (NodeManager will be told to shutdown). + * Under MR application, a node, after completes all its containers, + * may still serve it map output data during the duration of the application + * for reducers. 
+ * A fully graceful mechanism would keep such DECOMMISSIONING
+ * nodes until all involved applications complete. However, that could be
+ * undesirable in long-running application scenarios, where a bunch of
+ * "idle" nodes would stay around for a long period of time.
+ *
+ * DecommissioningNodesWatcher balances that concern with a timeout policy:
+ * a DECOMMISSIONING node will be DECOMMISSIONED no later than
+ * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
+ *
+ * To be efficient, DecommissioningNodesWatcher does not track application
+ * containers on a particular node until the node enters the DECOMMISSIONING
+ * state; it only tracks containers once the node is DECOMMISSIONING, so it
+ * costs essentially nothing when no node is DECOMMISSIONING. The trade-off
+ * is that the node may earlier have hosted containers of an application that
+ * is still running (the affected map tasks will be rescheduled).
+ */
+public class DecommissioningNodesWatcher implements Recoverable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DecommissioningNodesWatcher.class);
+
+  private final RMContext rmContext;
+
+  // All DECOMMISSIONING nodes to track.
+  private Map<NodeId, DecommissioningNodeContext> decomNodes =
+      new HashMap<>();
+
+  private Timer pollTimer;
+  private MonotonicClock mclock;
+  private DecommissioningNodeStatusProvider decommissioningNodeStatusProvider;
+
+  public DecommissioningNodesWatcher(RMContext rmContext) {
+    this.rmContext = rmContext;
+    pollTimer = new Timer(true);
+    mclock = new MonotonicClock();
+    this.decommissioningNodeStatusProvider =
+        new DecommissioningNodeStatusProvider(rmContext);
+  }
+
+  public void init(Configuration conf) {
+    int pollIntervalInMs = conf.getInt(
+        YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
+        YarnConfiguration.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL)
+        * 1000;
+    pollTimer.schedule(new PollTimerTask(rmContext), 0, pollIntervalInMs);
+  }
+
+  /**
+   * Update rmNode decommissioning status based on NodeStatus.
+   * @param rmNode The node
+   * @param remoteNodeStatus latest NodeStatus
+   */
+  public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
+    DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
+    long now = mclock.getTime();
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      if (context == null) {
+        return;
+      }
+      context.setNodeState(rmNode.getState());
+      // Keep a DECOMMISSIONED node around for a while for status logging,
+      // so that the host appears as DECOMMISSIONED instead of quietly
+      // disappearing.
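+      // The context is purged roughly 60 seconds after the node was first
+      // observed as DECOMMISSIONED (see the check below).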
+ if (context.getDecommissionedTime() == 0) { + context.setDecommissionedTime(now); + } else if (now - context.getDecommissionedTime() > 60000L) { + removeDecommissioningNodeContext(rmNode.getNodeID()); + } + } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + if (context == null) { + context = new DecommissioningNodeContext(rmNode.getNodeID(), + rmNode.getDecommissioningTimeout(), mclock.getTime()); + context.setNodeState(NodeState.DECOMMISSIONING); + context.setDecommissionedTime(0); + decomNodes.put(rmNode.getNodeID(), context); + rmContext.getStateStore().storeDecommissioningNode(context); + } + context.updateTimeout(rmNode.getDecommissioningTimeout()); + + updateDecommissionContextTimes(remoteNodeStatus, context, now); + rmContext.getStateStore().updateDecommissioningNode(context); + } else { + if (context == null) { + return; + } + if(context.getNodeState() == NodeState.DECOMMISSIONING_RECOVERED){ + LOG.info("Update recovered node: {}", rmNode.getNodeID()); + context.setNodeState(NodeState.DECOMMISSIONING); + updateDecommissionContextTimes(remoteNodeStatus, context, now); + Long timeoutInSec = context.getTimeoutMs() / 1000; + RMNodeEvent event = new RMNodeDecommissioningEvent(context.getNodeId(), + timeoutInSec.intValue()); + rmContext.getDispatcher().getEventHandler().handle(event); + }else { + removeDecommissioningNodeContext(rmNode.getNodeID()); + } + } + } + + private void updateDecommissionContextTimes(NodeStatus remoteNodeStatus, DecommissioningNodeContext context, long now) { + if (remoteNodeStatus.getKeepAliveApplications() != null) { + context.addAppIds(remoteNodeStatus.getKeepAliveApplications()); + } + + context.setLastUpdateTime(now); + context.setNumActiveContainers( + getNumOfActiveContainers(remoteNodeStatus, context)); + + // maintain lastContainerFinishTime. 
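+    // Record only the first time the node reports zero active containers;
+    // subsequent heartbeats leave the recorded timestamp untouched.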
+ if (context.getNumActiveContainers() == 0 && + context.getLastContainerFinishTime() == 0) { + context.setLastContainerFinishTime(now); + } + } + + private int getNumOfActiveContainers(NodeStatus remoteNodeStatus, + DecommissioningNodeContext context) { + int numOfActiveContainers = 0; + for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) { + ContainerState newState = cs.getState(); + if (newState == ContainerState.RUNNING || + newState == ContainerState.NEW) { + numOfActiveContainers++; + } + context.setNumActiveContainers(numOfActiveContainers); + ApplicationId aid = cs.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!context.containsAppId(aid)) { + context.addAppId(aid); + } + } + return numOfActiveContainers; + } + + private synchronized void removeDecommissioningNodeContext(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if (context != null) { + LOG.info("Remove node id: {} state: {}", nodeId, context.getNodeState()); + decomNodes.remove(nodeId); + rmContext.getStateStore().removeDecommissioningNode(context); + } + } + + public boolean isNodeReadyToBeDecommissioned(NodeId nodeId) { + DecommissioningNodeStatus s = getDecommissioningStatus(nodeId); + return s.isReadyToBeDecommissioned(); + } + + public boolean isNodeDecommissioning(NodeId nodeId) { + DecommissioningNodeStatus nodeStatus = getDecommissioningStatus(nodeId); + return nodeStatus.isDecommissioning(); + } + + public boolean isNodeRecovered(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + if(context != null) { + return context.getNodeState() == NodeState.DECOMMISSIONING_RECOVERED; + } else { + return false; + } + } + + public boolean isNodeRecovered(String host) { + DecommissioningNodeContext context = + getDecommissioningNodeContextByHost(host); + if(context != null) { + return context.getNodeState() == NodeState.DECOMMISSIONING_RECOVERED; + } else { + return false; + } + } + + private synchronized DecommissioningNodeContext getDecommissioningNodeContextByHost( + String host) { + Iterator> iter = + decomNodes.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + NodeId nodeId = entry.getKey(); + if (nodeId.getHost().equals(host)) { + return entry.getValue(); + } + } + return null; + } + + public DecommissioningNodeStatus getDecommissioningStatus(NodeId nodeId) { + DecommissioningNodeContext context = decomNodes.get(nodeId); + return decommissioningNodeStatusProvider.getDecommissioningStatus(context, + mclock.getTime()); + } + + /** + * PollTimerTask periodically: + * 1. log status of all DECOMMISSIONING nodes; + * 2. identify and taken care of stale DECOMMISSIONING nodes + * (for example, node already terminated). 
+ */ + class PollTimerTask extends TimerTask { + private final DecommissioningNodesLogger decommissioningNodesLogger; + private final DecommissioningNodesCleanup decommissioningNodesCleanup; + + public PollTimerTask(RMContext rmContext) { + this.decommissioningNodesLogger = new DecommissioningNodesLogger( + rmContext, decommissioningNodeStatusProvider); + this.decommissioningNodesCleanup = new DecommissioningNodesCleanup( + rmContext, decommissioningNodeStatusProvider); + } + + public void run() { + long currentTimeInMs = mclock.getTime(); + synchronized (DecommissioningNodesWatcher.this) { + decommissioningNodesLogger.logDecommissioningNodesStatus(decomNodes, + currentTimeInMs); + decommissioningNodesCleanup.cleanUp(decomNodes, currentTimeInMs); + } + } + } + + @Override + public synchronized void recover(RMStateStore.RMState rmState) + throws Exception { + LOG.info("Recovering DecommissioningNodesWatcher."); + for (Map.Entry entry : rmState + .getDecommissioningNodesState().entrySet()) { + NodeId nodeId = NodeId.fromString(entry.getKey()); + DecommissioningNodeContext context = + entry.getValue().getDecommissioningNodeContext(nodeId); + context.setNodeState(NodeState.DECOMMISSIONING_RECOVERED); + decomNodes.put(nodeId, context); + LOG.info("Decommissioning node recovered: {}", context.toString()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index ed0486ab4a3..9ccdb1871e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -58,10 +58,12 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -83,6 +85,8 @@ * Also, AMRMToken has been removed from ApplicationAttemptState. * * Changes from 1.2 to 1.3, Addition of ReservationSystem state. 
+ * + * Changes from 1.3 to 1.4, Addition of Decommissioning Nodes */ public class FileSystemRMStateStore extends RMStateStore { @@ -90,7 +94,7 @@ protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 3); + .newInstance(1, 4); protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode"; @@ -116,6 +120,7 @@ Path amrmTokenSecretManagerRoot; private Path reservationRoot; + private Path decomNodesRoot; private Path proxyCARoot; @Override @@ -128,6 +133,7 @@ public synchronized void initInternal(Configuration conf) amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT); + decomNodesRoot = new Path(rootDirPath, DECOMMISSIONING_NODES_ROOT); proxyCARoot = new Path(rootDirPath, PROXY_CA_ROOT); fsNumRetries = conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, @@ -161,6 +167,7 @@ protected synchronized void startInternal() throws Exception { mkdirsWithRetries(rmAppRoot); mkdirsWithRetries(amrmTokenSecretManagerRoot); mkdirsWithRetries(reservationRoot); + mkdirsWithRetries(decomNodesRoot); mkdirsWithRetries(proxyCARoot); } @@ -233,6 +240,8 @@ public synchronized RMState loadState() throws Exception { loadAMRMTokenSecretManagerState(rmState); // recover reservation state loadReservationSystemState(rmState); + // recover decommissioning nodes + loadDecommissioningNodesState(rmState); // recover ProxyCAManager state loadProxyCAManagerState(rmState); return rmState; @@ -297,6 +306,19 @@ private void loadRMAppState(RMState rmState) throws Exception { } } + private void loadDecommissioningNodesState(RMState rmState) throws Exception { + try { + final RMDecommissioningNodesFileProcessor fileProcessor = new + RMDecommissioningNodesFileProcessor(rmState); + final Path rootDirectory = this.decomNodesRoot; + + processDirectoriesOfFiles(fileProcessor, rootDirectory); + } catch (Exception e) { + LOG.error("Failed to load state.", e); + throw e; + } + } + private void processDirectoriesOfFiles( RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory) throws Exception { @@ -933,6 +955,47 @@ protected void removeReservationState( deleteFileWithRetries(reservationPath); } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeCreatePath = getNodePath(decomNodesRoot, + nodeIdToFilename(nodeContext.getNodeId().toString())); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + writeFileWithRetries(nodeCreatePath, data.toByteArray(), false); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeRemovePath = getNodePath(decomNodesRoot, + nodeIdToFilename(nodeContext.getNodeId().toString())); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + + deleteFileWithRetries(nodeRemovePath); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeUpdatePath = getNodePath(decomNodesRoot, + nodeIdToFilename(nodeContext.getNodeId().toString())); + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + if (existsWithRetries(nodeUpdatePath)) { + 
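+      // The node's file already exists, so overwrite it in place; otherwise
+      // fall through and create a new file below.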
updateFile(nodeUpdatePath, data.toByteArray(), false); + } else { + writeFileWithRetries(nodeUpdatePath, data.toByteArray(), false); + } + } + @VisibleForTesting public int getNumRetries() { return fsNumRetries; @@ -1023,4 +1086,34 @@ void processChildNode(String appDirName, String childNodeName, byte[] childData) throws IOException; } + + private static class RMDecommissioningNodesFileProcessor + implements RMStateFileProcessor { + private RMState rmState; + + public RMDecommissioningNodesFileProcessor(RMState rmState) { + this.rmState = rmState; + } + + @Override + public void processChildNode(String appDirName, String childNodeName, + byte[] childData) throws IOException { + childNodeName = filenameToNodeId(childNodeName); + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(childData); + rmState.getDecommissioningNodesState().put(childNodeName, nodeData); + } + } + + private static String nodeIdToFilename(String nodeId) { + // replace :, which is not allowed in Paths, with a space, + // which is allowed in Paths, but not in hostnames, so there should + // be no collision + return nodeId.replace(":", " "); + } + + private static String filenameToNodeId(String filename) { + // convert back from #nodeIdToFilename + return filename.replace(" ", ":"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 0a911615f8e..04880e85b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -57,10 +57,12 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -70,6 +72,7 @@ import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Logger; import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; @@ -77,6 +80,8 @@ /** * Changes from 1.0 to 1.1, Addition of ReservationSystem state. 
+ * + * Changes from 1.1 to 1.2, Addition of Decommissioning Nodes */ public class LeveldbRMStateStore extends RMStateStore { @@ -95,9 +100,11 @@ RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix; private static final String RM_RESERVATION_KEY_PREFIX = RESERVATION_SYSTEM_ROOT + SEPARATOR; + private static final String RM_DECOMMISSIONING_NODES_PREFIX = + DECOMMISSIONING_NODES_ROOT + SEPARATOR; private static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 1); + .newInstance(1, 2); private DB db; private Timer compactionTimer; @@ -131,6 +138,10 @@ private String getReservationNodeKey(String planName, + reservationId; } + private String getDecomNodeNodeKey(String nodeId) { + return RM_DECOMMISSIONING_NODES_PREFIX + nodeId; + } + private String getProxyCACertNodeKey() { return PROXY_CA_ROOT + SEPARATOR + PROXY_CA_CERT_NODE; } @@ -280,10 +291,11 @@ public synchronized long getAndIncrementEpoch() throws Exception { @Override public RMState loadState() throws Exception { RMState rmState = new RMState(); - loadRMDTSecretManagerState(rmState); - loadRMApps(rmState); - loadAMRMTokenSecretManagerState(rmState); + loadRMDTSecretManagerState(rmState); + loadRMApps(rmState); + loadAMRMTokenSecretManagerState(rmState); loadReservationState(rmState); + loadDecommissioningNodesState(rmState); loadProxyCAManagerState(rmState); return rmState; } @@ -332,6 +344,34 @@ private void loadReservationState(RMState rmState) throws IOException { LOG.info("Recovered " + numReservations + " reservations"); } + private void loadDecommissioningNodesState(RMState rmState) throws Exception { + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_DECOMMISSIONING_NODES_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(RM_DECOMMISSIONING_NODES_PREFIX)) { + break; + } + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(entry.getValue()); + key = key.substring(RM_DECOMMISSIONING_NODES_PREFIX.length()); + rmState.decommissioningNodesState.put(key, nodeData); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded decommissioning node " + key); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + } + private void loadRMDTSecretManagerState(RMState state) throws IOException { int numKeys = loadRMDTSecretManagerKeys(state); LOG.info("Recovered " + numKeys + " RM delegation token master keys"); @@ -917,6 +957,51 @@ int getNumEntriesInDatabase() throws IOException { return numEntries; } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String key = getDecomNodeNodeKey(nodeContext.getNodeId().toString()); + RMDecommissioningNodeData data = + new RMDecommissioningNodeData(nodeContext); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + batch.put(bytes(key), data.toByteArray()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + String key = getDecomNodeNodeKey(nodeContext.getNodeId().toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + try { + db.delete(bytes(key)); 
+ } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + storeDecommissioningNodeState(nodeContext); + } + private class CompactionTimerTask extends TimerTask { @Override public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 8c82af89842..f5c0cc5b1d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -36,11 +36,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; @Private @Unstable @@ -80,6 +82,8 @@ public synchronized RMState loadState() throws Exception { state.amrmTokenSecretManagerState == null ? 
null : AMRMTokenSecretManagerState .newInstance(state.amrmTokenSecretManagerState); + returnState.decommissioningNodesState.putAll( + state.getDecommissioningNodesState()); if (state.proxyCAState.getCaCert() != null) { byte[] caCertData = state.proxyCAState.getCaCert().getEncoded(); returnState.proxyCAState.setCaCert(caCertData); @@ -327,4 +331,28 @@ public void deleteStore() throws Exception { public void removeApplication(ApplicationId removeAppId) throws Exception { } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + state.getDecommissioningNodesState().put(nodeContext.getNodeId().toString(), + data); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + state.getDecommissioningNodesState().remove( + nodeContext.getNodeId().toString()); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + storeDecommissioningNodeState(nodeContext); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 1068f33eb72..f04eb56c48d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -178,6 +179,27 @@ public void removeApplication(ApplicationId removeAppId) throws Exception { // Do nothing } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } + @Override protected void storeProxyCACertState( X509Certificate caCert, PrivateKey caPrivateKey) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index deb79a573d6..e58f2e6b73b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -69,9 +69,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -110,6 +112,8 @@ "AMRMTokenSecretManagerRoot"; protected static final String RESERVATION_SYSTEM_ROOT = "ReservationSystemRoot"; + protected static final String DECOMMISSIONING_NODES_ROOT = + "DecommissioningNodesRoot"; protected static final String PROXY_CA_ROOT = "ProxyCARoot"; protected static final String PROXY_CA_CERT_NODE = "caCert"; protected static final String PROXY_CA_PRIVATE_KEY_NODE = "caPrivateKey"; @@ -193,6 +197,18 @@ EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_RESERVATION, new RemoveReservationAllocationTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE, + new StoreDecommissioningNodeTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE, + new RemoveDecommissioningNodeTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE, + new UpdateDecommissioningNodeTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, RMStateStoreEventType.FENCED) .addTransition(RMStateStoreState.ACTIVE, @@ -215,6 +231,10 @@ RMStateStoreEventType.UPDATE_AMRM_TOKEN, RMStateStoreEventType.STORE_RESERVATION, RMStateStoreEventType.REMOVE_RESERVATION, + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE, + RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE, + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE)); + RMStateStoreEventType.REMOVE_RESERVATION, RMStateStoreEventType.STORE_PROXY_CA_CERT)); private final StateMachine> reservationState = new TreeMap<>(); + Map decommissioningNodesState = + new HashMap<>(); + ProxyCAState proxyCAState = new ProxyCAState(); public Map getApplicationState() { @@ -784,6 +807,11 @@ public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() { return reservationState; 
} + public Map + getDecommissioningNodesState() { + return decommissioningNodesState; + } + public ProxyCAState getProxyCAState() { return proxyCAState; } @@ -1175,7 +1203,7 @@ public void removeApplication(RMApp app) { /** * Blocking API - * Derived classes must implement this method to remove the state of an + * Derived classes must implement this method to remove the state of an * application and its attempts */ protected abstract void removeApplicationStateInternal( @@ -1355,6 +1383,133 @@ protected EventHandler getRMStateStoreEventHandler() { return dispatcher.getEventHandler(); } + private static class StoreDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + LOG.info("Storing Decommissioning Node"); + store.storeDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Storing Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + + private static class RemoveDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + LOG.info("Removing Decommissioning Node"); + store.removeDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Removing Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + + private static class UpdateDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + LOG.info("Updating Decommissioning Node"); + store.updateDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Updating Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + /** + * DecommissioningNodesWatcher call this to store the state of a + * Decommissioning Node + */ + public void storeDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of a + * Decommissioning Node + */ + protected abstract void storeDecommissioningNodeState( + 
DecommissioningNodeContext nodeContext) + throws Exception; + + /** + * DecommissioningNodesWatcher call this to remove the state of a + * Decommissioning Node + */ + public void removeDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of a + * Decommissioning Node + */ + protected abstract void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) throws Exception; + + /** + * DecommissioningNodesWatcher call this to update the state of a + * Decommissioning Node + */ + public void updateDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to update the state of a + * Decommissioning Node + */ + protected abstract void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception; + /** * ProxyCAManager calls this to store the CA Certificate and Private Key. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java new file mode 100644 index 00000000000..59008bdf0f1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; + +public class RMStateStoreDecommissioningNodeEvent extends RMStateStoreEvent { + private DecommissioningNodeContext nodeContext; + + public RMStateStoreDecommissioningNodeEvent(RMStateStoreEventType type) { + super(type); + } + + public RMStateStoreDecommissioningNodeEvent( + DecommissioningNodeContext nodeContext, + RMStateStoreEventType type) { + this(type); + this.nodeContext = nodeContext; + } + + public DecommissioningNodeContext getNodeContext() { + return nodeContext; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 3d60fd29a4e..321fd05d4f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -36,5 +36,8 @@ UPDATE_AMRM_TOKEN, STORE_RESERVATION, REMOVE_RESERVATION, + STORE_DECOMMISSIONING_NODE, + REMOVE_DECOMMISSIONING_NODE, + UPDATE_DECOMMISSIONING_NODE STORE_PROXY_CA_CERT, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index bcdbcfd4cf2..8bada36513e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -45,10 +45,12 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -155,6 
+157,10 @@ * | .... * |------PLAN_2 * .... + * |--- DECOMMISSIONING_NODES_ROOT + * |----- (#NodeId1) + * |----- (#NodeId2) + * .... * |-- PROXY_CA_ROOT * |----- caCert * |----- caPrivateKey @@ -174,6 +180,8 @@ * splitting it in 2 parts, depending on a configurable split index. This limits * the number of delegation token znodes returned in a single call while loading * tokens state. + * + * Changes from 1.5 to 1.6, Addition of Decommissioning Nodes */ @Private @Unstable @@ -189,7 +197,7 @@ @VisibleForTesting public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 5); + .newInstance(1, 6); @VisibleForTesting public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES"; @@ -204,6 +212,7 @@ private String dtSequenceNumberPath; private String amrmTokenSecretManagerRoot; private String reservationRoot; + private String decomNodesRoot; private String proxyCARoot; @VisibleForTesting @@ -366,6 +375,7 @@ public synchronized void initInternal(Configuration conf) getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); proxyCARoot = getNodePath(zkRootNodePath, PROXY_CA_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); + decomNodesRoot = getNodePath(zkRootNodePath, DECOMMISSIONING_NODES_ROOT); zkManager = resourceManager.getZKManager(); if(zkManager==null) { zkManager = resourceManager.createAndStartZKManager(conf); @@ -410,6 +420,7 @@ public synchronized void startInternal() throws Exception { create(dtSequenceNumberPath); create(amrmTokenSecretManagerRoot); create(reservationRoot); + create(decomNodesRoot); create(proxyCARoot); } @@ -526,11 +537,40 @@ public synchronized RMState loadState() throws Exception { loadAMRMTokenSecretManagerState(rmState); // recover reservation state loadReservationSystemState(rmState); + // recover decommissioning nodes + loadDecommissioningNodesState(rmState); + // recover ProxyCAManager state loadProxyCAManagerState(rmState); return rmState; } + private synchronized void loadDecommissioningNodesState(RMState rmState) + throws Exception { + List childNodes = getChildren(decomNodesRoot); + + for (String childNodeName : childNodes) { + String childNodePath = getNodePath(decomNodesRoot, childNodeName); + byte[] childData = getData(childNodePath); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); + continue; + } + + try (DataInputStream fsIn = + new DataInputStream(new ByteArrayInputStream(childData))) { + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(fsIn); + rmState.getDecommissioningNodesState().put(childNodeName, nodeData); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded decommissioning node " + childNodeName); + } + } + } + } + private void loadReservationSystemState(RMState rmState) throws Exception { List planNodes = getChildren(reservationRoot); @@ -1194,6 +1234,62 @@ public synchronized void removeApplication(ApplicationId removeAppId) removeApp(removeAppId.toString()); } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) throws Exception { + SafeTransaction trx = zkManager.createTransaction(zkAcl, + fencingNodePath); + String nodeCreatePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + trx.create(nodeCreatePath, data.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + 
LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + trx.commit(); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) throws Exception { + String nodeRemovePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + + zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) throws Exception { + SafeTransaction trx = zkManager.createTransaction(zkAcl, + fencingNodePath); + String nodeUpdatePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + if (exists(nodeUpdatePath)) { + // in case znode exists + if (LOG.isDebugEnabled()) { + LOG.debug("Updating " + nodeContext.getNodeId().toString()); + } + trx.setData(nodeUpdatePath, data.toByteArray(), -1); + } else { + // in case znode doesn't exist + trx.create(nodeUpdatePath, data.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + } + + trx.commit(); + } + @VisibleForTesting String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java new file mode 100644 index 00000000000..2927146a211 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMDecommissioningNodeDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; + +public class RMDecommissioningNodeData { + RMDecommissioningNodeDataProto.Builder builder = + RMDecommissioningNodeDataProto.newBuilder(); + + public RMDecommissioningNodeData() {} + + public RMDecommissioningNodeData( + DecommissioningNodeContext nodeContext) { + builder.setDecommissioningStartTime( + nodeContext.getDecommissioningStartTime()); + builder.setLastUpdateTime(nodeContext.getLastUpdateTime()); + builder.setTimeoutMs(nodeContext.getTimeoutMs()); + } + + public void readFields(DataInput in) throws IOException { + builder.mergeFrom((DataInputStream) in); + } + + public void readFields(byte[] in) throws IOException { + builder.mergeFrom(in); + } + + public byte[] toByteArray() throws IOException { + return builder.build().toByteArray(); + } + + public DecommissioningNodeContext getDecommissioningNodeContext(NodeId nodeId) + throws IOException { + DecommissioningNodeContext nodeContext = + new DecommissioningNodeContext( + nodeId, + builder.getTimeoutMs(), + builder.getDecommissioningStartTime()); + nodeContext.setLastUpdateTime(builder.getLastUpdateTime()); + return nodeContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 35c77ab78b4..efc1191396d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -105,3 +105,9 @@ message RMDelegationTokenIdentifierDataProto { optional YARNDelegationTokenIdentifierProto token_identifier = 1; optional int64 renewDate = 2; } + +message RMDecommissioningNodeDataProto { + required int64 decommissioning_start_time = 1; + required int64 timeout_ms = 2; + required int64 last_Update_time = 3; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 408cb187619..f87c7ad8e3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -1109,7 +1109,7 @@ protected ResourceTrackerService createResourceTrackerService() { nmTokenSecretManager.rollMasterKey(); return new ResourceTrackerService(getRMContext(), nodesListManager, this.nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager) { + nmTokenSecretManager, decommissioningNodesWatcher) { 
@Override protected void serviceStart() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c17dee80d37..2b7ae972cc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -573,7 +573,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + decommissioningNodesWatcher) { @Override protected void serviceStart() throws Exception { throw new Exception("ResourceTracker service failed"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97883da8093..f101064f0db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2113,19 +2113,19 @@ public void testSynchronouslyRenewDTOnRecovery() throws Exception { MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) { @Override protected ResourceTrackerService createResourceTrackerService() { - return new ResourceTrackerService(this.rmContext, - this.nodesListManager, this.nmLivelinessMonitor, - this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + return new ResourceTrackerService(this.rmContext, this.nodesListManager, + this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager(), + decommissioningNodesWatcher) { @Override protected void serviceStart() throws Exception { // send the container_finished event as soon as the // ResourceTrackerService is started. 
super.serviceStart(); nm1.setResourceTrackerService(getResourceTrackerService()); - NMContainerStatus status = - TestRMRestart.createNMContainerStatus( - am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + NMContainerStatus status = TestRMRestart.createNMContainerStatus( + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockApplicationId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockApplicationId.java new file mode 100644 index 00000000000..00443cf1b26 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockApplicationId.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class MockApplicationId extends ApplicationId { + + private int id; + + public MockApplicationId(int id){ + this.id = id; + } + + @Override + public int getId() { + return id; + } + + @Override + protected void setId(int id) { + this.id = id; + } + + @Override + public long getClusterTimestamp() { + return 0; + } + + @Override + protected void setClusterTimestamp(long clusterTimestamp) { + + } + + @Override + protected void build() { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockNodeId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockNodeId.java new file mode 100644 index 00000000000..e12645a2cd6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/MockNodeId.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class MockNodeId extends NodeId { + + private String host; + private int port; + + public MockNodeId(String host, int port){ + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + protected void setHost(String host) { + this.host = host; + } + + @Override + public int getPort() { + return port; + } + + @Override + protected void setPort(int port) { + this.port = port; + } + + @Override + protected void build() { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodeStatusProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodeStatusProvider.java new file mode 100644 index 00000000000..a3f2ff86e33 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodeStatusProvider.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.decomissioning;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.mockito.Mockito.when;
+
+public class TestDecommissioningNodeStatusProvider {
+
+  private DecommissioningNodeStatusProvider target;
+  private RMContext rmContext;
+  private NodeId nodeId;
+  private ApplicationId appId;
+
+  private static final long TIMEOUT = 10L;
+  private static final long NO_TIMEOUT = -1L;
+  private static final long DECOM_START_TIME = 20;
+  private static final long AFTER_TIMEOUT = DECOM_START_TIME + TIMEOUT + 1;
+  private static final long BEFORE_TIMEOUT = DECOM_START_TIME + TIMEOUT - 1;
+
+  @Before
+  public void init() {
+    rmContext = Mockito.mock(RMContext.class);
+    target = new DecommissioningNodeStatusProvider(rmContext);
+    nodeId = new MockNodeId("host", 1234);
+    appId = new MockApplicationId(10);
+  }
+
+  @Test
+  public void testNullContext() {
+    Assert.assertEquals(DecommissioningNodeStatus.NONE,
+        target.getDecommissioningStatus(null, 10L));
+  }
+
+  @Test
+  public void testRecover() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+    context.setNodeState(NodeState.DECOMMISSIONING_RECOVERED);
+    Assert.assertEquals(DecommissioningNodeStatus.NONE,
+        target.getDecommissioningStatus(context, 10L));
+  }
+
+  @Test
+  public void testTimeoutWithContainers() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+    context.setNumActiveContainers(1);
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, AFTER_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.TIMEOUT, status);
+  }
+
+  @Test
+  public void testTimeoutWithoutContainers() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+
+    ConcurrentMap<ApplicationId, RMApp> mockMap = new ConcurrentHashMap<>();
+    mockMap.put(appId, new MockRMApp(10, 10L, RMAppState.RUNNING));
+    when(rmContext.getRMApps()).thenReturn(mockMap);
+
+    context.setNumActiveContainers(0);
+    context.getAppIds().add(appId);
+
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, AFTER_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.TIMEOUT, status);
+  }
+
+  @Test
+  public void testWaitForContainer() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+    context.setNumActiveContainers(1);
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, BEFORE_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, status);
+  }
+
+  @Test
+  public void testWaitForContainerForever() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, NO_TIMEOUT, DECOM_START_TIME);
+    context.setNumActiveContainers(1);
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, AFTER_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, status);
+  }
+
+  @Test
+  public void testReady() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+
+    ConcurrentMap<ApplicationId, RMApp> mockMap = new ConcurrentHashMap<>();
+    mockMap.put(appId, new MockRMApp(10, 10L, RMAppState.FINISHED));
+    when(rmContext.getRMApps()).thenReturn(mockMap);
+
+    context.setNumActiveContainers(0);
+    context.getAppIds().add(appId);
+
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, BEFORE_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.READY, status);
+  }
+
+  @Test
+  public void testWaitForApp() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, TIMEOUT, DECOM_START_TIME);
+
+    ConcurrentMap<ApplicationId, RMApp> mockMap = new ConcurrentHashMap<>();
+    mockMap.put(appId, new MockRMApp(10, 10L, RMAppState.RUNNING));
+    when(rmContext.getRMApps()).thenReturn(mockMap);
+
+    context.setNumActiveContainers(0);
+    context.getAppIds().add(appId);
+
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, BEFORE_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, status);
+  }
+
+  @Test
+  public void testWaitForAppForever() {
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, NO_TIMEOUT, DECOM_START_TIME);
+
+    ConcurrentMap<ApplicationId, RMApp> mockMap = new ConcurrentHashMap<>();
+    mockMap.put(appId, new MockRMApp(10, 10L, RMAppState.RUNNING));
+    when(rmContext.getRMApps()).thenReturn(mockMap);
+
+    context.setNumActiveContainers(0);
+    context.getAppIds().add(appId);
+
+    DecommissioningNodeStatus status =
+        target.getDecommissioningStatus(context, AFTER_TIMEOUT);
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, status);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesCleanup.java
new file mode 100644
index 00000000000..caa46c0ba88
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesCleanup.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.decomissioning;
+
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+public class TestDecommissioningNodesCleanup {
+
+  private static final long CURRENT_TIME_IN_MS = 100000;
+
+  private DecommissioningNodesCleanup target;
+
+  private RMContext rmContext;
+  private DecommissioningNodeStatusProvider decommissioningNodeStatusProvider;
+  private RMStateStore rmStateStore;
+  private EventHandler<Event> eventHandler;
+
+  @Before
+  public void init() {
+    this.rmContext = mock(RMContext.class);
+    this.decommissioningNodeStatusProvider =
+        mock(DecommissioningNodeStatusProvider.class);
+    this.rmStateStore = mock(RMStateStore.class);
+    this.eventHandler = mock(EventHandler.class);
+
+    target = new DecommissioningNodesCleanup(rmContext,
+        decommissioningNodeStatusProvider);
+
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    when(rmContext.getStateStore()).thenReturn(rmStateStore);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+  }
+
+  @Test
+  public void testSkipByLastUpdate() {
+    long tooYoungToUpdateTime = CURRENT_TIME_IN_MS
+        - DecommissioningNodesCleanup.MINIMUM_UPDATE_DALEY_FOR_CLEANUP_MS + 1;
+    Map<NodeId, DecommissioningNodeContext> decomNodes = new HashMap<>();
+    MockNodeId nodeId = new MockNodeId("host1", 1234);
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setLastUpdateTime(tooYoungToUpdateTime);
+    context.setNodeState(NodeState.RUNNING);
+    decomNodes.put(nodeId, context);
+
+    target.cleanUp(decomNodes, CURRENT_TIME_IN_MS);
+    Assert.assertEquals(1, decomNodes.size());
+
+    verifyZeroInteractions(rmContext);
+    verifyZeroInteractions(decommissioningNodeStatusProvider);
+  }
+
+  @Test
+  public void testRemoveNonDecommissioningNodes() {
+    Map<NodeId, DecommissioningNodeContext> decomNodes = new HashMap<>();
+    MockNodeId nodeId = new MockNodeId("host1", 1234);
+
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.DECOMMISSIONED);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.RUNNING);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.UNHEALTHY);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.LOST);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.SHUTDOWN);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.REBOOTED);
+    addNonDecommissioningContextAndCheckRemoval(decomNodes, nodeId,
+        NodeState.NEW);
+  }
+
+  @Test
+  public void testOldDecommissionedNodesRemoval() {
+    Map<NodeId, DecommissioningNodeContext> decomNodes = new HashMap<>();
+    MockNodeId nodeId = new MockNodeId("host1", 1234);
+    long oldEnoughUpdateTime = CURRENT_TIME_IN_MS
+        - DecommissioningNodesCleanup.UPDATE_DALEY_FOR_DECOMMISSIONED_NODES_TO_REMOVE_MS
+        - 1;
+
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setLastUpdateTime(oldEnoughUpdateTime);
+    context.setNodeState(NodeState.DECOMMISSIONING);
+
+    RMNode rmNode = mock(RMNode.class);
+    when(rmNode.getState()).thenReturn(NodeState.DECOMMISSIONED);
+
+    ConcurrentMap<NodeId, RMNode> nodeMap = new ConcurrentHashMap<>();
+    nodeMap.put(nodeId, rmNode);
+
+    when(rmContext.getRMNodes()).thenReturn(nodeMap);
+
+    decomNodes.put(nodeId, context);
+
+    target.cleanUp(decomNodes, CURRENT_TIME_IN_MS);
+    Assert.assertEquals(0, decomNodes.size());
+    verify(rmStateStore).removeDecommissioningNode(context);
+  }
+
+  @Test
+  public void testNonDecommissioningNodesAfterTimeoutRemoval() {
+    Map<NodeId, DecommissioningNodeContext> decomNodes = new HashMap<>();
+    MockNodeId nodeId = new MockNodeId("host1", 1234);
+    long oldEnoughUpdateTime = CURRENT_TIME_IN_MS
+        - DecommissioningNodesCleanup.UPDATE_DALEY_FOR_DECOMMISSIONED_NODES_TO_REMOVE_MS
+        - 1;
+
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setLastUpdateTime(oldEnoughUpdateTime);
+    context.setNodeState(NodeState.DECOMMISSIONING);
+
+    decomNodes.put(nodeId, context);
+
+    RMNode rmNode = mock(RMNode.class);
+    when(rmNode.getState()).thenReturn(NodeState.RUNNING);
+
+    ConcurrentMap<NodeId, RMNode> nodeMap = new ConcurrentHashMap<>();
+    nodeMap.put(nodeId, rmNode);
+
+    when(rmContext.getRMNodes()).thenReturn(nodeMap);
+
+    target.cleanUp(decomNodes, CURRENT_TIME_IN_MS);
+
+    Assert.assertEquals(0, decomNodes.size());
+    verify(rmStateStore).removeDecommissioningNode(context);
+  }
+
+  @Test
+  public void testDecommissionEventOnTimeout() {
+    Map<NodeId, DecommissioningNodeContext> decomNodes = new HashMap<>();
+    MockNodeId nodeId = new MockNodeId("host1", 1234);
+    long oldEnoughUpdateTime = CURRENT_TIME_IN_MS
+        - DecommissioningNodesCleanup.UPDATE_DALEY_FOR_DECOMMISSIONED_NODES_TO_REMOVE_MS
+        - 1;
+
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setLastUpdateTime(oldEnoughUpdateTime);
+    context.setNodeState(NodeState.DECOMMISSIONING);
+
+    decomNodes.put(nodeId, context);
+
+    RMNode rmNode = mock(RMNode.class);
+    when(rmNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+
+    ConcurrentMap<NodeId, RMNode> nodeMap = new ConcurrentHashMap<>();
+    nodeMap.put(nodeId, rmNode);
+
+    when(rmContext.getRMNodes()).thenReturn(nodeMap);
+    when(decommissioningNodeStatusProvider.getDecommissioningStatus(context,
+        CURRENT_TIME_IN_MS)).thenReturn(DecommissioningNodeStatus.READY);
+
+    target.cleanUp(decomNodes, CURRENT_TIME_IN_MS);
+
+    verify(rmStateStore, times(0)).removeDecommissioningNode(context);
+    ArgumentCaptor<RMNodeEvent> eventCaptor = ArgumentCaptor.forClass(RMNodeEvent.class);
+    verify(eventHandler).handle(eventCaptor.capture());
+    RMNodeEvent event = eventCaptor.getValue();
+    Assert.assertEquals(nodeId, event.getNodeId());
+    Assert.assertEquals(RMNodeEventType.DECOMMISSION, event.getType());
+  }
+
+  private void addNonDecommissioningContextAndCheckRemoval(
+      Map<NodeId, DecommissioningNodeContext> decomNodes, NodeId nodeId,
+      NodeState nodeState) {
+
+    long oldEnoughUpdateTime = CURRENT_TIME_IN_MS
+        - DecommissioningNodesCleanup.MINIMUM_UPDATE_DALEY_FOR_CLEANUP_MS - 1;
+
+    DecommissioningNodeContext context =
+        new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setLastUpdateTime(oldEnoughUpdateTime);
+
+    decomNodes.put(nodeId, context);
+    context.setNodeState(nodeState);
+    target.cleanUp(decomNodes,
CURRENT_TIME_IN_MS); + Assert.assertEquals(0, decomNodes.size()); + verify(rmStateStore).removeDecommissioningNode(context); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesWatcher.java similarity index 70% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesWatcher.java index 43711560855..2ba75156552 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/decomissioning/TestDecommissioningNodesWatcher.java @@ -16,10 +16,12 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager; +package org.apache.hadoop.yarn.server.resourcemanager.decomissioning; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,10 +31,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -40,6 +48,9 @@ import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * This class tests DecommissioningNodesWatcher. 
 */
@@ -62,7 +73,7 @@ public void testDecommissioningNodesWatcher() throws Exception {
     NodeId id1 = nm1.getNodeId();
     rm.waitForState(id1, NodeState.RUNNING);
-    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+    Assert.assertFalse(watcher.isNodeReadyToBeDecommissioned(id1));
     RMApp app = rm.submitApp(2000);
     MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
@@ -75,21 +86,47 @@ public void testDecommissioningNodesWatcher() throws Exception {
     // Update status with decreasing number of running containers until 0.
     watcher.update(node1, createNodeStatus(id1, app, 12));
     watcher.update(node1, createNodeStatus(id1, app, 11));
-    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+    Assert.assertFalse(watcher.isNodeReadyToBeDecommissioned(id1));
     watcher.update(node1, createNodeStatus(id1, app, 1));
     Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
-        watcher.checkDecommissioningStatus(id1));
+        watcher.getDecommissioningStatus(id1));
     watcher.update(node1, createNodeStatus(id1, app, 0));
     Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
-        watcher.checkDecommissioningStatus(id1));
+        watcher.getDecommissioningStatus(id1));
     // Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
     MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
     rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
     Assert.assertEquals(DecommissioningNodeStatus.READY,
-        watcher.checkDecommissioningStatus(id1));
+        watcher.getDecommissioningStatus(id1));
+  }
+
+  @Test
+  public void testDecommissioningNodesRecover() throws Exception {
+    RMContext rmContext = mock(RMContext.class);
+
+    NodeId nodeId = NodeId.newInstance("host", 1234);
+
+    DecommissioningNodesWatcher target =
+        new DecommissioningNodesWatcher(rmContext);
+    RMStateStore.RMState rmState = new RMStateStore.RMState();
+
+    DecommissioningNodeContext context = new DecommissioningNodeContext(nodeId, 10, 10);
+    context.setNodeState(NodeState.DECOMMISSIONING);
+    RMDecommissioningNodeData data = new RMDecommissioningNodeData(context);
+
+    ConcurrentMap<NodeId, RMNode> nodesMap = new ConcurrentHashMap<>();
+    nodesMap.put(nodeId, mock(RMNode.class));
+    when(rmContext.getRMNodes()).thenReturn(nodesMap);
+
+
+    rmState.getDecommissioningNodesState().put(nodeId.toString(), data);
+
+    target.recover(rmState);
+
+    Assert.assertTrue(target.isNodeRecovered(nodeId));
   }

   @After
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index cd44dda844d..b759c0224a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import
org.apache.hadoop.yarn.api.records.Resource; @@ -64,11 +65,13 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; @@ -569,7 +572,7 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } - + public void testEpoch(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -983,4 +986,83 @@ void assertAllocationStateEqual( ReservationSystemUtil.toAllocations( actual.getAllocationRequestsList())); } + + public void testDecommissioningNodes(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + + NodeId nodeId1 = mock(NodeId.class); + when(nodeId1.toString()).thenReturn("host1:1234"); + NodeId nodeId2 = mock(NodeId.class); + when(nodeId2.toString()).thenReturn("host2:1234"); + + // 1. Load empty store + RMState state = store.loadState(); + Map decomNodesState = + state.getDecommissioningNodesState(); + Assert.assertEquals(0, decomNodesState.size()); + + // 2. Store single node + DecommissioningNodeContext nodeContext1 = + new DecommissioningNodeContext( + nodeId1, + System.currentTimeMillis() + 1000*60*10, + System.currentTimeMillis() - 1000*60*20); + store.storeDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + RMDecommissioningNodeData readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + DecommissioningNodeContext readNodeContext1 = + readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + + // 3. Update node + nodeContext1.setLastUpdateTime(System.currentTimeMillis()); + store.updateDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + readNodeContext1 = readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + + // 4. 
add a second one and remove the first one + DecommissioningNodeContext nodeContext2 = + new DecommissioningNodeContext(nodeId2, 0l, 0l); + store.storeDecommissioningNode(nodeContext2); + state = store.loadState(); + Assert.assertEquals(2, state.getDecommissioningNodesState().size()); + readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + readNodeContext1 = readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + RMDecommissioningNodeData readNodeData2 = + state.getDecommissioningNodesState().get(nodeId2.toString()); + Assert.assertNotNull(readNodeData2); + DecommissioningNodeContext readNodeContext2 = + readNodeData2.getDecommissioningNodeContext(nodeId2); + assertDecommissioningNodeContextEquals(nodeContext2, readNodeContext2); + + store.removeDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + readNodeData2 = + state.getDecommissioningNodesState().get(nodeId2.toString()); + Assert.assertNotNull(readNodeData2); + readNodeContext2 = readNodeData2.getDecommissioningNodeContext(nodeId2); + assertDecommissioningNodeContextEquals(nodeContext2, readNodeContext2); + } + + void assertDecommissioningNodeContextEquals( + DecommissioningNodeContext expected, + DecommissioningNodeContext actual) { + Assert.assertEquals(expected.getDecommissioningStartTime(), actual.getDecommissioningStartTime()); + Assert.assertEquals(expected.getLastUpdateTime(), actual.getLastUpdateTime()); + Assert.assertEquals(expected.getNodeId(), actual.getNodeId()); + Assert.assertEquals(expected.getTimeoutMs(), actual.getTimeoutMs()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 764424de797..a5c2f7d8d66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -205,6 +205,7 @@ public void testFSRMStateStore() throws Exception { testRemoveAttempt(fsTester); testAMRMTokenSecretManagerStateStore(fsTester); testReservationStateStore(fsTester); + testDecommissioningNodes(fsTester); testProxyCA(fsTester); } finally { cluster.shutdown(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 7a4ead48fc8..af2718e8c1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -124,6 +124,12 @@ public void testReservation() throws Exception { testReservationStateStore(tester); } + @Test(timeout = 60000) + public void testDecomissioningNodes() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testDecommissioningNodes(tester); + } + @Test(timeout = 60000) public void testProxyCA() throws Exception { LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index ce9e68d9c5d..ca72021a639 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -287,6 +287,7 @@ public void testZKRMStateStoreRealZK() throws Exception { testRemoveAttempt(zkTester); testAMRMTokenSecretManagerStateStore(zkTester); testReservationStateStore(zkTester); + testDecommissioningNodes(zkTester); ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal) zkTester.getRMStateStore()).testRetryingCreateRootDir(); testProxyCA(zkTester); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index c837450f021..1611ee465bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -91,7 +92,7 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, new DecommissioningNodesWatcher(context)); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 
3c4e6b424de..36ad40551cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; @@ -116,7 +117,7 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, new DecommissioningNodesWatcher(context)); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 4f9469548ae..5a434185c28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -82,7 +83,8 @@ public void handle(Event event) { resourceTrackerService = new ResourceTrackerService(context, nodesListManager, new NMLivelinessMonitor(dispatcher), context.getContainerTokenSecretManager(), - context.getNMTokenSecretManager()); + context.getNMTokenSecretManager(), + new DecommissioningNodesWatcher(context)); resourceTrackerService.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ba409b1386b..90382f22343 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.decomissioning.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -878,7 +879,8 @@ private ResourceTrackerService getPrivateResourceTrackerService( ResourceTrackerService privateResourceTrackerService = new ResourceTrackerService(privateContext, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, + new DecommissioningNodesWatcher(privateContext)); privateResourceTrackerService.init(conf); privateResourceTrackerService.start(); rm.getResourceScheduler().setRMContext(privateContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md index e17538ccdfc..928ca929407 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md @@ -228,7 +228,7 @@ Usage: | COMMAND\_OPTIONS | Description | |:---- |:---- | | -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. | -| -refreshNodes [-g|graceful [timeout in seconds] -client|server] | Refresh the hosts information at the ResourceManager. Here [-g|graceful [timeout in seconds] -client|server] is optional, if we specify the timeout then ResourceManager will wait for timeout before marking the NodeManager as decommissioned. The -client|server indicates if the timeout tracking should be handled by the client or the ResourceManager. The client-side tracking is blocking, while the server-side tracking is not. Omitting the timeout, or a timeout of -1, indicates an infinite timeout. Known Issue: the server-side tracking will immediately decommission if an RM HA failover occurs. | +| -refreshNodes [-g|graceful [timeout in seconds] -client|server] | Refresh the hosts information at the ResourceManager. Here [-g|graceful [timeout in seconds] -client|server] is optional, if we specify the timeout then ResourceManager will wait for timeout before marking the NodeManager as decommissioned. The -client|server indicates if the timeout tracking should be handled by the client or the ResourceManager. The client-side tracking is blocking, while the server-side tracking is not. Omitting the timeout, or a timeout of -1, indicates an infinite timeout. | | -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. | | -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. | | -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |
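Usage illustration for the -refreshNodes option documented in the YarnCommands.md hunk above. The 600-second timeout and the choice of tracking mode are arbitrary example values, not something this patch prescribes; the option syntax itself is taken from the table row:

    yarn rmadmin -refreshNodes -g 600 -server    (ResourceManager tracks the timeout; the command returns immediately)
    yarn rmadmin -refreshNodes -g 600 -client    (the client tracks the timeout and blocks until decommissioning finishes or times out)

Omitting the timeout, or passing -1, would request an infinite timeout, as described in the option text above.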