diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b3ea865..ee0b96a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -185,7 +186,7 @@ // Handle to communicate with the Resource Manager @SuppressWarnings("rawtypes") - private AMRMClientAsync amRMClient; + private AMRMClientAsync amRMClient; // In both secure and non-secure modes, this points to the job-submitter. private UserGroupInformation appSubmitterUgi; @@ -224,7 +225,7 @@ // Allocated container count so that we know how many containers has the RM // allocated to us @VisibleForTesting - protected AtomicInteger numAllocatedContainers = new AtomicInteger(); + protected final AtomicInteger numAllocatedContainers = new AtomicInteger(); // Count of failed containers private AtomicInteger numFailedContainers = new AtomicInteger(); // Count of containers already requested from the RM @@ -765,8 +766,28 @@ public void onContainersCompleted(List completedContainers) { public void onContainersAllocated(List allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); - numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + AMRMClientAsync amRMClient = ApplicationMaster.this + .amRMClient; + synchronized (numAllocatedContainers) { + if (numAllocatedContainers.get() == numTotalContainers) { + // Given that AM tells RM how many containers each time AM + // allocates, the last allocate call might get more allocated + // containers than expected, we should break here if it happens. + break; + } else { + numAllocatedContainers.addAndGet(1); + } + } + List> matchingRequests = + amRMClient.getMatchingRequests(allocatedContainer.getPriority(), + ResourceRequest.ANY, allocatedContainer.getResource()); + if (!matchingRequests.isEmpty()) { + for (ContainerRequest containerRequest : matchingRequests.get(0)) { + amRMClient.removeContainerRequest(containerRequest); + break; + } + } LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost()