diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f5b3d0a..fdf5e0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Vector; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -168,6 +169,8 @@ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static final int INVALID_CONTAINER_TAG = -1; + @VisibleForTesting @Private public static enum DSEvent { @@ -219,6 +222,9 @@ private int containerVirtualCores = 1; // Priority of the request private int requestPriority; + // When enabled, unique id starting at 0 up to (num_containers - 1) will be assigned to each container + protected boolean container_tag_enabled = false; + // Counter for completed containers ( complete denotes successful or failed ) private AtomicInteger numCompletedContainers = new AtomicInteger(); @@ -361,6 +367,8 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("container_tag_enabled", false, + "When enabled, unique id starting at 0 up to (num_containers - 1) will be assigned to each container."); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -498,6 +506,10 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + if (cliParser.hasOption("container_tag_enabled")) { + container_tag_enabled = true; + } + return true; } @@ -717,6 +729,45 @@ protected boolean finish() { } private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { + + private List containerTags = Collections + .synchronizedList(new ArrayList()); + + public RMCallbackHandler() { + for (int i = 0; i < numTotalContainers; i++) { + containerTags.add(i, null); + } + LOG.info("containerTags size is " + containerTags.size()); + } + + private int setContainerTag(String containerId) { + synchronized (containerTags) { + for (int i = 0; i < containerTags.size(); i++) { + if (containerTags.get(i) == null) { + LOG.info("assign containerTag " + i + " to " + containerId); + containerTags.set(i, containerId); + return i; + } + } + } + LOG.warn("Failed to assign containerTag"); + return INVALID_CONTAINER_TAG; + } + + private void clearContainerTag(String containerId) { + synchronized (containerTags) { + for (int i = 0; i < containerTags.size(); i++) { + String id = containerTags.get(i); + if (id != null && id.equals(containerId)) { + LOG.info("clear containerTag " + i); + containerTags.set(i, null); + return; + } + } + } + LOG.warn("Failed to clear containerTag"); + } + @SuppressWarnings("unchecked") @Override public void onContainersCompleted(List completedContainers) { @@ -748,6 +799,9 @@ public void onContainersCompleted(List completedContainers) { numRequestedContainers.decrementAndGet(); // we do not need to release the container as it would be done // by the RM + + if (container_tag_enabled) + clearContainerTag(containerStatus.getContainerId().toString()); } } else { // nothing to do @@ -784,6 +838,11 @@ public void onContainersAllocated(List allocatedContainers) { + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + + int containerTag = INVALID_CONTAINER_TAG; + if (container_tag_enabled) + containerTag = setContainerTag(allocatedContainer.getId().toString()); + LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() @@ -792,12 +851,13 @@ public void onContainersAllocated(List allocatedContainers) { + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", containerResourceVirtualCores" - + allocatedContainer.getResource().getVirtualCores()); + + allocatedContainer.getResource().getVirtualCores() + + ", containerTag=" + containerTag); // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); LaunchContainerRunnable runnableLaunchContainer = - new LaunchContainerRunnable(allocatedContainer, containerListener); + new LaunchContainerRunnable(allocatedContainer, containerListener, containerTag); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep @@ -913,14 +973,17 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { NMCallbackHandler containerListener; + int containerTag; + /** * @param lcontainer Allocated container * @param containerListener Callback handler of the container */ public LaunchContainerRunnable( - Container lcontainer, NMCallbackHandler containerListener) { + Container lcontainer, NMCallbackHandler containerListener, int containerTag) { this.container = lcontainer; this.containerListener = containerListener; + this.containerTag = containerTag; } @Override @@ -1011,6 +1074,14 @@ public void run() { List commands = new ArrayList(); commands.add(command.toString()); + Map shellEnvAddedContainerSpecific = new HashMap(); + shellEnvAddedContainerSpecific.putAll(shellEnv); + if (container_tag_enabled) { + // add CONTAINER_TAG to shellEnv + shellEnvAddedContainerSpecific.put("CONTAINER_TAG", "" + containerTag); + shellEnvAddedContainerSpecific.put("NUM_CONTAINERS", "" + numTotalContainers); + } + // Set up ContainerLaunchContext, setting local resource, environment, // command and token for constructor. @@ -1021,7 +1092,7 @@ public void run() { // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( - localResources, shellEnv, commands, null, allTokens.duplicate(), null); + localResources, shellEnvAddedContainerSpecific, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0e9a4e4..9f1f118 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -154,6 +154,9 @@ private int numContainers = 1; private String nodeLabelExpression = null; + // When enabled, unique id starting at 0 up to (num_containers - 1) will be assigned to each container. + private boolean containerTagEnabled = false; + // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -280,6 +283,8 @@ public Client(Configuration conf) throws Exception { + "modify the timeline entities in the given domain"); opts.addOption("create", false, "Flag to indicate whether to create the " + "domain specified with -domain."); + opts.addOption("container_tag_enabled", false, "When enabled, unique id " + + "starting at 0 up to (num_containers - 1) will be assigned to each container."); opts.addOption("help", false, "Print usage"); opts.addOption("node_label_expression", true, "Node label expression to determine the nodes" @@ -409,6 +414,15 @@ public boolean init(String[] args) throws ParseException { nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); + if (cliParser.hasOption("container_tag_enabled")) { + LOG.info("container_tag_enabled"); + containerTagEnabled = true; + } + if (containerTagEnabled && keepContainers) { + throw new IllegalArgumentException("container_tag_enabled can not be used with" + + " keep_containers_across_application_attempts"); + } + clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); attemptFailuresValidityInterval = @@ -630,6 +644,9 @@ public boolean run() throws IOException, YarnException { appContext.setNodeLabelExpression(nodeLabelExpression); } vargs.add("--priority " + String.valueOf(shellCmdPriority)); + if (containerTagEnabled) { + vargs.add("--container_tag_enabled"); + } for (Map.Entry entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 47b9dfb..2894179 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -907,5 +907,41 @@ private int verifyContainerLog(int containerNum, } return numOfWords; } + + @Test(timeout=90000) + public void testDSShellWithContainerTag() throws Exception { + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + "echo", + "--shell_args", + "$CONTAINER_TAG $NUM_CONTAINERS", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--container_tag_enabled" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + List expectedContent = new ArrayList(); + expectedContent.add("0 1"); + verifyContainerLog(1, expectedContent, false, ""); + } }