diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 001d02ed577..bc53d2b9687 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -298,6 +298,15 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_HTTPS_PORT; + public static final String RM_THRESHOLD_PERCENTAGE_STRESSED_NODES = + RM_PREFIX + "threshold.percentage.stressed.nodes"; + public static final double DEFAULT_THRESHOLD_PERCENTAGE_STRESSED_NODES = 10.0; + + public static double thresholdPercentageOfStressedNodes(Configuration conf) { + return conf.getDouble(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, + DEFAULT_THRESHOLD_PERCENTAGE_STRESSED_NODES); + } + /** * Enable YARN WebApp V2. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java index b21b88071f5..6b29d8a41a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java @@ -75,6 +75,10 @@ public static NodeHealthStatus newInstance(boolean isNodeHealthy, @Stable public abstract String getHealthReport(); + @Public + @Unstable + public abstract boolean isNodeStressed(); + @Private @Unstable public abstract void setHealthReport(String healthReport); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java index 75aa3d1cfd0..19b83852a1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProtoOrBuilder; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -30,6 +31,8 @@ private boolean viaProto = false; private NodeHealthStatusProto proto = NodeHealthStatusProto .getDefaultInstance(); + /** Node stressed message*/ + private static final String NODE_STRESSED_MSG = "NODE_STRESSED"; public NodeHealthStatusPBImpl() { this.builder = NodeHealthStatusProto.newBuilder(); @@ -105,6 +108,16 @@ public String getHealthReport() { return (p.getHealthReport()); } + @Override + public boolean isNodeStressed() { + String healthReport = getHealthReport(); + if (healthReport == null) { + return false; + } + + return (healthReport.contains(NODE_STRESSED_MSG)); + } + @Override public void setHealthReport(String healthReport) { maybeInitBuilder(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 9dc59451499..0fa7ef86060 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -73,6 +73,9 @@ private final ConcurrentMap inactiveNodes = new ConcurrentHashMap(); + private final ConcurrentMap stressedNodes = + new ConcurrentHashMap<>(); + private final ConcurrentMap systemCredentials = new ConcurrentHashMap(); @@ -193,6 +196,12 @@ public RMStateStore getStateStore() { return this.inactiveNodes; } + @Private + @Unstable + public ConcurrentMap getStressedRMNodes() { + return this.stressedNodes; + } + @Private @Unstable public ContainerAllocationExpirer getContainerAllocationExpirer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b255a304eae..82b46c80981 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -70,6 +70,10 @@ ConcurrentMap getInactiveRMNodes(); + ConcurrentMap getStressedRMNodes(); + + boolean canAddStressedNodes(); + ConcurrentMap getRMNodes(); AMLivelinessMonitor getAMLivelinessMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d7c624d4863..c7156d7c1c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; @@ -277,6 +278,23 @@ public void setYarnConfiguration(Configuration yarnConfiguration) { serviceContext.setYarnConfiguration(yarnConfiguration); } + @Override + public boolean canAddStressedNodes() { + double thresholdPercentage = YarnConfiguration.thresholdPercentageOfStressedNodes(getYarnConfiguration()); + + // Current number of stressed nodes + int stressedNodes = getStressedRMNodes().size(); + // Total nodes + int totalNodes = getRMNodes().size(); + + // Percentage of stressed nodes if one more node is added to stressed map + double stressedNodesPercentage = ((stressedNodes + 1) * 100.0 ) / totalNodes; + + // Return false, if violation of threshold occurs + // Return true, if no violation of threshold occurs + return stressedNodesPercentage <= thresholdPercentage; + } + public String getHAZookeeperConnectionState() { return serviceContext.getHAZookeeperConnectionState(); } @@ -319,6 +337,11 @@ public RMStateStore getStateStore() { return activeServiceContext.getInactiveRMNodes(); } + @Override + public ConcurrentMap getStressedRMNodes() { + return activeServiceContext.getStressedRMNodes(); + } + @Override public ContainerAllocationExpirer getContainerAllocationExpirer() { return activeServiceContext.getContainerAllocationExpirer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7ad47fb49df..4d20ddfd945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; @@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import com.google.common.base.CharMatcher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -105,6 +106,9 @@ private static final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); + /** Node stressed message*/ + private static final String NODE_STRESSED_MSG = "NODE_STRESSED"; + private final ReadLock readLock; private final WriteLock writeLock; @@ -143,7 +147,7 @@ private final ContainerAllocationExpirer containerAllocationExpirer; /* set of containers that have just launched */ private final Set launchedContainers = - new HashSet(); + new HashSet(); /* track completed container globally */ private final Set completedContainers = @@ -171,7 +175,7 @@ /* the list of applications that are running on this node */ private final List runningApplications = new ArrayList(); - + private final Map toBeUpdatedContainers = new HashMap<>(); @@ -188,7 +192,7 @@ private static final StateMachineFactory stateMachineFactory + RMNodeEvent> stateMachineFactory = new StateMachineFactory stateMachine; + RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, @@ -375,7 +379,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.hostName = hostName; this.commandPort = cmPort; this.httpPort = httpPort; - this.totalCapability = capability; + this.totalCapability = capability; this.nodeAddress = hostName + ":" + cmPort; this.httpAddress = hostName + ":" + httpPort; this.node = node; @@ -447,12 +451,12 @@ public Resource getTotalCapability() { public String getRackName() { return node.getNetworkLocation(); } - + @Override public Node getNode() { return this.node; } - + @Override public String getHealthReport() { this.readLock.lock(); @@ -463,7 +467,7 @@ public String getHealthReport() { this.readLock.unlock(); } } - + public void setHealthReport(String healthReport) { this.writeLock.lock(); @@ -473,7 +477,7 @@ public void setHealthReport(String healthReport) { this.writeLock.unlock(); } } - + public void setLastHealthReportTime(long lastHealthReportTime) { this.writeLock.lock(); @@ -483,7 +487,7 @@ public void setLastHealthReportTime(long lastHealthReportTime) { this.writeLock.unlock(); } } - + @Override public long getLastHealthReportTime() { this.readLock.lock(); @@ -574,7 +578,7 @@ public NodeState getState() { } } - + @Override public List getRunningApps() { this.readLock.lock(); @@ -661,18 +665,18 @@ public void handle(RMNodeEvent event) { writeLock.lock(); NodeState oldState = getState(); try { - stateMachine.doTransition(event.getType(), event); + stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); - LOG.error("Invalid event " + event.getType() + + LOG.error("Invalid event " + event.getType() + " on Node " + this.nodeId + " oldState " + oldState); } if (oldState != getState()) { LOG.info(nodeId + " Node Transitioned from " + oldState + " to " - + getState()); + + getState()); } } - + finally { writeLock.unlock(); } @@ -800,19 +804,53 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode, context.getDispatcher().getEventHandler() .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } - - private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, - RMNodeResourceUpdateEvent event){ - ResourceOption resourceOption = event.getResourceOption(); - // Set resource on RMNode - rmNode.totalCapability = resourceOption.getResource(); + + private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, + RMNodeResourceUpdateEvent event){ + ResourceOption resourceOption = event.getResourceOption(); + // Set resource on RMNode + rmNode.totalCapability = resourceOption.getResource(); } private static NodeHealthStatus updateRMNodeFromStatusEvents( RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { // Switch the last heartbeatresponse. + NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); - rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); + String remoteNodeHealthReport = remoteNodeHealthStatus.getHealthReport(); + + // Process the stressed node signal + // Remote node is not reporting stress + if (!remoteNodeHealthStatus.isNodeStressed()) { + // Local node is already stressed + if (rmNode.context.getStressedRMNodes().containsKey(rmNode.nodeId)) { + // Remove the node from the stressed map + rmNode.context.getStressedRMNodes().remove(rmNode.nodeId); + // Copy the remote health report as it is + rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); + } + } else { + // Remote node is reporting stress + // Local node is also stressed + if (rmNode.context.getStressedRMNodes().containsKey(rmNode.nodeId)) { + // Copy the remote health report as it is + rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); + } else { + // Local node state is not stressed + // Check if a node could be added to stressed map + if (rmNode.context.canAddStressedNodes()) { + // Put the stressed node in the stressed map + rmNode.context.getStressedRMNodes().put(rmNode.nodeId, rmNode); + // Copy the remote health report as it is + rmNode.setHealthReport(remoteNodeHealthReport); + } else { + // If not, remove stress from healthReport and copy the remaining healthReport as it is + String healthReport = removeStressFromHealthReport(remoteNodeHealthReport); + rmNode.setHealthReport(healthReport); + } + } + } + rmNode.setLastHealthReportTime(remoteNodeHealthStatus .getLastHealthReportTime()); rmNode.setAggregatedContainersUtilization(statusEvent @@ -821,6 +859,39 @@ private static NodeHealthStatus updateRMNodeFromStatusEvents( return remoteNodeHealthStatus; } + // This function removes the stressed msg from node health report + // This is done when number of stressed nodes are more than threshold + private static String removeStressFromHealthReport(String healthReport) { + // HealthReport is ";" delimited list of health report + String[] healthReports = healthReport.split(";"); + String localNodeHealthReport = ""; + + for (String report : healthReports) { + if (!report.equals(NODE_STRESSED_MSG)) { + localNodeHealthReport += report; + localNodeHealthReport += ";"; + } + } + + localNodeHealthReport = CharMatcher.is(';').trimTrailingFrom(localNodeHealthReport); + + return localNodeHealthReport; + } + + // This function cleans stressed signals from node + private static void cleanStressedSignalsFromNodeIfPresent(RMNodeImpl rmNode) { + if (rmNode != null && + rmNode.context.getStressedRMNodes().containsKey(rmNode.nodeId)) { + // Remove node from stressed map + rmNode.context.getStressedRMNodes().remove(rmNode.nodeId); + + // Also remove stressed report + String healthReport = rmNode.getHealthReport(); + String removeStressedReport = removeStressFromHealthReport(healthReport); + rmNode.setHealthReport(removeStressedReport); + } + } + public static class AddNodeTransition implements SingleArcTransition { @@ -878,9 +949,9 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); List runningApps = reconnectEvent.getRunningApplications(); - boolean noRunningApps = + boolean noRunningApps = (runningApps == null) || (runningApps.size() == 0); - + // No application running on the node, so send node-removal event with // cleaning up old container info. if (noRunningApps) { @@ -915,7 +986,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.totalCapability = newNode.getTotalCapability(); isCapabilityChanged = true; } - + handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); for (ApplicationId appId : reconnectEvent.getRunningApplications()) { @@ -959,7 +1030,7 @@ private ContainerStatus createContainerStatus( return cStatus; } } - + public static class UpdateNodeResourceWhenRunningTransition implements SingleArcTransition { @@ -972,7 +1043,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption())); } } - + public static class UpdateNodeResourceWhenUnusableTransition implements SingleArcTransition { @@ -986,9 +1057,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // and can sync later from RMnode. } } - + public static class CleanUpAppTransition - implements SingleArcTransition { + implements SingleArcTransition { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { @@ -1023,7 +1094,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { */ public static class UpdateContainersTransition implements SingleArcTransition { - + @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event; @@ -1059,11 +1130,17 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { * @param finalState */ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) { + // If node is UNHEALTHY + if (rmNode.getState() == NodeState.UNHEALTHY) { + // Remove from stressed map if already present + cleanStressedSignalsFromNodeIfPresent(rmNode); + } if (rmNode.getNodeID().getPort() == -1) { rmNode.updateMetricsForDeactivatedNode(rmNode.getState(), finalState); return; } + reportNodeUnusable(rmNode, finalState); // Deactivate the node @@ -1118,6 +1195,12 @@ public DecommissioningNodeTransition(NodeState initState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { Integer timeout = null; + + // If node is unhealthy + stressed + if (initState == NodeState.UNHEALTHY) { + cleanStressedSignalsFromNodeIfPresent(rmNode); + } + if (RMNodeDecommissioningEvent.class.isInstance(event)) { RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event); timeout = e.getDecommissioningTimeout(); @@ -1126,7 +1209,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { if (rmNode.getState() == NodeState.DECOMMISSIONING) { if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) { LOG.info("Update " + rmNode.getNodeID() + - " DecommissioningTimeout to be " + timeout); + " DecommissioningTimeout to be " + timeout); rmNode.decommissioningTimeout = timeout; } else { LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING"); @@ -1195,18 +1278,25 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { List keepAliveApps = statusEvent.getKeepAliveAppIds(); if (rmNode.runningApplications.isEmpty() && (keepAliveApps == null || keepAliveApps.isEmpty())) { + // Remove any stress signal from the node evaluated in updateRMNodeFromStatusEvents + cleanStressedSignalsFromNodeIfPresent(rmNode); RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); return NodeState.DECOMMISSIONED; } } - if (!remoteNodeHealthStatus.getIsNodeHealthy()) { + // Remote node state is unhealthy or stressed + // (evaluated in updateRMNodeFromStatusEvents) + if (!remoteNodeHealthStatus.getIsNodeHealthy() + || rmNode.context.getStressedRMNodes().containsKey(rmNode.nodeId)) { LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " + remoteNodeHealthStatus.getHealthReport()); - // if a node in decommissioning receives an unhealthy report, + + // If a node in decommissioning receives an unhealthy or stressed report, // it will stay in decommissioning. if (isNodeDecommissioning) { + cleanStressedSignalsFromNodeIfPresent(rmNode); return NodeState.DECOMMISSIONING; } else { reportNodeUnusable(rmNode, NodeState.UNHEALTHY); @@ -1252,12 +1342,20 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Switch the last heartbeatresponse. NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( rmNode, statusEvent); - if (remoteNodeHealthStatus.getIsNodeHealthy()) { + + // Node can become unhealthy in two cases + // a) Remote node is stressed and RM threshold for stressed node has not reached + // (evaluated in updateRMNodeFromStatusEvents) + // b) Remote node state is unhealthy + if (rmNode.context.getStressedRMNodes().containsKey(rmNode.nodeId) + || !remoteNodeHealthStatus.getIsNodeHealthy()) { + return NodeState.UNHEALTHY; + } else { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); // ??? how about updating metrics before notifying to ensure that // notifiers get update metadata because they will very likely query it // upon notification @@ -1265,8 +1363,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); return NodeState.RUNNING; } - - return NodeState.UNHEALTHY; } } @@ -1282,7 +1378,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public List pullContainerUpdates() { - List latestContainerInfoList = + List latestContainerInfoList = new ArrayList(); UpdatedContainerInfo containerInfo; while ((containerInfo = nodeUpdateQueue.poll()) != null) { @@ -1296,7 +1392,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public void setNextHeartBeat(boolean nextHeartBeat) { this.nextHeartBeat = nextHeartBeat; } - + @VisibleForTesting public int getQueueSize() { return nodeUpdateQueue.size(); @@ -1321,7 +1417,7 @@ public int getQueueSize() { } return nlm.getLabelsOnNode(nodeId); } - + private void handleReportedIncreasedContainers( List reportedIncreasedContainers) { for (Container container : reportedIncreasedContainers) { @@ -1345,7 +1441,7 @@ private void handleReportedIncreasedContainers( + " no further processing"); continue; } - + this.nmReportedIncreasedContainers.put(containerId, container); } } @@ -1479,11 +1575,11 @@ private void handleLogAggregationStatus( nmReportedIncreasedContainers.clear(); return container; } - + } finally { writeLock.unlock(); } - } + } public Resource getOriginalTotalCapability() { return this.originalTotalCapability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 272633269a2..7d914f26895 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Random; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; @@ -117,6 +120,7 @@ public void setUp() throws Exception { new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class), null, null, mock(DelegationTokenRenewer.class), null, null, null, null, null); + ((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration()); NodesListManager nodesListManager = mock(NodesListManager.class); HostsFileReader reader = mock(HostsFileReader.class); when(nodesListManager.getHostsReader()).thenReturn(reader); @@ -130,7 +134,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]); eventType = event.getType(); if (eventType == SchedulerEventType.NODE_UPDATE) { - List lastestContainersInfoList = + List lastestContainersInfoList = ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates(); for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) { completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); @@ -672,6 +676,275 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() { Assert.assertEquals(0, node.getRunningApps().size()); } + @Test + public void testHealthyToUnHealthyStressedSingleNodeAboveThreshold() { + Configuration conf = new YarnConfiguration(); + // No threshold for stressed nodes, which means all stressed reports should be ignored + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "0.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl node = getRunningNode(); + rmContext.getRMNodes().put(node.getNodeID(), node); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + null, null, status, null, null, + null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); + + // Change in the node running status + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); + // Node should not have stressed in the health report + Assert.assertTrue(!node.getHealthReport().contains("NODE_STRESSED")); + + node = getRunningNode(); + rmContext.getRMNodes().put(node.getNodeID(), node); + + status = NodeHealthStatus.newInstance(false, "NODE_STRESSED", + System.currentTimeMillis()); + nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + null, null, status, null, null, + null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); + + // Node should change to unhealthy irrespective of stress threshold + Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); + // Node should not have stressed in the health report as threshold has been reached + Assert.assertTrue(!node.getHealthReport().contains("NODE_STRESSED")); + + rmContext.getRMNodes().clear(); + rmContext.getStressedRMNodes().clear(); + } + + @Test + public void testHealthyToUnHealthyStressedDifferentNodes() { + Configuration conf = new YarnConfiguration(); + // 100% of nodes could be stressed + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "100.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getRunningNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "unhealthy;NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + + // Node should become unhealthy + Assert.assertEquals(NodeState.UNHEALTHY, nodeA.getState()); + Assert.assertEquals(1, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(nodeA.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(nodeA.getHealthReport().contains("unhealthy")); + + RMNodeImpl nodeB = getRunningNode("ver1", 4096); + rmContext.getRMNodes().put(nodeB.getNodeID(), nodeB); + + // 50% of nodes could be stressed + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "50.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + status = NodeHealthStatus.newInstance(true, "NODE_STRESSED", + System.currentTimeMillis()); + nodeStatus = NodeStatus.newInstance(nodeB.getNodeID(), 0, + null, null, status, null, null, + null); + + nodeB.handle(new RMNodeStatusEvent(nodeB.getNodeID(), nodeStatus, null)); + + // Only 50% of the nodes could be in stressed state + // This node should not go to the UNHEALTHY state + Assert.assertEquals(NodeState.RUNNING, nodeB.getState()); + Assert.assertEquals(1, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(!nodeB.getHealthReport().contains("NODE_STRESSED")); + + rmContext.getStressedRMNodes().clear(); + rmContext.getRMNodes().clear(); + } + + @Test + public void testUnHealthyStressedToUnHealthyStressedSingleNode() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "0.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + nodeA.setHealthReport("NODE_STRESSED"); + rmContext.getStressedRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "unhealthy;NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + + // Threshold doesn't matter as this node was already stressed + Assert.assertEquals(NodeState.UNHEALTHY, nodeA.getState()); + Assert.assertEquals(1, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(nodeA.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(nodeA.getHealthReport().contains("unhealthy")); + + rmContext.getStressedRMNodes().clear(); + rmContext.getRMNodes().clear(); + } + + @Test + public void testUnHealthyStressedToRunningSingleNodeBelowThreshold() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "0.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + nodeA.setHealthReport("NODE_STRESSED"); + + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + rmContext.getStressedRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + + Assert.assertEquals(NodeState.RUNNING, nodeA.getState()); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(!nodeA.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(nodeA.getHealthReport().equals("")); + + rmContext.getStressedRMNodes().clear(); + rmContext.getRMNodes().clear(); + } + + @Test + public void testUnHealthyToRunningSingleNodeBelowThreshold() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "0.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "healthy;NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + + // Node should be running mode since threshold doesn't allow + Assert.assertEquals(NodeState.RUNNING, nodeA.getState()); + Assert.assertTrue(!nodeA.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(nodeA.getHealthReport().contains("healthy")); + + rmContext.getRMNodes().clear(); + rmContext.getStressedRMNodes().clear(); + } + + // Unhealthy to Unhealthy + stressed above threshold + @Test + public void testUnHealthyToUnHealthyStressedAboveThreshold() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "100.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "healthy;NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + Assert.assertEquals(NodeState.UNHEALTHY, nodeA.getState()); + Assert.assertEquals(rmContext.getStressedRMNodes().size(), 1); + Assert.assertTrue(nodeA.getHealthReport().contains("NODE_STRESSED")); + + rmContext.getRMNodes().clear(); + rmContext.getStressedRMNodes().clear(); + } + + // Unhealthy to unhealthy + stressed + // Running to unhealthy + stressed + @Test + public void testTransitionToStressedMultipleNodes() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "100.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + + NodeHealthStatus status = NodeHealthStatus.newInstance(true, "NODE_STRESSED", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeA.getNodeID(), 0, + null, null, status, null, null, + null); + nodeA.handle(new RMNodeStatusEvent(nodeA.getNodeID(), nodeStatus, null)); + Assert.assertEquals(NodeState.UNHEALTHY, nodeA.getState()); + Assert.assertEquals(1, rmContext.getStressedRMNodes().size()); + + RMNodeImpl nodeB = getRunningNode("ver1", 4096); + rmContext.getRMNodes().put(nodeB.getNodeID(), nodeB); + + status = NodeHealthStatus.newInstance(true, "NODE_STRESSED", + System.currentTimeMillis()); + nodeStatus = NodeStatus.newInstance(nodeB.getNodeID(), 0, + null, null, status, null, null, + null); + nodeB.handle(new RMNodeStatusEvent(nodeB.getNodeID(), nodeStatus, null)); + + // All of the nodes could be stressed + // This node should go to the UNHEALTHY state + Assert.assertEquals(NodeState.UNHEALTHY, nodeB.getState()); + Assert.assertEquals(2, rmContext.getStressedRMNodes().size()); + + rmContext.getStressedRMNodes().clear(); + rmContext.getRMNodes().clear(); + } + + @Test + public void testUnHealthyToDecommissionAndDeActivated() { + // unhealthy + stressed to decommissioning + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "100.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + + RMNodeImpl nodeA = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + rmContext.getStressedRMNodes().put(nodeA.getNodeID(), nodeA); + nodeA.setHealthReport("healthy;NODE_STRESSED"); + + nodeA.handle(new RMNodeDecommissioningEvent(nodeA.getNodeID(), Integer.MAX_VALUE)); + + Assert.assertEquals(NodeState.DECOMMISSIONING, nodeA.getState()); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(!nodeA.getHealthReport().contains("NODE_STRESSED")); + + RMNodeImpl nodeB = getUnhealthyNode(); + rmContext.getRMNodes().put(nodeA.getNodeID(), nodeA); + rmContext.getStressedRMNodes().put(nodeA.getNodeID(), nodeA); + nodeA.setHealthReport("healthy;NODE_STRESSED"); + + // unhealthy + stressed to expire + nodeB.handle(new RMNodeEvent(nodeB.getNodeID(), RMNodeEventType.EXPIRE)); + + Assert.assertEquals(NodeState.LOST, nodeB.getState()); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); + Assert.assertTrue(!nodeB.getHealthReport().contains("NODE_STRESSED")); + + rmContext.getStressedRMNodes().clear(); + rmContext.getRMNodes().clear(); + } + @Test public void testUnknownNodeId() { NodeId nodeId = @@ -919,8 +1192,12 @@ public void testResourceUpdateOnRebootedNode() { // Otherwise, it will go to decommissioned @Test public void testDecommissioningUnhealthy() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_THRESHOLD_PERCENTAGE_STRESSED_NODES, "100.0"); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + RMNodeImpl node = getDecommissioningNode(); - NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick;NODE_STRESSED", System.currentTimeMillis()); List keepAliveApps = new ArrayList<>(); keepAliveApps.add(BuilderUtils.newApplicationId(1, 1)); @@ -928,8 +1205,14 @@ public void testDecommissioningUnhealthy() { null, keepAliveApps, status, null, null, null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + Assert.assertTrue(!node.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(node.getHealthReport().contains("sick")); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); nodeStatus.setKeepAliveApplications(null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); + Assert.assertTrue(!node.getHealthReport().contains("NODE_STRESSED")); + Assert.assertTrue(node.getHealthReport().contains("sick")); + Assert.assertEquals(0, rmContext.getStressedRMNodes().size()); Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); }