diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index f410c43..a70c1d2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -287,6 +287,8 @@
   protected final Set<ContainerId> launchedContainers =
     Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
 
+  private String containerHosts = null;
+  private boolean disableRelaxLocality = false;
   /**
    * @param args Command line args
    */
@@ -372,6 +374,10 @@ public boolean init(String[] args) throws ParseException, IOException {
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("debug", false, "Dump out debug information");
+    opts.addOption("container_hosts", true, "Nodes to allocate containers on. "
+        + "Comma-separated list of node names. Use together with "
+        + "--disableRelaxLocality for strict placement requests.");
+    opts.addOption("disableRelaxLocality", false, "Disable relaxed locality.");
     opts.addOption("help", false, "Print usage");
     CommandLine cliParser = new GnuParser().parse(opts, args);
 
@@ -401,6 +407,11 @@ public boolean init(String[] args) throws ParseException, IOException {
       dumpOutDebugInfo();
     }
 
+    if (cliParser.hasOption("disableRelaxLocality")) {
+      disableRelaxLocality = true;
+    }
+    containerHosts = cliParser.getOptionValue("container_hosts", null);
+
     Map<String, String> envs = System.getenv();
 
     if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
@@ -508,6 +519,7 @@ public boolean init(String[] args) throws ParseException, IOException {
     }
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
+
     return true;
   }
 
@@ -1101,8 +1113,13 @@ private ContainerRequest setupContainerAskForRM() {
     Resource capability = Resource.newInstance(containerMemory,
         containerVirtualCores);
 
-    ContainerRequest request = new ContainerRequest(capability, null, null,
-        pri);
+    String[] hosts =
+        containerHosts == null || containerHosts.isEmpty() ? null
+            : containerHosts.split(",");
+
+    ContainerRequest request = new ContainerRequest(capability, hosts, null,
+        pri, !disableRelaxLocality);
+
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 68d2bde..8934ffa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -171,6 +171,9 @@
   // Debug flag
   boolean debugFlag = false;
 
+  private String containerHosts = null;
+  private boolean disableRelaxLocality = false;
+
   // Timeline domain ID
   private String domainId = null;
 
@@ -287,6 +290,10 @@ public Client(Configuration conf) throws Exception {
             + " will be allocated, \"\" means containers"
             + " can be allocated anywhere, if you don't specify the option,"
            + " default node_label_expression of queue will be used.");
+    opts.addOption("container_hosts", true, "Nodes to allocate containers on. "
+        + "Comma-separated list of node names. Use together with "
+        + "--disableRelaxLocality for strict placement requests.");
+    opts.addOption("disableRelaxLocality", false, "Disable relaxed locality.");
   }
 
   /**
@@ -335,6 +342,11 @@ public boolean init(String[] args) throws ParseException {
     }
 
+    if (cliParser.hasOption("disableRelaxLocality")) {
+      disableRelaxLocality = true;
+    }
+    containerHosts = cliParser.getOptionValue("container_hosts", null);
+
     if (cliParser.hasOption("keep_containers_across_application_attempts")) {
       LOG.info("keep_containers_across_application_attempts");
       keepContainers = true;
@@ -638,6 +650,13 @@ public boolean run() throws IOException, YarnException {
       vargs.add("--debug");
     }
 
+    if (null != containerHosts) {
+      vargs.add("--container_hosts " + containerHosts);
+    }
+    if (disableRelaxLocality) {
+      vargs.add("--disableRelaxLocality");
+    }
+
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithRelaxLocalityDisabled.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithRelaxLocalityDisabled.java
new file mode 100644
index 0000000..7e8e32d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithRelaxLocalityDisabled.java
@@ -0,0 +1,157 @@
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that the distributed shell launches task containers only on the
+ * nodes named by --container_hosts when --disableRelaxLocality is given.
+ */
+public class TestDSWithRelaxLocalityDisabled {
+  private static final Log LOG =
+      LogFactory.getLog(TestDSWithRelaxLocalityDisabled.class);
+
+  static final int NUM_NMS = 3;
+  static final int NUM_CONTAINERS = 3;
+  TestDistributedShell distShellTest;
+
+  @Before
+  public void setup() throws Exception {
+    distShellTest = new TestDistributedShell();
+    distShellTest.setupInternal(NUM_NMS, true);
+  }
+
+  @Test(timeout=90000)
+  public void testDSShellWithRelaxLocalityDisabled() throws Exception {
+
+    // Fetch node-ids from the yarn cluster
+    NodeId[] nodeIds = new NodeId[NUM_NMS];
+    for (int i = 0; i < NUM_NMS; i++) {
+      NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Try to launch containers on this host only
+    NodeId expectedNodeId = nodeIds[1];
+    String expectedHost =
+        expectedNodeId.getHost() + ":" + expectedNodeId.getPort();
+
+    // Start NMContainerMonitor
+    NMContainerMonitor monitor = new NMContainerMonitor();
+    Thread t = new Thread(monitor);
+    t.start();
+
+    // Submit a job whose containers sleep for 15 seconds
+    String[] args = {
+        "--jar",
+        TestDistributedShell.APPMASTER_JAR,
+        "--num_containers",
+        String.valueOf(NUM_CONTAINERS),
+        "--shell_command",
+        "sleep",
+        "--shell_args",
+        "15",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--container_hosts",
+        expectedHost,
+        "--disableRelaxLocality"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+
+    monitor.stopRunning();
+
+    // Verify that all task containers were launched on the expected NM.
+    // The AM container may or may not be launched on the expected NM.
+    Map<NodeId, HashSet<ContainerId>> containersLaunchedOnNMs =
+        monitor.getContainersLaunchedOnNMs();
+    for (Entry<NodeId, HashSet<ContainerId>> entry : containersLaunchedOnNMs
+        .entrySet()) {
+      if (entry.getKey().equals(expectedNodeId)) {
+        HashSet<ContainerId> ids = entry.getValue();
+        Assert.assertTrue(ids.size() >= NUM_CONTAINERS);
+      } else {
+        // If a container was launched on another NM,
+        // it should be the AM container
+        HashSet<ContainerId> ids = entry.getValue();
+        if (ids.size() == 1) {
+          ContainerId id = (ContainerId) (ids.toArray()[0]);
+          Assert.assertTrue(id.getContainerId() == 1);
+        } else {
+          Assert.assertTrue(ids.size() == 0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Monitors containers running on the NMs of the mini cluster.
+   */
+  class NMContainerMonitor implements Runnable {
+    // The sampling interval in milliseconds (500ms)
+    final static int SAMPLING_INTERVAL_MS = 500;
+
+    Map<NodeId, HashSet<ContainerId>> containersLaunchedOnNMs =
+        new HashMap<NodeId, HashSet<ContainerId>>();
+
+    volatile boolean stop = false;
+
+    @Override
+    public void run() {
+      while (!stop) {
+        for (int i = 0; i < NUM_NMS; i++) {
+          Context context =
+              distShellTest.yarnCluster.getNodeManager(i).getNMContext();
+          NodeId nodeId = context.getNodeId();
+          Set<ContainerId> containerIds = context.getContainers().keySet();
+          if (containersLaunchedOnNMs.containsKey(nodeId)) {
+            HashSet<ContainerId> ids = containersLaunchedOnNMs.get(nodeId);
+            ids.addAll(containerIds);
+            containersLaunchedOnNMs.put(nodeId, ids);
+          } else {
+            containersLaunchedOnNMs.put(nodeId, new HashSet<ContainerId>(
+                containerIds));
+          }
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+
+    public Map<NodeId, HashSet<ContainerId>> getContainersLaunchedOnNMs() {
+      return containersLaunchedOnNMs;
+    }
+
+    public void stopRunning() {
+      this.stop = true;
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 3197875..f61529a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -72,7 +72,8 @@
   public void setup() throws Exception {
     setupInternal(NUM_NMS);
   }
 
-  protected void setupInternal(int numNodeManager) throws Exception {
+  protected void setupInternal(int numNodeManager, boolean usePortForNodeName)
+      throws Exception {
 
     LOG.info("Starting up YARN cluster");
@@ -84,7 +85,8 @@ protected void setupInternal(int numNodeManager) throws Exception {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     conf.set("mapreduce.jobhistory.address",
         "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
-
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        usePortForNodeName);
     if (yarnCluster == null) {
       yarnCluster =
           new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
@@ -129,6 +131,9 @@ protected void setupInternal(int numNodeManager) throws Exception {
     }
   }
 
+  protected void setupInternal(int numNodeManager) throws Exception {
+    this.setupInternal(numNodeManager, false);
+  }
   @After
   public void tearDown() throws IOException {
     if (yarnCluster != null) {
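
Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of the container ask that the patched setupContainerAskForRM() builds when both --container_hosts and --disableRelaxLocality are supplied. The node name host1:8041, the 128 MB / 1 vcore capability, and priority 0 are illustrative assumptions only; the point is that an explicit host list combined with relaxLocality=false asks the scheduler to allocate strictly on the listed nodes. When yarn.scheduler.include-port-in-node-name is enabled (as the new test does via RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME), the host entries must be host:port node names.

// Illustrative sketch only: mirrors the patched setupContainerAskForRM() path
// when --container_hosts and --disableRelaxLocality are both supplied.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class StrictLocalityAskSketch {
  public static void main(String[] args) {
    // Hypothetical values standing in for the parsed CLI options.
    String containerHosts = "host1:8041";   // --container_hosts
    boolean disableRelaxLocality = true;    // --disableRelaxLocality

    Resource capability = Resource.newInstance(128, 1);  // 128 MB, 1 vcore
    Priority pri = Priority.newInstance(0);

    String[] hosts = containerHosts == null || containerHosts.isEmpty()
        ? null : containerHosts.split(",");

    // racks == null and relaxLocality == false: the scheduler may only place
    // the container on the listed hosts; it will not relax to rack or ANY.
    ContainerRequest request =
        new ContainerRequest(capability, hosts, null, pri, !disableRelaxLocality);
    System.out.println("Requested container ask: " + request);
  }
}

The new test exercises exactly this path end-to-end: it passes --container_hosts with the host:port of one NodeManager plus --disableRelaxLocality, then asserts via the NM contexts that every task container landed on that node, with the AM container as the only permitted exception.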