diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b28c0c9..4a685f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -182,6 +182,8 @@ DS_APP_ATTEMPT, DS_CONTAINER } + private static final String SHELL_ID = "SHELL_ID"; + // Configuration private Configuration conf; @@ -279,6 +281,8 @@ private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; + private int shellIdCounter = 1; + @VisibleForTesting protected final Set launchedContainers = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -803,8 +807,11 @@ public void onContainersAllocated(List allocatedContainers) { + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + String shellId = Integer.toString(shellIdCounter); + shellIdCounter++; LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + + ", shellId=" + shellId + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() @@ -815,7 +822,8 @@ public void onContainersAllocated(List allocatedContainers) { // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); - Thread launchThread = createLaunchContainerThread(allocatedContainer); + Thread launchThread = createLaunchContainerThread(allocatedContainer, + shellId); // launch and start the container on a separate thread to keep // the main thread unblocked @@ -928,6 +936,7 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { // Allocated container Container container; + String shellId; NMCallbackHandler containerListener; @@ -935,10 +944,11 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { * @param lcontainer Allocated container * @param containerListener Callback handler of the container */ - public LaunchContainerRunnable( - Container lcontainer, NMCallbackHandler containerListener) { + public LaunchContainerRunnable(Container lcontainer, + NMCallbackHandler containerListener, String shellId) { this.container = lcontainer; this.containerListener = containerListener; + this.shellId = shellId; } @Override @@ -949,7 +959,7 @@ public LaunchContainerRunnable( */ public void run() { LOG.info("Setting up container launch container for containerid=" - + container.getId()); + + container.getId() + " with shellid=" + shellId); // Set the local resources Map localResources = new HashMap(); @@ -1038,8 +1048,11 @@ public void run() { // download anyfiles in the distributed file-system. The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. + Map myShellEnv = new HashMap(shellEnv); + myShellEnv.put(SHELL_ID, shellId); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( - localResources, shellEnv, commands, null, allTokens.duplicate(), null); + localResources, myShellEnv, commands, null, allTokens.duplicate(), + null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } @@ -1189,9 +1202,11 @@ boolean getDone() { } @VisibleForTesting - Thread createLaunchContainerThread(Container allocatedContainer) { + Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { LaunchContainerRunnable runnableLaunchContainer = - new LaunchContainerRunnable(allocatedContainer, containerListener); + new LaunchContainerRunnable(allocatedContainer, containerListener, + shellId); return new Thread(runnableLaunchContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 0fed14d..290ce2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -51,11 +52,14 @@ static class TestAppMaster extends ApplicationMaster { private int threadsLaunched = 0; + public List shellIds = new ArrayList(); @Override - protected Thread createLaunchContainerThread(Container allocatedContainer) { + protected Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { threadsLaunched++; launchedContainers.add(allocatedContainer.getId()); + shellIds.add(shellId); return new Thread(); } @@ -101,6 +105,8 @@ public void testDSAppMasterAllocateHandler() throws Exception { Mockito.verifyZeroInteractions(mockClient); Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); + Assert.assertEquals("Incorrect shell IDs", + Arrays.asList("1"), master.shellIds); // now send 3 extra containers containers.clear(); @@ -117,6 +123,9 @@ public void testDSAppMasterAllocateHandler() throws Exception { Assert.assertEquals("Incorrect number of threads launched", 4, master.threadsLaunched); + Assert.assertEquals("Incorrect shell IDs", + Arrays.asList("1", "2", "3", "4"), master.shellIds); + // make sure we handle completion events correctly List status = new ArrayList<>(); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));