diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b0e71e9..53ab86a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -88,6 +89,11 @@ private Map appTokenKeepAliveMap = new HashMap(); private Random keepAliveDelayRandom = new Random(); + // It will be used to track recently stopped containers on node manager. + private final Map recentlyStoppedContainersTracker; + // Duration for which to track recently stopped container. Not making it + // configurable as 10 min is sufficient time. + private final long durationToTrackStoppedContainers = 10 * 60 * 1000; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -103,6 +109,8 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.recentlyStoppedContainersTracker = + new LinkedHashMap(); } @Override @@ -290,7 +298,11 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove i.remove(); - + // Adding to tracker. Tracker will keep it around at least for + // #durationToTrackStoppedContainers duration. In the subsequent call + // to stop container it will get removed from tracker. + addStoppedContainerToTracker(containerId); + LOG.info("Removed completed container " + containerId); } } @@ -340,6 +352,35 @@ public void sendOutofBandHeartBeat() { } } + public boolean isContainerRecentlyStopped(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + return recentlyStoppedContainersTracker.containsKey(containerId); + } + } + + public void addStoppedContainerToTracker(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + removeVeryOldStoppedContainersFromTracker(); + recentlyStoppedContainersTracker.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } + } + + @Private + @VisibleForTesting + public void removeVeryOldStoppedContainersFromTracker() { + synchronized (recentlyStoppedContainersTracker) { + long currentTime = System.currentTimeMillis(); + Iterator i = + recentlyStoppedContainersTracker.keySet().iterator(); + while (i.hasNext()) { + if (recentlyStoppedContainersTracker.get(i.next()) < currentTime) { + i.remove(); + } + } + } + } + @Override public long getRMIdentifier() { return this.rmIdentifier; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 712bc43..db7234f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -24,11 +24,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -84,6 +81,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -538,23 +536,34 @@ public StopContainerResponse stopContainer(StopContainerRequest request) ContainerId containerID = request.getContainerId(); String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); - LOG.info("Getting container-status for " + containerIDStr); + LOG.info("Stopping container : " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true); - StopContainerResponse response = - recordFactory.newRecordInstance(StopContainerResponse.class); + boolean containerRecentlyStopped = + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .isContainerRecentlyStopped(containerID); + + if (container == null && !containerRecentlyStopped) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } else if (!containerRecentlyStopped && container != null) { + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerID, + "Container killed by the ApplicationMaster.")); - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + .getApplicationAttemptId().getApplicationId(), containerID); - NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID - .getApplicationAttemptId().getApplicationId(), containerID); + // TODO: Move this code to appropriate place once kill_container is + // implemented. + nodeStatusUpdater.sendOutofBandHeartBeat(); + } + + - // TODO: Move this code to appropriate place once kill_container is - // implemented. - nodeStatusUpdater.sendOutofBandHeartBeat(); + StopContainerResponse response = + recordFactory.newRecordInstance(StopContainerResponse.class); return response; } @@ -569,7 +578,11 @@ public GetContainerStatusResponse getContainerStatus( LOG.info("Getting container-status for " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, false); - + + if (container == null) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); LOG.info("Returning " + containerStatus); GetContainerStatusResponse response = @@ -614,10 +627,6 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, + " is not started by this application attempt."); } - if (container == null) { - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not handled by this NodeManager"); - } } class ContainerEventDispatcher implements EventHandler {