From a76aeb48a88d9c80468ec219f60eb3cd475a6f32 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Mon, 27 Jun 2016 16:43:49 +0530 Subject: [PATCH] YARN-5279 --- .../scheduler/AbstractYarnScheduler.java | 13 ++++++ .../scheduler/capacity/CapacityScheduler.java | 10 +++++ .../scheduler/fair/FairScheduler.java | 13 +++++- .../scheduler/fifo/FifoScheduler.java | 13 +++++- .../TestResourceTrackerService.java | 47 ++++++++++++++++++++++ 5 files changed, 94 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 64eb777..3431346 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -503,6 +504,18 @@ public void clearPendingContainerCache() { } } + // Ack NM to remove RM-untracked-containers from NM context. It is called when + // YARN scheduler does not track RMContainer which has sent by NM + protected void acknowledgeUntrackedContainersToNM(NodeId nodeId, + List untrackedContainerIdList) { + if (untrackedContainerIdList != null + && !untrackedContainerIdList.isEmpty()) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, + untrackedContainerIdList)); + } + } + @VisibleForTesting @Private // clean up a completed container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a70..6dcfbeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1071,6 +1071,7 @@ private synchronized void nodeUpdate(RMNode nm) { // Process completed containers int releasedContainers = 0; + List untrackedContainerIdList = new ArrayList(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); RMContainer container = getRMContainer(containerId); @@ -1086,9 +1087,18 @@ private synchronized void nodeUpdate(RMNode nm) { if (rs != null) { Resources.addTo(releaseResources, rs); } + } else { + // Add containers which are untracked by RM. + untrackedContainerIdList.add(containerId); } } + // Ack NM to remove RM-untracked-containers from NM context. If not + // acknowledged to NM, NM will be keep sending this container status in + // heartbeat which is a leak at NM + acknowledgeUntrackedContainersToNM(nm.getNodeID(), + untrackedContainerIdList); + // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc953ba..2292eaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1033,14 +1033,25 @@ private synchronized void nodeUpdate(RMNode nm) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } + List untrackedContainerIdList = new ArrayList(); // Process completed containers for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + RMContainer container = getRMContainer(containerId); + super.completedContainer(container, completedContainer, RMContainerEventType.FINISHED); + if (container == null) { + untrackedContainerIdList.add(containerId); + } } + // Ack NM to remove RM-untracked-containers from NM context. If not + // acknowledged to NM, NM will be keep sending this container status in + // heartbeat which is a leak at NM + acknowledgeUntrackedContainersToNM(nm.getNodeID(), + untrackedContainerIdList); + // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index eaab495..96d92d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -742,14 +742,25 @@ private synchronized void nodeUpdate(RMNode rmNode) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } + List untrackedContainerIdList = new ArrayList(); // Process completed containers for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + RMContainer container = getRMContainer(containerId); + super.completedContainer(container, completedContainer, RMContainerEventType.FINISHED); + if (container == null) { + untrackedContainerIdList.add(containerId); + } } + // Ack NM to remove RM-untracked-containers from NM context. If not + // acknowledged to NM, NM will be keep sending this container status in + // heartbeat which is a leak at NM + acknowledgeUntrackedContainersToNM(rmNode.getNodeID(), + untrackedContainerIdList); + // Updating node resource utilization node.setAggregatedContainersUtilization( rmNode.getAggregatedContainersUtilization()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 73bef0c..f26aaf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -1800,4 +1801,50 @@ public void tearDown() { DefaultMetricsSystem.shutdown(); } } + + @Test + public void testNodeHeartBeatResponseForUnknownContainerCleanUp() + throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + rm.drainEvents(); + + // send 1st heartbeat + nm1.nodeHeartbeat(true); + + // Create 2 unknown containers tracked by NM + ApplicationAttemptId applicationAttemptId = BuilderUtils + .newApplicationAttemptId(BuilderUtils.newApplicationId(1, 1), 1); + ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 1); + ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 2); + ArrayList containerStats = + new ArrayList(); + containerStats.add( + ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1)); + containerStats.add( + ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1)); + + Map> conts = + new HashMap>(); + conts.put(applicationAttemptId.getApplicationId(), containerStats); + + // Send unknown container status in heartbeat + nm1.nodeHeartbeat(conts, true); + rm.drainEvents(); + + int containersToBeRemovedFromNM = 0; + while (true) { + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + containersToBeRemovedFromNM += + nodeHeartbeat.getContainersToBeRemovedFromNM().size(); + // asserting for 2 since two unknown containers status has been sent + if (containersToBeRemovedFromNM == 2) { + break; + } + } + } } -- 1.9.2.msysgit.0