diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 75f4073..cca5676 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -1060,32 +1062,48 @@ public void onContainersCompleted(List completedContainers) { public void onContainersAllocated(List allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); - numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { - String yarnShellId = Integer.toString(yarnShellIdCounter); - yarnShellIdCounter++; - LOG.info("Launching shell command on a new container." - + ", containerId=" + allocatedContainer.getId() - + ", yarnShellId=" + yarnShellId - + ", containerNode=" + allocatedContainer.getNodeId().getHost() - + ":" + allocatedContainer.getNodeId().getPort() - + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() - + ", containerResourceMemory" - + allocatedContainer.getResource().getMemorySize() - + ", containerResourceVirtualCores" - + allocatedContainer.getResource().getVirtualCores()); - // + ", containerToken" - // +allocatedContainer.getContainerToken().getIdentifier().toString()); - - Thread launchThread = createLaunchContainerThread(allocatedContainer, - yarnShellId); - - // launch and start the container on a separate thread to keep - // the main thread unblocked - // as all containers may not be allocated at one go. - launchThreads.add(launchThread); - launchedContainers.add(allocatedContainer.getId()); - launchThread.start(); + if (numAllocatedContainers.get() == numTotalContainers) { + LOG.info("The requested number of containers have been allocated." + + " Releasing the extra container allocation from the RM."); + amRMClient.releaseAssignedContainer(allocatedContainer.getId()); + } else { + numAllocatedContainers.addAndGet(1); + String yarnShellId = Integer.toString(yarnShellIdCounter); + yarnShellIdCounter++; + LOG.info( + "Launching shell command on a new container." + + ", containerId=" + allocatedContainer.getId() + + ", yarnShellId=" + yarnShellId + + ", containerNode=" + + allocatedContainer.getNodeId().getHost() + + ":" + allocatedContainer.getNodeId().getPort() + + ", containerNodeURI=" + + allocatedContainer.getNodeHttpAddress() + + ", containerResourceMemory" + + allocatedContainer.getResource().getMemorySize() + + ", containerResourceVirtualCores" + + allocatedContainer.getResource().getVirtualCores()); + + Thread launchThread = + createLaunchContainerThread(allocatedContainer, yarnShellId); + + // launch and start the container on a separate thread to keep + // the main thread unblocked + // as all containers may not be allocated at one go. + launchThreads.add(launchThread); + launchedContainers.add(allocatedContainer.getId()); + launchThread.start(); + + // Remove the corresponding request + Collection requests = + amRMClient.getMatchingRequests( + allocatedContainer.getAllocationRequestId()); + if (requests.iterator().hasNext()) { + AMRMClient.ContainerRequest request = requests.iterator().next(); + amRMClient.removeContainerRequest(request); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index f11bdf8..f2a8041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -106,7 +106,6 @@ public void testDSAppMasterAllocateHandler() throws Exception { handler.onContainersAllocated(containers); Assert.assertEquals("Wrong container allocation count", 1, master.getAllocatedContainers()); - Mockito.verifyZeroInteractions(mockClient); Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); Assert.assertEquals("Incorrect YARN Shell IDs", @@ -121,15 +120,14 @@ public void testDSAppMasterAllocateHandler() throws Exception { ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4); containers.add(generateContainer(id4)); handler.onContainersAllocated(containers); - Assert.assertEquals("Wrong final container allocation count", 4, + Assert.assertEquals("Wrong final container allocation count", 2, master.getAllocatedContainers()); - Assert.assertEquals("Incorrect number of threads launched", 4, + Assert.assertEquals("Incorrect number of threads launched", 2, master.threadsLaunched); Assert.assertEquals("Incorrect YARN Shell IDs", - Arrays.asList("1", "2", "3", "4"), master.yarnShellIds); - + Arrays.asList("1", "2"), master.yarnShellIds); // make sure we handle completion events correctly List status = new ArrayList<>(); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));