diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index a1e3bdb..38b1b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -175,8 +175,9 @@ protected void stopContainerInternal(ContainerId containerID) } nodeStatusUpdater.sendOutofBandHeartBeat(); + } else { + super.stopContainerInternal(containerID); } - super.stopContainerInternal(containerID); } /** @@ -456,6 +457,18 @@ protected ContainerStatus getContainerStatusInternal(ContainerId containerID, ContainerExitStatus.INVALID, this.context.getQueuingContext() .getQueuedContainers().get(containerID).getResource(), executionType); + } else { + // Check if part of the stopped/killed queued containers. + for (ContainerTokenIdentifier cTokenId : this.context + .getQueuingContext().getKilledQueuedContainers().keySet()) { + if (cTokenId.getContainerID().equals(containerID)) { + return BuilderUtils.newContainerStatus(containerID, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + this.context.getQueuingContext().getKilledQueuedContainers() + .get(cTokenId), ContainerExitStatus.ABORTED, cTokenId + .getResource(), cTokenId.getExecutionType()); + } + } } } return super.getContainerStatusInternal(containerID, nmTokenIdentifier); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index 4d44d8d..7f0efab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -518,4 +519,103 @@ public void testKillMultipleOpportunisticContainers() throws Exception { Assert.assertEquals(2, killedContainers); Assert.assertEquals(2, runningContainers); } + + /** + * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones. + * Try killing one of the two queued containers. + * @throws Exception + */ + @Test + public void testStopQueuedContainer() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(2000); + + // Assert there is initially one container running and two queued. + int runningContainersNo = 0; + int queuedContainersNo = 0; + List statList = new ArrayList(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { + runningContainersNo++; + } else if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) { + queuedContainersNo++; + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + Assert.assertEquals(1, runningContainersNo); + Assert.assertEquals(2, queuedContainersNo); + + // Stop one of the two queued containers. + StopContainersRequest stopRequest = StopContainersRequest. + newInstance(Arrays.asList(createContainerId(1))); + containerManager.stopContainers(stopRequest); + + Thread.sleep(2000); + + // Assert queued container got properly stopped. + statList.clear(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + statRequest = GetContainerStatusesRequest.newInstance(statList); + containerStatuses = containerManager.getContainerStatuses(statRequest) + .getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Queued container request removed")); + } else if (status.getContainerId().equals(createContainerId(2))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + } }