commit 55ad6f015aebe0da3096a87a233e526fd728a1fa
Author: Gera Shegalov
Date:   Tue Apr 29 02:45:30 2014 -0700

    YARN-1996.v01.patch

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 96d3832..8fd68c0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -128,6 +128,7 @@
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.NodeUtil;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
@@ -2120,7 +2121,7 @@ public void transition(JobImpl job, JobEvent event) {
       JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event;
       for(NodeReport nr: updateEvent.getUpdatedNodes()) {
         NodeState nodeState = nr.getNodeState();
-        if(nodeState.isUnusable()) {
+        if (NodeUtil.isUnusable(nodeState, job.conf)) {
           // act on the updates
           job.actOnUnusableNode(nr.getNodeId(), nodeState);
         }

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index b9d283f..302d9bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.NodeUtil;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -688,7 +689,7 @@ private void handleUpdatedNodes(AllocateResponse response) {
     HashSet<NodeId> unusableNodes = new HashSet<NodeId>();
     for (NodeReport nr : updatedNodes) {
       NodeState nodeState = nr.getNodeState();
-      if (nodeState.isUnusable()) {
+      if (NodeUtil.isUnusable(nodeState, getConfig())) {
         unusableNodes.add(nr.getNodeId());
       }
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
index 8baddc8..bb467ae 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
@@ -192,6 +193,7 @@ public void testUpdatedNodes() throws Exception {
     MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
         true, ++runCount);
     Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEALTHY, false);
     // after half of the map completion, reduce will start
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
     // uberization forces full slowstart (1.0), so disable that

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
index 9c04187..73a9a4a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -104,6 +105,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -794,6 +796,7 @@ protected ContainerAllocator createContainerAllocator(
   @Test
   public void testUpdatedNodes() throws Exception {
     Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEALTHY, false);
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
     DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java
index ff1ca48..c402fa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java
@@ -44,8 +44,4 @@
 
   /** Node has rebooted */
   REBOOTED;
-
-  public boolean isUnusable() {
-    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
-  }
 }
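Note that removing the public isUnusable() method from NodeState is an incompatible API change: any caller outside this patch must migrate to the configuration-aware helper. A minimal before/after sketch (the surrounding call site is illustrative, modeled on JobImpl above):

    // before this patch:
    if (nodeState.isUnusable()) {
      job.actOnUnusableNode(nr.getNodeId(), nodeState);
    }

    // after this patch; a Configuration must be reachable at the call site:
    if (NodeUtil.isUnusable(nodeState, conf)) {
      job.actOnUnusableNode(nr.getNodeId(), nodeState);
    }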
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0a11948..b1f7314 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -657,6 +657,11 @@
       NM_PREFIX + "remote-app-log-dir";
   public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
 
+  public static final String YARN_NM_DRAIN_UNHEALTHY = NM_PREFIX
+      + "unhealthy.drain.containers";
+  public static final boolean YARN_NM_DRAIN_UNHEALTHY_DEFAULT = true;
+
   /**
    * The remote log dir will be created at
    * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java
new file mode 100644
index 0000000..e7374cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/NodeUtil.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class NodeUtil {
+
+  public static boolean drainUnhealthy(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEALTHY,
+        YarnConfiguration.YARN_NM_DRAIN_UNHEALTHY_DEFAULT);
+  }
+
+  public static boolean isUnusable(NodeState nodeState, Configuration conf) {
+    return (nodeState == NodeState.UNHEALTHY && !drainUnhealthy(conf))
+        || nodeState == NodeState.DECOMMISSIONED
+        || nodeState == NodeState.LOST;
+  }
+}
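NodeUtil carries the whole policy: UNHEALTHY counts as unusable only while draining is disabled, whereas DECOMMISSIONED and LOST are always unusable. A unit-test sketch of that contract; TestNodeUtil is hypothetical (not part of this patch) and assumes JUnit 4, which the modified tests above already use:

    package org.apache.hadoop.yarn.util;

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.NodeState;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.junit.Test;

    public class TestNodeUtil {
      @Test
      public void testIsUnusableHonorsDrainFlag() {
        Configuration conf = new Configuration();
        // default (drain enabled): UNHEALTHY nodes may finish their containers
        assertFalse(NodeUtil.isUnusable(NodeState.UNHEALTHY, conf));
        // LOST and DECOMMISSIONED are unusable regardless of the flag
        assertTrue(NodeUtil.isUnusable(NodeState.LOST, conf));
        assertTrue(NodeUtil.isUnusable(NodeState.DECOMMISSIONED, conf));
        // drain disabled: pre-patch semantics, UNHEALTHY is unusable again
        conf.setBoolean(YarnConfiguration.YARN_NM_DRAIN_UNHEALTHY, false);
        assertTrue(NodeUtil.isUnusable(NodeState.UNHEALTHY, conf));
      }
    }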
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0b455e7..af4f59e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1039,6 +1039,14 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>If set to true containers are allowed to complete instead of
+    being killed when the node transitions to UNHEALTHY state. However no new
+    containers are accepted by the UNHEALTHY node.</description>
+    <name>yarn.nodemanager.unhealthy.drain.containers</name>
+    <value>true</value>
+  </property>
+
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index 9ed5179..ffb344d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -196,7 +196,7 @@ private void reserve(Priority priority, FSSchedulerNode node,
    */
   public void unreserve(Priority priority, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    app.unreserve(node, priority);
+    app.unreserve(node.getNodeID(), priority);
     node.unreserveResource(app);
     getMetrics().unreserveResource(
         app.getUser(), rmContainer.getContainer().getResource());

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
index adabfef..abd1998 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
@@ -106,10 +106,10 @@ synchronized public void containerCompleted(RMContainer rmContainer,
     preemptionMap.remove(rmContainer);
   }
 
-  public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
+  public synchronized void unreserve(NodeId nodeId, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
         this.reservedContainers.get(priority);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+    RMContainer reservedContainer = reservedContainers.remove(nodeId);
     if (reservedContainers.isEmpty()) {
       this.reservedContainers.remove(priority);
     }
@@ -121,8 +121,9 @@ public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
     Resources.subtractFrom(currentReservation, resource);
 
     LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+        + nodeId + ", currently has " + reservedContainers.size()
+        + " at priority " + priority + "; currentReservation "
+        + currentReservation);
   }
 
   public synchronized float getLocalityWaitFactor(
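FSSchedulerApp.unreserve() now keys off NodeId instead of FSSchedulerNode: with draining enabled, a container can complete after its node has already left the scheduler's nodes map, so the FSSchedulerNode may no longer be resolvable while the container's NodeId still is. The FairScheduler change below relies on exactly this:

    // FairScheduler.completedContainer, RESERVED branch (from this patch):
    application.unreserve(container.getNodeId(),
        rmContainer.getReservedPriority());
    // valid even when nodes.get(container.getNodeId()) returns null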
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index fab9ebe..ea47f63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -99,6 +100,7 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.NodeUtil;
 
 /**
  * A scheduler that schedules resources between a set of queues. The scheduler
@@ -156,6 +158,10 @@
   private Map<NodeId, FSSchedulerNode> nodes =
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
 
+  // Unhealthy nodes in case drain is enabled in the cluster, indexed by NodeId
+  private Map<NodeId, FSSchedulerNode> unhealthyNodes =
+      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
+
   // Aggregate capacity of the cluster
   private Resource clusterCapacity =
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -810,19 +816,49 @@ private synchronized void completedContainer(RMContainer rmContainer,
     // Get the node on which the container was allocated
     FSSchedulerNode node = nodes.get(container.getNodeId());
+    FSSchedulerNode unhealthyNode = null;
+    if (node == null) {
+      node = unhealthyNode = unhealthyNodes.get(container.getNodeId());
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Checking previously unhealthy node " + node + " for "
+            + container);
+      }
+    }
+
+    if (node == null) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("Couldn't find node for " + container);
+      }
+    }
 
     if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(node, rmContainer.getReservedPriority());
-      node.unreserveResource(application);
+      application.unreserve(container.getNodeId(),
+          rmContainer.getReservedPriority());
+      if (node != null) {
+        node.unreserveResource(application);
+      }
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
-      node.releaseContainer(container);
+      if (node != null) {
+        node.releaseContainer(container);
+      }
       updateRootQueueMetrics();
     }
 
     LOG.info("Application attempt " + application.getApplicationAttemptId()
-        + " released container " + container.getId() + " on node: " + node
+        + " released container " + container.getId() + " on node: "
+        + (node != null ? node.toString() : container.getNodeId())
         + " with event: " + event);
+
+    if (unhealthyNode != null
+        && unhealthyNode.getReservedContainer() == null
+        && unhealthyNode.getRunningContainers().isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Previously unhealthy " + unhealthyNode
+            + " no longer has any reserved or running containers. Removing!");
+      }
+      unhealthyNodes.remove(unhealthyNode.getNodeID());
+    }
   }
 
   private synchronized void addNode(RMNode node) {
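The node lookup in completedContainer() above falls back from nodes to unhealthyNodes and tolerates a miss in both maps. A hypothetical helper (not in this patch) that summarizes the lookup:

    private FSSchedulerNode lookupNode(NodeId nodeId) {
      // prefer healthy nodes; fall back to nodes draining in UNHEALTHY state
      FSSchedulerNode node = nodes.get(nodeId);
      return (node != null) ? node : unhealthyNodes.get(nodeId);
    }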
Removing!"); + } + unhealthyNodes.remove(unhealthyNode.getNodeID()); + } } private synchronized void addNode(RMNode node) { @@ -843,14 +879,19 @@ private synchronized void removeNode(RMNode rmNode) { Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); updateRootQueueMetrics(); - // Remove running containers - List runningContainers = node.getRunningContainers(); - for (RMContainer container : runningContainers) { - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); + final boolean drainUnhealthy = rmNode.getState() == NodeState.UNHEALTHY + && NodeUtil.drainUnhealthy(conf); + + if (!drainUnhealthy) { + // Remove running containers + List runningContainers = node.getRunningContainers(); + for (RMContainer container : runningContainers) { + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); + } } // Remove reservations, if any @@ -864,6 +905,9 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + if (drainUnhealthy) { + unhealthyNodes.put(rmNode.getNodeID(), node); + } LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterCapacity); }