diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index ebbe503..4f90aff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -66,9 +66,10 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -115,6 +116,7 @@ private Runnable statusUpdaterRunnable; private Thread statusUpdater; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; + Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>(); public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -446,19 +448,29 @@ public void addCompletedContainer(ContainerId containerId) { @VisibleForTesting @Private - public void 
removeCompletedContainersFromContext( + public void removeOrTrackCompletedContainersFromContext( List containerIds) throws IOException { Set removedContainers = new HashSet(); - // If the AM has pulled the completedContainer it can be removed - for (ContainerId containerId : containerIds) { - context.getContainers().remove(containerId); - removedContainers.add(containerId); + pendingContainersToRemove.addAll(containerIds); + Iterator<ContainerId> iter = pendingContainersToRemove.iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next(); + Container container = context.getContainers().get(containerId); + if (container == null) { + // no longer in the NM context: nothing left to remove, stop tracking it + iter.remove(); + } else if (container.getCurrentState() == ContainerState.COMPLETE) { + // remove the container only if the container is at completed state + context.getContainers().remove(containerId); + removedContainers.add(containerId); + iter.remove(); + } } if (!removedContainers.isEmpty()) { - LOG.info("Removed completed containers from NM context: " + - removedContainers); + LOG.info("Removed completed containers from NM context: " + + removedContainers); } } @@ -601,7 +613,7 @@ public void run() { // because these completed containers will be reported back to RM // when NM re-registers with RM. 
// Only remove the cleanedup containers that are acked - removeCompletedContainersFromContext(response + removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); lastHeartBeatID = response.getResponseId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 35b232f..59a7cae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -133,7 +133,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -571,8 +570,7 @@ public void cleanupContainersOnNMResync() { while (!containers.isEmpty() && !allContainersCompleted) { allContainersCompleted = true; for (Entry container : containers.entrySet()) { - if (((ContainerImpl) container.getValue()).getCurrentState() - != ContainerState.COMPLETE) { + if ((container.getValue()).getCurrentState() != ContainerState.COMPLETE) { allContainersCompleted = false; try { Thread.sleep(1000); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 56b4fdd..776801d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -51,6 +51,9 @@ ContainerStatus cloneAndGetContainerStatus(); + // Get the user-facing container state. + org.apache.hadoop.yarn.api.records.ContainerState getCurrentState(); + NMContainerStatus getNMContainerStatus(); String toString(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 5c2dd2c..f409777 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -30,9 +30,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import 
java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -868,16 +870,45 @@ public ContainerState getCurrentState() { } }; + ContainerId runningContainerId = + ContainerId.newInstance(appAttemptId, 3); + Token runningContainerToken = + BuilderUtils.newContainerToken(runningContainerId, "anyHost", + 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container runningContainer = + new ContainerImpl(conf, null, null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) { + @Override + public ContainerState getCurrentState() { + return ContainerState.RUNNING; + } + }; + nm.getNMContext().getApplications().putIfAbsent(appId, mock(Application.class)); nm.getNMContext().getContainers().put(cId, anyCompletedContainer); - Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + nm.getNMContext().getContainers() + .put(runningContainerId, runningContainer); + + Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size()); List ackedContainers = new ArrayList(); ackedContainers.add(cId); + ackedContainers.add(runningContainerId); + + nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers); + + Set<ContainerId> containerIdSet = new HashSet<ContainerId>(); + for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) { + containerIdSet.add(status.getContainerId()); + } - nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers); - Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty()); + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + // the acked completed container must be gone (look it up by id, not by Container object) + Assert.assertFalse(containerIdSet.contains(cId)); + // the acked but still running container must be kept and tracked + Assert.assertTrue(containerIdSet.contains(runningContainerId)); } @Test diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index b2ccb61..185237a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -140,4 +140,9 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { public NMContainerStatus getNMContainerStatus() { return null; } + + @Override + public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { + return null; + } }