diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index dff5ef4..382abbb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.NodeUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -2141,7 +2142,7 @@ public void transition(JobImpl job, JobEvent event) { JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event; for(NodeReport nr: updateEvent.getUpdatedNodes()) { NodeState nodeState = nr.getNodeState(); - if(nodeState.isUnusable()) { + if (NodeUtil.isUnusable(nodeState, job.conf)) { // act on the updates job.actOnUnusableNode(nr.getNodeId(), nodeState); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index e068997..0241a8c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.NodeUtil; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.resource.Resources; @@ -817,7 +818,7 @@ private void handleUpdatedNodes(AllocateResponse response) { HashSet unusableNodes = new HashSet(); for (NodeReport nr : updatedNodes) { NodeState nodeState = nr.getNodeState(); - if (nodeState.isUnusable()) { + if (NodeUtil.isUnusable(nodeState, getConfig())) { unusableNodes.add(nr.getNodeId()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index b03d58d..42a8314 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; @@ -201,6 +202,7 @@ public void testUpdatedNodes() throws Exception { MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), true, ++runCount); Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEATHY, false); // after half of the map completion, reduce will start conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); // uberization forces full slowstart (1.0), so disable that diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 3642670..7e91bfa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -918,6 +918,7 @@ protected ContainerAllocator createContainerAllocator( @Test public void testUpdatedNodes() throws Exception { Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEATHY, false); MyResourceManager rm = new MyResourceManager(conf); rm.start(); DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java index ff1ca48..c402fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java @@ -44,8 +44,4 @@ /** Node has rebooted */ REBOOTED; - - public boolean isUnusable() { - return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..0a03354 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -679,6 +679,10 @@ private static void addDeprecatedKeys() { public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; + public static final String YARN_NM_DRAIN_UNHEATHY = NM_PREFIX + + "unheathy.drain.containers"; + public static final boolean YARN_NM_DRAIN_UNHEATHY_DEFAULT = true; + public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION = NM_PREFIX + "resourcemanager.minimum.version"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java new file mode 100644 index 0000000..e7374cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class NodeUtil { + + public static boolean drainUnhealthy(Configuration conf) { + return conf.getBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEATHY, + YarnConfiguration.YARN_NM_DRAIN_UNHEATHY_DEFAULT); + } + + public static boolean isUnusable(NodeState nodeState, Configuration conf) { + return (nodeState == NodeState.UNHEALTHY && !drainUnhealthy(conf)) + || nodeState == NodeState.DECOMMISSIONED + || nodeState == NodeState.LOST; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index af3b5aa..a094d9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1156,6 +1156,11 @@ + + yarn.nodemanager.unheathy.drain.containers + true + + yarn.nodemanager.aux-services.mapreduce_shuffle.class diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1774eb5..01d60ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.NodeUtil; import com.google.common.annotations.VisibleForTesting; @@ -686,9 +687,13 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.nodeUpdateQueue.clear(); // If the current state is NodeState.UNHEALTHY // Then node is already been removed from the - // Scheduler + // Scheduler. 
However, containers on the node might + // still be running if drainUnhealthy is enabled. In that case, + // ask the Scheduler to remove those running containers. NodeState initialState = rmNode.getState(); - if (!initialState.equals(NodeState.UNHEALTHY)) { + if (!initialState.equals(NodeState.UNHEALTHY) + || NodeUtil.drainUnhealthy( + rmNode.context.getNodesListManager().getConfig())) { rmNode.context.getDispatcher().getEventHandler() .handle(new NodeRemovedSchedulerEvent(rmNode)); } @@ -722,21 +727,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); - if (!remoteNodeHealthStatus.getIsNodeHealthy()) { - LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " - + remoteNodeHealthStatus.getHealthReport()); - rmNode.nodeUpdateQueue.clear(); - // Inform the scheduler - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_UNUSABLE, rmNode)); - // Update metrics - rmNode.updateMetricsForDeactivatedNode(rmNode.getState(), - NodeState.UNHEALTHY); - return NodeState.UNHEALTHY; - } // Filter the map to only obtain just launched containers and finished // containers. @@ -776,7 +766,12 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { completedContainers.add(remoteContainer); } } - if(newlyLaunchedContainers.size() != 0 + + // Notify the scheduler completed containers regardless if + // the node becomes unhealthy in the same heartbeat. + // This will make sure completed containers notifications are delivered + // to AM if the node is asked to drain containers. + if(newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo (newlyLaunchedContainers, completedContainers)); @@ -787,6 +782,21 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodeUpdateSchedulerEvent(rmNode)); } + if (!remoteNodeHealthStatus.getIsNodeHealthy()) { + LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " + + remoteNodeHealthStatus.getHealthReport()); + // Inform the scheduler + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeRemovedSchedulerEvent(rmNode, true)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_UNUSABLE, rmNode)); + // Update metrics + rmNode.updateMetricsForDeactivatedNode(rmNode.getState(), + NodeState.UNHEALTHY); + return NodeState.UNHEALTHY; + } + // Update DTRenewer in secure mode to keep these apps alive. Today this is // needed for log-aggregation to finish long after the apps are gone. 
if (UserGroupInformation.isSecurityEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index bf720ae..d47af7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -74,6 +74,9 @@ // Nodes in the cluster, indexed by NodeId protected Map nodes = new ConcurrentHashMap(); + // Unhealthy nodes in case drain is enabled in the cluster, indexed by NodeId + protected Map unhealthyNodes = new ConcurrentHashMap(); + // Whole capacity of the cluster protected Resource clusterResource = Resource.newInstance(0, 0); @@ -484,6 +487,10 @@ public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } + public SchedulerNode getUnhealthyNode(NodeId nodeId) { + return unhealthyNodes.get(nodeId); + } + @Override public synchronized void moveAllApps(String sourceQueue, String destQueue) throws YarnException { @@ -628,4 +635,40 @@ protected void updateMaximumAllocation(SchedulerNode node, boolean add) { writeLock.unlock(); } } + + // Remove running containers + protected void removeRunningContainers(N node) { + List runningContainers = node.getRunningContainers(); + for (RMContainer container : runningContainers) { + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); + } + } + + // If the node was asked to drain its containers earlier, we want to + // make sure we remove those outstanding containers when the node + // reconnects. + protected void removeRunningContainersFromUnhealthyNode(RMNode node) { + N unhealthyNode = unhealthyNodes.get(node.getNodeID()); + if (unhealthyNode != null) { + LOG.warn("RemoveNode: unhealthyNode " + unhealthyNode); + removeRunningContainers(unhealthyNode); + } + } + + // Remove the unhealthy node if there is no container on it. + protected void removeEmptyUnhealthyNode(N unhealthyNode) { + if (unhealthyNode != null + && unhealthyNode.getReservedContainer() == null + && unhealthyNode.getRunningContainers().isEmpty()) { + if (LOG.isInfoEnabled()) { + LOG.info("Previously unhealthy " + unhealthyNode + + " no longer has any reserved or running containers. 
Removing!"); + } + unhealthyNodes.remove(unhealthyNode.getNodeID()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28158c1..314b811 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.NodeUtil; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -1038,7 +1039,8 @@ public void handle(SchedulerEvent event) { case NODE_REMOVED: { NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; - removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeNode(nodeRemovedEvent.getRemovedRMNode(), + nodeRemovedEvent.becameUnhealthy()); } break; case NODE_RESOURCE_UPDATE: @@ -1138,16 +1140,31 @@ private synchronized void addNode(RMNode nodeManager) { } } - private synchronized void removeNode(RMNode nodeInfo) { + private synchronized void removeNode(RMNode nodeInfo, + boolean becameUnhealthy) { // update this node to node label manager if (labelManager != null) { labelManager.deactivateNode(nodeInfo.getNodeID()); } FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); + // We don't check if rmNode.getState() == NodeState.UNHEALTHY as it is + // possible rmNode became healthy again before the scheduler processes + // SchedulerEventType.NODE_REMOVED. + final boolean drainUnhealthy = NodeUtil.drainUnhealthy(conf) + && becameUnhealthy; + + // This can occur when an UNHEALTHY node reconnects if (node == null) { + // If the node was asked to drain its containers earlier, we want to + // make sure we remove those outstanding containers when the node + // reconnects. 
+ if (!drainUnhealthy) { + removeRunningContainersFromUnhealthyNode(nodeInfo); + } return; } + Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.decrementAndGet(); @@ -1155,15 +1172,9 @@ private synchronized void removeNode(RMNode nodeInfo) { if (scheduleAsynchronously && numNodes == 0) { asyncSchedulerThread.suspendSchedule(); } - - // Remove running containers - List runningContainers = node.getRunningContainers(); - for (RMContainer container : runningContainers) { - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); + + if (!drainUnhealthy) { + removeRunningContainers(node); } // Remove reservations, if any @@ -1179,7 +1190,10 @@ private synchronized void removeNode(RMNode nodeInfo) { this.nodes.remove(nodeInfo.getNodeID()); updateMaximumAllocation(node, false); - LOG.info("Removed node " + nodeInfo.getNodeAddress() + + if (drainUnhealthy) { + unhealthyNodes.put(nodeInfo.getNodeID(), node); + } + LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } @@ -1207,7 +1221,21 @@ protected synchronized void completedContainer(RMContainer rmContainer, // Get the node on which the container was allocated FiCaSchedulerNode node = getNode(container.getNodeId()); - + FiCaSchedulerNode unhealthyNode = null; + if (node == null) { + node = unhealthyNode = unhealthyNodes.get(container.getNodeId()); + if (LOG.isInfoEnabled()) { + LOG.info("Checking previously unhealthy node " + node + + " for " + container); + } + } + + if (node == null) { + if (LOG.isErrorEnabled()) { + LOG.error("Coudn't find node for " + container); + } + } + // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(clusterResource, application, node, @@ -1216,6 +1244,8 @@ protected synchronized void completedContainer(RMContainer rmContainer, LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node + " with event: " + event); + + removeEmptyUnhealthyNode(unhealthyNode); } @Lock(Lock.NoLock.class) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java index 5fe541e..a6921c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java @@ -23,14 +23,25 @@ public class NodeRemovedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final boolean becameUnhealthy; public NodeRemovedSchedulerEvent(RMNode rmNode) { + this(rmNode, false); + } + + public NodeRemovedSchedulerEvent( + RMNode rmNode, boolean becameUnhealthy) { super(SchedulerEventType.NODE_REMOVED); this.rmNode = rmNode; + this.becameUnhealthy = becameUnhealthy; } public RMNode 
getRemovedRMNode() { return rmNode; } -} + public boolean becameUnhealthy() { + return becameUnhealthy; + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a687e71..309a601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.NodeUtil; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -813,9 +814,25 @@ protected synchronized void completedContainer(RMContainer rmContainer, // Get the node on which the container was allocated FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); + FSSchedulerNode unhealthyNode = null; + if (node == null) { + node = unhealthyNode = unhealthyNodes.get(container.getNodeId()); + if (LOG.isInfoEnabled()) { + LOG.info("Checking previously unhealthy node " + node + + " for " + container); + } + } + + if (node == null) { + if (LOG.isErrorEnabled()) { + LOG.error("Coudn't find node for " + container); + } + } if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(rmContainer.getReservedPriority(), node); + if (node != null) { + application.unreserve(rmContainer.getReservedPriority(), node); + } } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -823,8 +840,11 @@ protected synchronized void completedContainer(RMContainer rmContainer, } LOG.info("Application attempt " + application.getApplicationAttemptId() - + " released container " + container.getId() + " on node: " + node + + " released container " + container.getId() + " on node: " + + (node != null ? node.toString() : container.getNodeId()) + " with event: " + event); + + removeEmptyUnhealthyNode(unhealthyNode); } private synchronized void addNode(RMNode node) { @@ -840,23 +860,31 @@ private synchronized void addNode(RMNode node) { " cluster capacity: " + clusterResource); } - private synchronized void removeNode(RMNode rmNode) { + private synchronized void removeNode(RMNode rmNode, + boolean becameUnhealthy) { FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); + // We don't check if rmNode.getState() == NodeState.UNHEALTHY as it is + // possible rmNode became healthy again before the scheduler processes + // SchedulerEventType.NODE_REMOVED. 
+ final boolean drainUnhealthy = NodeUtil.drainUnhealthy(conf) + && becameUnhealthy; + // This can occur when an UNHEALTHY node reconnects if (node == null) { + // If the node was asked to drain its containers earlier, we want to + // make sure we remove those outstanding containers when the node + // reconnects. + if (!drainUnhealthy) { + removeRunningContainersFromUnhealthyNode(rmNode); + } return; } + Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); - // Remove running containers - List runningContainers = node.getRunningContainers(); - for (RMContainer container : runningContainers) { - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); + if (!drainUnhealthy) { + removeRunningContainers(node); } // Remove reservations, if any @@ -870,6 +898,9 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + if (drainUnhealthy) { + unhealthyNodes.put(rmNode.getNodeID(), node); + } queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); @@ -1152,7 +1183,8 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; - removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeNode(nodeRemovedEvent.getRemovedRMNode(), + nodeRemovedEvent.becameUnhealthy()); break; case NODE_UPDATE: if (!(event instanceof NodeUpdateSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 5f53805..89b7a87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -99,6 +99,11 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { nodeHeartbeat(conts, true); } + // Used to test REBOOT + public void resetResponseId() { + responseId = 0; + } + public RegisterNodeManagerResponse registerNode() throws Exception { return registerNode(null, null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9d0ac27..3fef708 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -468,11 +468,31 @@ public void sendNodeLost(MockNM nm) throws Exception { node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); } + public void sendNodeDecommission(MockNM 
nm) throws Exception { + RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( + nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.DECOMMISSION)); + } + public void NMwaitForState(NodeId nodeid, NodeState finalState) throws Exception { - RMNode node = getRMContext().getRMNodes().get(nodeid); - Assert.assertNotNull("node shouldn't be null", node); + RMNode node = null; int timeoutSecs = 0; + while (node == null && timeoutSecs++ < 20) { + if (finalState != NodeState.DECOMMISSIONED + && finalState != NodeState.LOST + && finalState != NodeState.REBOOTED) { + node = getRMContext().getRMNodes().get(nodeid); + } else { + // If we expect the node to be in DECOMMISSIONED, LOST or REBOOTED, + // the node will eventually be added to the inactive list. + node = getRMContext().getInactiveRMNodes().get(nodeid.getHost()); + } + Thread.sleep(500); + } + + Assert.assertNotNull("node shouldn't be null", node); + timeoutSecs = 0; while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) { System.out.println("Node State is : " + node.getState() + " Waiting for state : " + finalState); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAMCompletedContainersNotification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAMCompletedContainersNotification.java new file mode 100644 index 0000000..a545c72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAMCompletedContainersNotification.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestRMAMCompletedContainersNotification extends ParameterizedSchedulerTestBase{ + + private static final Log LOG = LogFactory + .getLog(TestRMAMCompletedContainersNotification.class); + + private MockRM rm; + private MockNM nm1, nm2; + MockAM am; + int request = 2; + RMAppAttempt attempt; + List conts; + + public TestRMAMCompletedContainersNotification(SchedulerType type) { + super(type); + } + + @Before + public void setUp() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + //Configuration conf = new YarnConfiguration(); + //conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + // ResourceScheduler.class); + + rm = new MockRM(getConf()); + rm.start(); + + nm1 = rm.registerNode("h1:1234", 3000); + + RMApp app = rm.submitApp(2000); + + //kick the scheduling so that AM is scheduled on nm1 + nm1.nodeHeartbeat(true); + attempt = app.getCurrentAppAttempt(); + am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + nm2 = rm.registerNode("h2:5678", 5000); + //request for containers + am.allocate("h2" , 2000, request, new ArrayList()); + //kick the scheduler so that these two worker containers are scheduled + //on nm2. 
+ conts = getAllocatedContainers(request, am, nm2); + nmNotifyRMContainerState(conts, am.getApplicationAttemptId(), nm2, + ContainerState.RUNNING, true); + } + + @After + public void tearDown() throws Exception { + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + rm.stop(); + } + + private void nmNotifyRMContainerState(List<Container> conts, + ApplicationAttemptId applicationAttemptId, MockNM nm, + ContainerState containerState, boolean isHealthy) throws Exception { + List<ContainerStatus> runningContainerStatuses = + new ArrayList<ContainerStatus>(); + + for(Container cont : conts) { + runningContainerStatuses.add( + ContainerStatus.newInstance(cont.getId(), containerState, "", 0)); + } + + Map<ApplicationId, List<ContainerStatus>> runningStatusUpdate = + new HashMap<ApplicationId, List<ContainerStatus>>(); + runningStatusUpdate.put(applicationAttemptId.getApplicationId(), + runningContainerStatuses); + + nm.nodeHeartbeat(runningStatusUpdate, isHealthy); + } + + private List<Container> getAllocatedContainers( + int request, MockAM am, MockNM nm) throws Exception { + nm.nodeHeartbeat(true); + List<Container> conts = new ArrayList<Container>(); + int waitCount = 0; + while (conts.size() < request && waitCount++ < 200) { + LOG.info("Got " + conts.size() + " containers. Waiting to get " + + request); + Thread.sleep(100); + nm.nodeHeartbeat(true); + conts.addAll(am.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers()); + } + Assert.assertEquals(request, conts.size()); + return conts; + } + + private void verifyNumOfRunningContainersOnUnhealthyNode( + MockNM nm, int count) { + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + SchedulerNode schedulerNode = scheduler.getUnhealthyNode(nm.getNodeId()); + Assert.assertEquals(count, schedulerNode.getRunningContainers().size()); + } + + // Make sure AM receives the completed container notification. + private void verifyAMReceivedCompletedContainers( + int request, MockAM am) throws Exception { + List<ContainerStatus> completedConts = new ArrayList<ContainerStatus>(); + int waitCount = 0; + while (completedConts.size() < request && waitCount++ < 200) { + Thread.sleep(100); + completedConts.addAll(am.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getCompletedContainersStatuses()); + } + Assert.assertEquals(request, completedConts.size()); + } + + // NM delivers unhealthy and completed containers notifications in the same + // heartbeat. Verify AM receives the completed containers from RM. + @Test + public void testNMRMCompletedContainersNotificationInUnhealthyHeartbeat() + throws Exception { + nmNotifyRMContainerState(conts, am.getApplicationAttemptId(), nm2, + ContainerState.COMPLETE, false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + + verifyAMReceivedCompletedContainers(request, am); + } + + // NM becomes unhealthy first, then it restarts. + // Verify AM receives the completed containers from RM. + @Test + public void testUnhealthyThenRestart() throws Exception { + nm2.nodeHeartbeat(false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + verifyNumOfRunningContainersOnUnhealthyNode(nm2, request); + + nm2.registerNode(); + nm2.nodeHeartbeat(true); + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + + verifyAMReceivedCompletedContainers(request, am); + } + + // NM becomes unhealthy first, then RM asks NM to reboot. + // Verify AM receives the completed containers from RM.
+ @Test + public void testUnhealthyThenReboot() throws Exception { + nm2.nodeHeartbeat(false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + verifyNumOfRunningContainersOnUnhealthyNode(nm2, request); + + // Trigger reboot + nm2.resetResponseId(); + nm2.nodeHeartbeat(true); + rm.NMwaitForState(nm2.getNodeId(), NodeState.REBOOTED); + + verifyAMReceivedCompletedContainers(request, am); + } + + // NM becomes unhealthy first, then it becomes healthy + // Verify AM receives the completed containers from RM. + @Test + public void testUnhealthyThenHealthy() throws Exception { + nm2.nodeHeartbeat(false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + verifyNumOfRunningContainersOnUnhealthyNode(nm2, request); + + nm2.nodeHeartbeat(true); + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + + nmNotifyRMContainerState(conts, am.getApplicationAttemptId(), nm2, + ContainerState.COMPLETE, true); + + verifyAMReceivedCompletedContainers(request, am); + } + + // NM becomes unhealthy first, then RM lost NM. + // Verify AM receives the completed containers from RM. + @Test + public void testUnhealthyThenLost() throws Exception { + nm2.nodeHeartbeat(false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + verifyNumOfRunningContainersOnUnhealthyNode(nm2, request); + + rm.sendNodeLost(nm2); + rm.NMwaitForState(nm2.getNodeId(), NodeState.LOST); + + verifyAMReceivedCompletedContainers(request, am); + } + + // NM becomes unhealthy first, then RM decommissions NM. + // Verify AM receives the completed containers from RM. + @Test + public void testUnhealthyThenDecommission() throws Exception { + nm2.nodeHeartbeat(false); + rm.NMwaitForState(nm2.getNodeId(), NodeState.UNHEALTHY); + verifyNumOfRunningContainersOnUnhealthyNode(nm2, request); + + rm.sendNodeDecommission(nm2); + rm.NMwaitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); + + verifyAMReceivedCompletedContainers(request, am); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index d877e25..e9fd152 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -109,6 +110,8 @@ public void setUp() throws Exception { NodesListManager nodesListManager = mock(NodesListManager.class); HostsFileReader reader = mock(HostsFileReader.class); when(nodesListManager.getHostsReader()).thenReturn(reader); + Configuration configuration = mock(Configuration.class); + when(nodesListManager.getConfig()).thenReturn(configuration); ((RMContextImpl) rmContext).setNodesListManager(nodesListManager); scheduler = mock(YarnScheduler.class); doAnswer( @@ -326,13 +329,18 @@ public void testUnhealthyExpire() { initialRebooted, 
cm.getNumRebootedNMs()); Assert.assertEquals(NodeState.LOST, node.getState()); } - + @Test public void testUnhealthyExpireForSchedulerRemove() { RMNodeImpl node = getUnhealthyNode(); - verify(scheduler,times(2)).handle(any(NodeRemovedSchedulerEvent.class)); + // When the node becomes unhealthy, it will also generate + // NodeUpdateSchedulerEvent event. So 3 scheduler events will be delivered. + // NodeAddedSchedulerEvent + // NodeUpdateSchedulerEvent + // NodeRemovedSchedulerEvent + verify(scheduler,times(3)).handle(any(SchedulerEvent.class)); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE)); - verify(scheduler,times(2)).handle(any(NodeRemovedSchedulerEvent.class)); + verify(scheduler,times(3)).handle(any(SchedulerEvent.class)); Assert.assertEquals(NodeState.LOST, node.getState()); }
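
For reviewers, a minimal usage sketch of the behavior this patch introduces, written against only the NodeUtil and YarnConfiguration additions above; the DrainUnhealthyExample class name is made up for illustration, not part of the patch. With yarn.nodemanager.unheathy.drain.containers left at its default of true, an UNHEALTHY node is no longer reported as unusable (its containers are allowed to drain), while DECOMMISSIONED and LOST nodes still are; setting the flag to false restores the old NodeState.isUnusable() semantics.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.NodeUtil;

public class DrainUnhealthyExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Default (yarn.nodemanager.unheathy.drain.containers = true):
    // an UNHEALTHY node keeps draining its containers, so it is not unusable.
    System.out.println(NodeUtil.isUnusable(NodeState.UNHEALTHY, conf));      // false
    System.out.println(NodeUtil.isUnusable(NodeState.DECOMMISSIONED, conf)); // true
    System.out.println(NodeUtil.isUnusable(NodeState.LOST, conf));           // true

    // Disabling the drain flag restores the pre-patch isUnusable() behavior.
    conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEATHY, false);
    System.out.println(NodeUtil.isUnusable(NodeState.UNHEALTHY, conf));      // true
  }
}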