diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..742d757 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -531,6 +534,12 @@ private ResourceUtilization getNodeUtilization() { } } containerStatuses.addAll(pendingCompletedContainers.values()); + + // Account for all containers that got killed while they were still queued. + if (this.context.getQueuingContext() != null) { + containerStatuses.addAll(getKilledQueuedContainerStatuses()); + } + if (LOG.isDebugEnabled()) { LOG.debug("Sending out " + containerStatuses.size() + " container statuses: " + containerStatuses); @@ -538,6 +547,42 @@ private ResourceUtilization getNodeUtilization() { return containerStatuses; } + /** + * Add to the container statuses the status of the containers that got killed + * while they were queued. + */ + private List getKilledQueuedContainerStatuses() { + List killedQueuedContainerStatuses = new ArrayList(); + for (Map.Entry killedQueuedContainer : this.context + .getQueuingContext().getKilledQueuedContainers().entrySet()) { + ContainerTokenIdentifier containerTokenId = killedQueuedContainer + .getKey(); + ContainerId containerId = containerTokenId.getContainerID(); + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + containerId, ContainerState.COMPLETE, + killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED, + containerTokenId.getResource(), containerTokenId.getExecutionType()); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); + if (isApplicationStopped(applicationId)) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is completing, " + " remove " + + containerId + " from NM context."); + } + this.context.getQueuingContext().getKilledQueuedContainers() + .remove(containerId); + pendingCompletedContainers.put(containerId, containerStatus); + } else { + if (!isContainerRecentlyStopped(containerId)) { + pendingCompletedContainers.put(containerId, containerStatus); + } + } + addCompletedContainer(containerId); + killedQueuedContainerStatuses.add(containerStatus); + } + return killedQueuedContainerStatuses; + } + private List getRunningApplications() { List runningApplications = new ArrayList(); runningApplications.addAll(this.context.getApplications().keySet()); @@ -612,6 +657,11 @@ public void removeOrTrackCompletedContainersFromContext( Container nmContainer = context.getContainers().get(containerId); if (nmContainer == null) { iter.remove(); + // nmContainer may belong to the killed queued containers + if (this.context.getQueuingContext() != null) { + context.getQueuingContext().getKilledQueuedContainers() + .remove(containerId); + } } else if (nmContainer.getContainerState().equals( org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { context.getContainers().remove(containerId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 2e3d10f..4b65675 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -1341,21 +1342,28 @@ private void handleContainerStatus(List containerStatuses) { // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!launchedContainers.contains(containerId)) { - // Just launched container. RM knows about it the first time. - launchedContainers.add(containerId); - newlyLaunchedContainers.add(remoteContainer); - // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister( - new AllocationExpirationInfo(containerId)); + // Process only GUARANTEED containers in the RM. + if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + if (!launchedContainers.contains(containerId)) { + // Just launched container. RM knows about it the first time. + launchedContainers.add(containerId); + newlyLaunchedContainers.add(remoteContainer); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer + .unregister(new AllocationExpirationInfo(containerId)); + } } } else { - // A finished container - launchedContainers.remove(containerId); + if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + // A finished container + launchedContainers.remove(containerId); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer + .unregister(new AllocationExpirationInfo(containerId)); + } + // Completed containers should also include the OPPORTUNISTIC containers + // so that the AM gets properly notified. completedContainers.add(remoteContainer); - // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister( - new AllocationExpirationInfo(containerId)); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {