diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 5f8317e..0737618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -130,6 +130,9 @@ private final Set launchedContainers = new HashSet(); + private final Set completedContainers = + new HashSet(); + /* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); @@ -561,6 +564,7 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addContainersToBeRemovedFromNM( new ArrayList(this.containersToBeRemovedFromNM)); response.addAllContainersToSignal(this.containersToSignal); + this.completedContainers.removeAll(this.containersToBeRemovedFromNM); this.containersToClean.clear(); this.finishedApplications.clear(); this.containersToSignal.clear(); @@ -1253,6 +1257,11 @@ public int getQueueSize() { return this.launchedContainers; } + @VisibleForTesting + public Set getCompletedContainers() { + return this.completedContainers; + } + @Override public Set getNodeLabels() { RMNodeLabelsManager nlm = context.getNodeLabelManager(); @@ -1295,7 +1304,7 @@ private void handleContainerStatus(List containerStatuses) { // containers. List newlyLaunchedContainers = new ArrayList(); - List completedContainers = + List newlyCompletedContainers = new ArrayList(); for (ContainerStatus remoteContainer : containerStatuses) { ContainerId containerId = remoteContainer.getContainerId(); @@ -1339,15 +1348,18 @@ private void handleContainerStatus(List containerStatuses) { } else { // A finished container launchedContainers.remove(containerId); - completedContainers.add(remoteContainer); + if (!completedContainers.contains(containerId)) { + completedContainers.add(containerId); + newlyCompletedContainers.add(remoteContainer); + } // Unregister from containerAllocationExpirer. containerAllocationExpirer.unregister( new AllocationExpirationInfo(containerId)); } } - if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { + if (newlyLaunchedContainers.size() != 0 || newlyCompletedContainers.size() != 0) { nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, - completedContainers)); + newlyCompletedContainers)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 6ba360b..e496bc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -1023,4 +1024,46 @@ public void testResourceUpdateOnRecommissioningNode() { Resource originalCapacity = node.getOriginalTotalCapability(); assertEquals("Original total capability not null after recommission", null, originalCapacity); } + + @Test + public void testForHandlingDuplicatedCompltedContainers() { + // Start the node + node.handle(new RMNodeStartedEvent(null, null, null)); + // Add info to the queue first + node.setNextHeartBeat(false); + + ContainerId completedContainerId1 = + BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); + + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); + + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + + doReturn(completedContainerId1).when(containerStatus1).getContainerId(); + doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) + .getContainers(); + + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + node.handle(statusEvent1); + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + Assert.assertEquals(1, node.getQueueSize()); + Assert.assertEquals(1, node.getCompletedContainers().size()); + + // test for duplicate entries + node.handle(statusEvent1); + Assert.assertEquals(1, node.getQueueSize()); + + // send clean up container event + node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(), + Collections.singletonList(completedContainerId1))); + + + NodeHeartbeatResponse hbrsp = + Records.newRecord(NodeHeartbeatResponse.class); + node.updateNodeHeartbeatResponseForCleanup(hbrsp); + Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); + Assert.assertEquals(0, node.getCompletedContainers().size()); + } } -- 1.9.2.msysgit.0