diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 75f4073..b247474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -1060,8 +1062,15 @@ public void onContainersCompleted(List completedContainers) { public void onContainersAllocated(List allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); - numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + if (numAllocatedContainers.get() == numTotalContainers) { + LOG.info("The requested number of containers have been allocated." + + " Skipping additional allocation request."); + break; + } else { + numAllocatedContainers.addAndGet(1); + } + String yarnShellId = Integer.toString(yarnShellIdCounter); yarnShellIdCounter++; LOG.info("Launching shell command on a new container." @@ -1074,8 +1083,6 @@ public void onContainersAllocated(List allocatedContainers) { + allocatedContainer.getResource().getMemorySize() + ", containerResourceVirtualCores" + allocatedContainer.getResource().getVirtualCores()); - // + ", containerToken" - // +allocatedContainer.getContainerToken().getIdentifier().toString()); Thread launchThread = createLaunchContainerThread(allocatedContainer, yarnShellId); @@ -1086,6 +1093,15 @@ public void onContainersAllocated(List allocatedContainers) { launchThreads.add(launchThread); launchedContainers.add(allocatedContainer.getId()); launchThread.start(); + + // Remove the corresponding request + Collection requests = + amRMClient.getMatchingRequests( + allocatedContainer.getAllocationRequestId()); + if (requests.iterator().hasNext()) { + AMRMClient.ContainerRequest request = requests.iterator().next(); + amRMClient.removeContainerRequest(request); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index f11bdf8..f2a8041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -106,7 +106,6 @@ public void testDSAppMasterAllocateHandler() throws Exception { handler.onContainersAllocated(containers); Assert.assertEquals("Wrong container allocation count", 1, master.getAllocatedContainers()); - Mockito.verifyZeroInteractions(mockClient); Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); Assert.assertEquals("Incorrect YARN Shell IDs", @@ -121,15 +120,14 @@ public void testDSAppMasterAllocateHandler() throws Exception { ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4); containers.add(generateContainer(id4)); handler.onContainersAllocated(containers); - Assert.assertEquals("Wrong final container allocation count", 4, + Assert.assertEquals("Wrong final container allocation count", 2, master.getAllocatedContainers()); - Assert.assertEquals("Incorrect number of threads launched", 4, + Assert.assertEquals("Incorrect number of threads launched", 2, master.threadsLaunched); Assert.assertEquals("Incorrect YARN Shell IDs", - Arrays.asList("1", "2", "3", "4"), master.yarnShellIds); - + Arrays.asList("1", "2"), master.yarnShellIds); // make sure we handle completion events correctly List status = new ArrayList<>(); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));