diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index ebbe503..fcdd2c1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -66,8 +66,8 @@
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -115,6 +115,7 @@
   private Runnable statusUpdaterRunnable;
   private Thread statusUpdater;
   private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+  Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -446,19 +447,30 @@ public void addCompletedContainer(ContainerId containerId) {
 
   @VisibleForTesting
   @Private
-  public void removeCompletedContainersFromContext(
+  public void removeOrTrackCompletedContainersFromContext(
       List<ContainerId> containerIds) throws IOException {
     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
 
-    // If the AM has pulled the completedContainer it can be removed
-    for (ContainerId containerId : containerIds) {
-      context.getContainers().remove(containerId);
-      removedContainers.add(containerId);
+    pendingContainersToRemove.addAll(containerIds);
+    Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      // remove the container only if the container is at DONE state
+      Container nmContainer = context.getContainers().get(containerId);
+      if (nmContainer == null) {
+        // already removed from the context; stop tracking to avoid a leak
+        iter.remove();
+      } else if (nmContainer.getContainerState().equals(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
+        context.getContainers().remove(containerId);
+        removedContainers.add(containerId);
+        iter.remove();
+      }
     }
 
     if (!removedContainers.isEmpty()) {
-      LOG.info("Removed completed containers from NM context: " +
-        removedContainers);
+      LOG.info("Removed completed containers from NM context: "
+          + removedContainers);
     }
   }
 
@@ -601,7 +613,7 @@ public void run() {
             // because these completed containers will be reported back to RM
             // when NM re-registers with RM.
             // Only remove the cleanedup containers that are acked
-            removeCompletedContainersFromContext(response
+            removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
             lastHeartBeatID = response.getResponseId();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 5c2dd2c..925a249 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -30,9 +30,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
@@ -866,18 +868,57 @@ public void testRemovePreviousCompletedContainersFromContext() throws Exception
           public ContainerState getCurrentState() {
             return ContainerState.COMPLETE;
           }
+
+          @Override
+          public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
+            return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE;
+          }
         };
 
+    ContainerId runningContainerId =
+        ContainerId.newInstance(appAttemptId, 3);
+    Token runningContainerToken =
+        BuilderUtils.newContainerToken(runningContainerId, "anyHost",
+          1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
+          "password".getBytes(), 0);
+    Container runningContainer =
+        new ContainerImpl(conf, null, null, null, null, null,
+          BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
+          @Override
+          public ContainerState getCurrentState() {
+            return ContainerState.RUNNING;
+          }
+
+          @Override
+          public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
+            return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING;
+          }
+        };
+
     nm.getNMContext().getApplications().putIfAbsent(appId,
         mock(Application.class));
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
-    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+    nm.getNMContext().getContainers()
+      .put(runningContainerId, runningContainer);
+
+    Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size());
 
     List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
     ackedContainers.add(cId);
+    ackedContainers.add(runningContainerId);
 
-    nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
-    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
+    nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
+
+    Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
+    for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
+      containerIdSet.add(status.getContainerId());
+    }
+
+    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
+    // completed container is removed;
+    Assert.assertFalse(containerIdSet.contains(cId));
+    // running container is not removed;
+    Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
 
   @Test
@@ -1467,6 +1508,13 @@ public static Container getMockContainer(ContainerStatus containerStatus) {
     when(container.getCurrentState()).thenReturn(containerStatus.getState());
     when(container.getContainerId()).thenReturn(
       containerStatus.getContainerId());
+    if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
+      when(container.getContainerState())
+          .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE);
+    } else if (containerStatus.getState().equals(ContainerState.RUNNING)) {
+      when(container.getContainerState())
+          .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING);
+    }
     return container;
   }
 