From 977e949d057ce7fd140a13583ccd8f1789959a75 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Mon, 27 Jun 2016 17:34:28 +0530 Subject: [PATCH] YARN-4862 --- .../server/resourcemanager/rmnode/RMNodeImpl.java | 32 +++++++++++++---- .../resourcemanager/TestRMNodeTransitions.java | 40 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a3a6b30..f29566e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -139,6 +139,10 @@ private final Set launchedContainers = new HashSet(); + /* IDs of completed containers already queued; filters repeated statuses */ + private final Set completedContainers = + new HashSet(); + /* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); @@ -574,6 +578,7 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addContainersToBeRemovedFromNM( new ArrayList(this.containersToBeRemovedFromNM)); response.addAllContainersToSignal(this.containersToSignal); + this.completedContainers.removeAll(this.containersToBeRemovedFromNM); this.containersToClean.clear(); this.finishedApplications.clear(); this.containersToSignal.clear(); @@ -1269,6 +1274,11 @@ public int getQueueSize() { return this.launchedContainers; } + @VisibleForTesting + public Set getCompletedContainers() { + return 
this.completedContainers; + } + @Override public Set getNodeLabels() { RMNodeLabelsManager nlm = context.getNodeLabelManager(); @@ -1311,7 +1321,7 @@ private void handleContainerStatus(List containerStatuses) { // containers. List newlyLaunchedContainers = new ArrayList(); - List completedContainers = + List newlyCompletedContainers = new ArrayList(); int numRemoteRunningContainers = 0; for (ContainerStatus remoteContainer : containerStatuses) { @@ -1367,15 +1377,25 @@ private void handleContainerStatus(List containerStatuses) { } // Completed containers should also include the OPPORTUNISTIC containers // so that the AM gets properly notified. - completedContainers.add(remoteContainer); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); + } } } - completedContainers.addAll(findLostContainers( - numRemoteRunningContainers, containerStatuses)); - if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { + List lostContainers = + findLostContainers(numRemoteRunningContainers, containerStatuses); + for (ContainerStatus remoteContainer : lostContainers) { + ContainerId containerId = remoteContainer.getContainerId(); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); + } + } + + if (newlyLaunchedContainers.size() != 0 + || newlyCompletedContainers.size() != 0) { nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, - completedContainers)); + newlyCompletedContainers)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 83a7c73..32f8320 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -1065,4 +1066,43 @@ public void testDisappearingContainer() { Assert.assertTrue("second container not running", node.getLaunchedContainers().contains(cid2)); } + + @Test + public void testForHandlingDuplicatedCompltedContainers() { + // Start the node + node.handle(new RMNodeStartedEvent(null, null, null)); + // Add info to the queue first + node.setNextHeartBeat(false); + + ContainerId completedContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0); + + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); + + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + + doReturn(completedContainerId1).when(containerStatus1).getContainerId(); + doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) + .getContainers(); + + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + node.handle(statusEvent1); + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + Assert.assertEquals(1, 
node.getQueueSize()); + Assert.assertEquals(1, node.getCompletedContainers().size()); + + // a repeated status for the same container must not add a queue entry + node.handle(statusEvent1); + Assert.assertEquals(1, node.getQueueSize()); + + // simulate the AM pulling the finished container before heartbeat cleanup + node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(), + Collections.singletonList(completedContainerId1))); + + NodeHeartbeatResponse hbrsp = + Records.newRecord(NodeHeartbeatResponse.class); + node.updateNodeHeartbeatResponseForCleanup(hbrsp); + Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); + Assert.assertEquals(0, node.getCompletedContainers().size()); + } } -- 1.9.2.msysgit.0