// Patch summary (text before the first "diff --git" header is skipped by patch tools).
// - NodeStatusUpdaterImpl: containers removed as COMPLETE are remembered in a LinkedHashMap of ContainerId -> expiry time (current millis + "yarn.nodemanager.duration-to-track-stopped-containers", default 600000); expired entries are purged each time a container is added.
// - ContainerManagerImpl: stopContainerInternal returns quietly for an unknown container that was recently stopped; getContainerStatusInternal raises "was recently stopped on node manager" for it.
// - NodeManager and TestContainerManagerSecurity: test-only hook clearTrackedFinishedContainersFromCache() plus stop-container test helpers.
// NOTE(review): the authorizeGetAndStopContainerRequest hunk deletes both throw statements, so a mismatched NMToken looks like it is only logged now - confirm this is intended.
// NOTE(review): the test helper stopContainer catches Exception without rethrowing and passes the List argument to containsKey/get - failures look silently dropped; confirm.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bde7890..c1d4eae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -467,4 +467,11 @@ public static void main(String[] args) { Configuration conf = new YarnConfiguration(); nodeManager.initAndStartNodeManager(conf, false); } + + @VisibleForTesting + @Private + public void clearTrackedFinishedContainersFromCache() { + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .clearTrackedFinishedContainersFromCache(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 7814897..bb61cc4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -88,6 +89,10 @@ private Map appTokenKeepAliveMap = new
HashMap(); private Random keepAliveDelayRandom = new Random(); + // Tracks recently stopped containers: ContainerId -> expiry time in millis. + private final Map recentlyStoppedContainersTracker; + // How long (in ms) a stopped container stays in the tracker. + private long durationToTrackStoppedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -103,6 +108,8 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.recentlyStoppedContainersTracker = + new LinkedHashMap(); } @Override @@ -132,7 +139,23 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + " virtual-cores=" + virtualCores); - + // Stopped containers are tracked for 10 minutes (600000 ms) by default. + // Avoid very large values: every container stopped within this window is + // kept in memory. + durationToTrackStoppedContainers = + conf.getLong("yarn.nodemanager.duration-to-track-stopped-containers", + 600000); + if (durationToTrackStoppedContainers < 0) { + String message = "Invalid configuration for " + + "\"yarn.nodemanager.duration-to-track-stopped-containers\" default " + + "value is 10Min(600000)."; + LOG.error(message); + throw new YarnException(message); + } + if (LOG.isDebugEnabled()) { + LOG.debug("\"yarn.nodemanager.duration-to-track-stopped-containers\" :" + + durationToTrackStoppedContainers); + } super.serviceInit(conf); } @@ -290,7 +313,11 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove i.remove(); - + // Remember the container in the tracker for at least + // #durationToTrackStoppedContainers ms. 
Expired entries are purged + // the next time a stopped container is added to the tracker. + addStoppedContainerToTracker(containerId); + LOG.info("Removed completed container " + containerId); } } @@ -340,6 +367,42 @@ public void sendOutofBandHeartBeat() { } } + public boolean isContainerRecentlyStopped(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + return recentlyStoppedContainersTracker.containsKey(containerId); + } + } + + public void addStoppedContainerToTracker(ContainerId containerId) { + synchronized (recentlyStoppedContainersTracker) { + removeVeryOldStoppedContainersFromTracker(); + recentlyStoppedContainersTracker.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } + } + + @VisibleForTesting + @Private + public void clearTrackedFinishedContainersFromCache() { + synchronized (recentlyStoppedContainersTracker) { + recentlyStoppedContainersTracker.clear(); + } + } + @Private + @VisibleForTesting + public void removeVeryOldStoppedContainersFromTracker() { + synchronized (recentlyStoppedContainersTracker) { + long currentTime = System.currentTimeMillis(); + Iterator i = + recentlyStoppedContainersTracker.keySet().iterator(); + while (i.hasNext()) { + if (recentlyStoppedContainersTracker.get(i.next()) < currentTime) { + i.remove(); + } + } + } + } + @Override public long getRMIdentifier() { return this.rmIdentifier; @@ -461,4 +524,6 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } + + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index d2e7510..e81d72d 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -573,17 +574,28 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, authorizeGetAndStopContainerRequest(containerID, container, true, nmTokenIdentifier); - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + boolean containerRecentlyStopped = + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .isContainerRecentlyStopped(containerID); + + if (container == null) { + if (!containerRecentlyStopped) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } else { + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerID, + "Container killed by the ApplicationMaster.")); - NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID - .getApplicationAttemptId().getApplicationId(), containerID); + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", 
containerID + .getApplicationAttemptId().getApplicationId(), containerID); - // TODO: Move this code to appropriate place once kill_container is - // implemented. - nodeStatusUpdater.sendOutofBandHeartBeat(); + // TODO: Move this code to appropriate place once kill_container is + // implemented. + nodeStatusUpdater.sendOutofBandHeartBeat(); + } } /** @@ -619,6 +631,16 @@ private ContainerStatus getContainerStatusInternal(ContainerId containerID, authorizeGetAndStopContainerRequest(containerID, container, false, nmTokenIdentifier); + if (container == null) { + if (((NodeStatusUpdaterImpl) nodeStatusUpdater) + .isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " was recently stopped on node manager."); + } else { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); LOG.info("Returning " + containerStatus); return containerStatus; @@ -650,17 +672,11 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, container.getContainerId()); } else { LOG.warn(identifier.getApplicationAttemptId() - + " attempted to get get status for non-application container : " + + " attempted to get status for non-application container : " + container.getContainerId().toString()); } - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not started by this application attempt."); } - if (container == null) { - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not handled by this NodeManager"); - } } class ContainerEventDispatcher implements EventHandler { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 743bf8a..5655b08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import junit.framework.Assert; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -237,6 +241,11 @@ private void testNMTokens(Configuration conf) throws Exception { Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, validNMToken, 
true).contains(sb.toString())); + // The container has been removed from the NodeManager's memory by now. + // Stopping it again must not throw any exception. + testStopContainer(rpc, validAppAttemptId, validNode, validContainerId, + validNMToken, false); + // Rolling over master key twice so that we can check whether older keys // are used for authentication. rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); @@ -244,13 +253,25 @@ private void testNMTokens(Configuration conf) throws Exception { rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); // trying get container status. Now saved nmToken should be used for - // authentication. + // authentication. The call should fail, reporting that the container was + // recently stopped. + sb = new StringBuilder("Container "); + sb.append(validContainerId); + sb.append(" was recently stopped on node manager"); + Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, + validContainerId, validNMToken, true).contains(sb.toString())); + + // Now let's remove the container from the NM's recently-stopped tracker + nm.clearTrackedFinishedContainersFromCache(); + + // This should fail because the container is no longer in the recently + // stopped tracker. 
sb = new StringBuilder("Container "); sb.append(validContainerId.toString()); sb.append(" is not handled by this NodeManager"); Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, validContainerId, validNMToken, false).contains(sb.toString())); - + } private void waitForContainerToFinishOnNM(ContainerId containerId) { @@ -292,6 +313,23 @@ protected void rollNMTokenMasterKey( Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId() == nmTokenSecretManagerRM.getCurrentKey().getKeyId())); } + + private String testStopContainer(YarnRPC rpc, + ApplicationAttemptId appAttemptId, NodeId nodeId, + ContainerId containerId, Token nmToken, boolean isExceptionExpected) { + try { + stopContainer(rpc, nmToken, + Arrays.asList(new ContainerId[] { containerId }), appAttemptId, + nodeId); + if (isExceptionExpected) { + fail("Exception was expected!!"); + } + return ""; + } catch (Exception e) { + e.printStackTrace(); + return e.getMessage(); + } + } private String testGetContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, @@ -311,7 +349,7 @@ private String testGetContainer(YarnRPC rpc, } } - protected String testStartContainer(YarnRPC rpc, + private String testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, org.apache.hadoop.yarn.api.records.Token containerToken, org.apache.hadoop.yarn.api.records.Token nmToken, @@ -329,6 +367,29 @@ protected String testStartContainer(YarnRPC rpc, } } + private void stopContainer(YarnRPC rpc, Token nmToken, + List containerId, ApplicationAttemptId appAttemptId, + NodeId nodeId) throws Exception { + StopContainersRequest request = + StopContainersRequest.newInstance(containerId); + ContainerManagementProtocol proxy = null; + try { + proxy = + getContainerManagementProtocolProxy(rpc, nmToken, nodeId, + appAttemptId.toString()); + StopContainersResponse response = proxy.stopContainers(request); + if (response.getFailedRequests() != null && + 
response.getFailedRequests().containsKey(containerId)) { + parseAndThrowException(response.getFailedRequests().get(containerId) + .deSerialize()); + } + } catch (Exception e) { + if (proxy != null) { + rpc.stopProxy(proxy, conf); + } + } + } + private void getContainerStatus(YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,