diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b0e71e9..b8e4353 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -88,6 +89,11 @@ private Map appTokenKeepAliveMap = new HashMap(); private Random keepAliveDelayRandom = new Random(); + // It will be used to track recently stopped containers on node manager. + private final Map<ContainerId, Long> recentlyStoppedContainersTracker; + // Duration for which to track recently stopped container. Not making it + // configurable as 10 min is sufficient time. + private long durationToTrackStoppedContainers = 10 * 60 * 1000; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -103,6 +109,8 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.recentlyStoppedContainersTracker = + new LinkedHashMap<ContainerId, Long>(); } @Override @@ -290,7 +298,11 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove i.remove(); - + // Adding to tracker. Tracker will keep it around at least for + // #durationToTrackStoppedContainers duration. 
Expired entries are + // purged the next time a stopped container is added to the tracker. + addStoppedContainerToTracker(containerId); + LOG.info("Removed completed container " + containerId); } } @@ -340,6 +352,35 @@ public void sendOutofBandHeartBeat() { } } + public boolean isContainerRecentlyStopped(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + return recentlyStoppedContainersTracker.containsKey(containerId); + } + } + + public void addStoppedContainerToTracker(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + removeVeryOldStoppedContainersFromTracker(); + recentlyStoppedContainersTracker.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } + } + + @Private + @VisibleForTesting + public void removeVeryOldStoppedContainersFromTracker() { + synchronized (recentlyStoppedContainersTracker) { + long currentTime = System.currentTimeMillis(); + Iterator<Entry<ContainerId, Long>> i = + recentlyStoppedContainersTracker.entrySet().iterator(); + while (i.hasNext()) { + if (i.next().getValue() < currentTime) { + i.remove(); + } + } + } + } + @Override public long getRMIdentifier() { return this.rmIdentifier; @@ -454,4 +495,10 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } + + @VisibleForTesting + @Private + public void setDurationToTrackStoppedContainers(long duration) { + this.durationToTrackStoppedContainers = duration; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 712bc43..db7234f 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -24,11 +24,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -84,6 +81,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -538,23 +536,34 @@ public StopContainerResponse stopContainer(StopContainerRequest request) ContainerId containerID = request.getContainerId(); String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); - LOG.info("Getting container-status for " + containerIDStr); + LOG.info("Stopping container : " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true); - StopContainerResponse response = - recordFactory.newRecordInstance(StopContainerResponse.class); + boolean containerRecentlyStopped = + nodeStatusUpdater instanceof NodeStatusUpdaterImpl + && ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .isContainerRecentlyStopped(containerID); + + if (container == null && 
!containerRecentlyStopped) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } else if (!containerRecentlyStopped && container != null) { + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerID, + "Container killed by the ApplicationMaster.")); - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + .getApplicationAttemptId().getApplicationId(), containerID); - NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID - .getApplicationAttemptId().getApplicationId(), containerID); + // TODO: Move this code to appropriate place once kill_container is + // implemented. + nodeStatusUpdater.sendOutofBandHeartBeat(); + } + - // TODO: Move this code to appropriate place once kill_container is - // implemented. 
- nodeStatusUpdater.sendOutofBandHeartBeat(); + StopContainerResponse response = + recordFactory.newRecordInstance(StopContainerResponse.class); return response; } @@ -569,7 +578,11 @@ public GetContainerStatusResponse getContainerStatus( LOG.info("Getting container-status for " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, false); - + + if (container == null) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); LOG.info("Returning " + containerStatus); GetContainerStatusResponse response = @@ -614,10 +627,6 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, + " is not started by this application attempt."); } - if (container == null) { - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not handled by this NodeManager"); - } } class ContainerEventDispatcher implements EventHandler { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 7781d50..40c1633 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -230,6 +231,11 @@ private void testNMTokens(Configuration conf) throws Exception { Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, validNMToken, true).contains(sb.toString())); + // Container is removed from node manager's memory by this time. + // trying to stop the container. It should not throw any exception. + Assert.assertTrue(testStopContainer(rpc, validAppAttemptId, validNode, + validContainerId, validNMToken, false).isEmpty()); + // Rolling over master key twice so that we can check whether older keys // are used for authentication. rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); @@ -285,6 +291,21 @@ protected void rollNMTokenMasterKey( Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId() == nmTokenSecretManagerRM.getCurrentKey().getKeyId())); } + + private String testStopContainer(YarnRPC rpc, + ApplicationAttemptId appAttemptId, NodeId nodeId, + ContainerId containerId, Token nmToken, boolean isExceptionExpected) { + try { + stopContainer(rpc, nmToken, containerId, appAttemptId, nodeId); + if (isExceptionExpected) { + fail("Exception was expected!!"); + } + return ""; + } catch (Exception e) { + e.printStackTrace(); + return e.getMessage(); + } + } private String testGetContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, @@ -304,7 +325,7 @@ private String testGetContainer(YarnRPC rpc, } } - protected String testStartContainer(YarnRPC rpc, + private String testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, org.apache.hadoop.yarn.api.records.Token containerToken, org.apache.hadoop.yarn.api.records.Token nmToken, @@ -322,6 +343,22 @@ protected String testStartContainer(YarnRPC rpc, } } + private void stopContainer(YarnRPC rpc, Token nmToken, + ContainerId containerId, 
ApplicationAttemptId appAttemptId, + NodeId nodeId) throws Exception { + StopContainerRequest request = + StopContainerRequest.newInstance(containerId); + ContainerManagementProtocol proxy = null; + try { + proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, appAttemptId.toString()); + proxy.stopContainer(request); + } finally { + if (proxy != null) { + rpc.stopProxy(proxy, conf); + } + } + } + private void getContainerStatus(YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, @@ -329,8 +366,7 @@ protected String testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, boolean isExceptionExpected) throws Exception { GetContainerStatusRequest request = - Records.newRecord(GetContainerStatusRequest.class); - request.setContainerId(containerId); + GetContainerStatusRequest.newInstance(containerId); ContainerManagementProtocol proxy = null; @@ -352,12 +388,11 @@ private void startContainer(final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token containerToken, NodeId nodeId, String user) throws Exception { - StartContainerRequest request = - Records.newRecord(StartContainerRequest.class); - request.setContainerToken(containerToken); ContainerLaunchContext context = - Records.newRecord(ContainerLaunchContext.class); - request.setContainerLaunchContext(context); + ContainerLaunchContext.newInstance(null, null, null, null, null, null); + + StartContainerRequest request = + StartContainerRequest.newInstance(context, containerToken); ContainerManagementProtocol proxy = null; try {