diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2067aca..0e9a4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -115,7 +115,7 @@ public class Client {
 
   private static final Log LOG = LogFactory.getLog(Client.class);
 
-  
+
   // Configuration
   private Configuration conf;
   private YarnClient yarnClient;
@@ -152,6 +152,7 @@
   private int containerVirtualCores = 1;
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
+  private String nodeLabelExpression = null;
 
   // log4j.properties file
   // if available, add to local resources and set into classpath
@@ -280,7 +281,12 @@ public Client(Configuration conf) throws Exception {
     opts.addOption("create", false, "Flag to indicate whether to create the "
        + "domain specified with -domain.");
     opts.addOption("help", false, "Print usage");
-
+    opts.addOption("node_label_expression", true,
+        "Node label expression to determine the nodes"
+            + " where all the containers of this application"
+            + " will be allocated. \"\" means containers"
+            + " can be allocated anywhere. If you don't specify the option,"
+            + " the default node_label_expression of the queue will be used.");
   }
 
   /**
@@ -391,6 +397,7 @@ public boolean init(String[] args) throws ParseException {
     containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
     numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
     if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
       throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
@@ -399,6 +406,8 @@
           + ", containerVirtualCores=" + containerVirtualCores
           + ", numContainer=" + numContainers);
     }
+
+    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
 
     clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
 
@@ -617,6 +626,9 @@ public boolean run() throws IOException, YarnException {
     vargs.add("--container_memory " + String.valueOf(containerMemory));
     vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     vargs.add("--num_containers " + String.valueOf(numContainers));
+    if (null != nodeLabelExpression) {
+      appContext.setNodeLabelExpression(nodeLabelExpression);
+    }
     vargs.add("--priority " + String.valueOf(shellCmdPriority));
 
     for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 2414d4d..537c187 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -30,7 +30,10 @@
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +45,7 @@
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -49,40 +53,116 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import com.google.common.collect.ImmutableMap;
+
+@RunWith(Parameterized.class)
 public class TestDistributedShell {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedShell.class);
 
   protected MiniYARNCluster yarnCluster = null;
-  protected Configuration conf = new YarnConfiguration();
+  private int numNodeManager = 1;
+
+  private SchedulerType schedulerType;
+  private YarnConfiguration conf = null;
+
+  public enum SchedulerType {
+    CAPACITY, FIFO
+  }
+
+  public TestDistributedShell(SchedulerType type) {
+    schedulerType = type;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new SchedulerType[][] { { SchedulerType.CAPACITY },
+        { SchedulerType.FIFO } });
+  }
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
+
+  private void configureAccordingToSchedulerType() throws IOException {
+    conf = new YarnConfiguration();
+    switch (schedulerType) {
+    case CAPACITY:
+      conf.set(YarnConfiguration.RM_SCHEDULER,
+          CapacityScheduler.class.getName());
+      // we need two node managers to run the node label tests
+      numNodeManager = 2;
+      break;
+    case FIFO:
+      conf.set(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class.getName());
+      break;
+    }
+  }
+
+  private void initializeNodeLabels() throws IOException {
+    RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
+
+    // Set up node labels
+    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+    Set<String> labels = new HashSet<String>();
+    labels.add("x");
+    labelsMgr.addToCluserNodeLabels(labels);
+
+    // Set up queue access to node labels
+    conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
+    conf.set(
+        "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
+        "100");
+
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    // Fetch node ids from the yarn cluster
+    NodeId[] nodeIds = new NodeId[numNodeManager];
+    for (int i = 0; i < numNodeManager; i++) {
+      NodeManager mgr = this.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Assign label x to NM[1]
+    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+  }
 
   @Before
   public void setup() throws Exception {
     LOG.info("Starting up YARN cluster");
+
+    configureAccordingToSchedulerType();
+
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER,
-        FifoScheduler.class, ResourceScheduler.class);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+
     if (yarnCluster == null) {
-      yarnCluster = new MiniYARNCluster(
-        TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
+      yarnCluster =
+          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+              numNodeManager, 1, 1, true);
       yarnCluster.init(conf);
+
       yarnCluster.start();
-      NodeManager nm = yarnCluster.getNodeManager(0);
-      waitForNMToRegister(nm);
+
+      waitForNMsToRegister();
+
+      // currently only the capacity scheduler supports node labels
+      if (SchedulerType.CAPACITY == schedulerType) {
+        initializeNodeLabels();
+      }
 
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
@@ -436,61 +516,77 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception {
         new File("target", TestDistributedShell.class.getName());
     final File tmpDir = new File(basedir, "tmpDir");
     tmpDir.mkdirs();
-    final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
+    final File customLogProperty =
+        new File(tmpDir, "custom_log4j.properties");
     if (customLogProperty.exists()) {
      customLogProperty.delete();
     }
-    if(!customLogProperty.createNewFile()) {
+    if (!customLogProperty.createNewFile()) {
Assert.fail("Can not create custom log4j property file."); } - PrintWriter fileWriter = new PrintWriter(customLogProperty); - // set the output to DEBUG level - fileWriter.write("log4j.rootLogger=debug,stdout"); - fileWriter.close(); - String[] args = { - "--jar", - APPMASTER_JAR, - "--num_containers", - "3", - "--shell_command", - "echo", - "--shell_args", - "HADOOP", - "--log_properties", - customLogProperty.getAbsolutePath(), - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1" - }; - - //Before run the DS, the default the log level is INFO - final Log LOG_Client = - LogFactory.getLog(Client.class); - Assert.assertTrue(LOG_Client.isInfoEnabled()); - Assert.assertFalse(LOG_Client.isDebugEnabled()); - final Log LOG_AM = LogFactory.getLog(ApplicationMaster.class); - Assert.assertTrue(LOG_AM.isInfoEnabled()); - Assert.assertFalse(LOG_AM.isDebugEnabled()); + + try { + PrintWriter fileWriter = new PrintWriter(customLogProperty); + // set the output to DEBUG level + fileWriter.write("log4j.rootLogger=debug,stdout"); + fileWriter.close(); + String[] args = + { "--jar", APPMASTER_JAR, "--num_containers", "3", "--shell_command", + "echo", "--shell_args", "HADOOP", "--log_properties", + customLogProperty.getAbsolutePath(), "--master_memory", "512", + "--master_vcores", "2", "--container_memory", "128", + "--container_vcores", "1" }; + + // Before run the DS, the default the log level is INFO + final Log LOG_Client = LogFactory.getLog(Client.class); + Assert.assertTrue(LOG_Client.isInfoEnabled()); + Assert.assertFalse(LOG_Client.isDebugEnabled()); + final Log LOG_AM = LogFactory.getLog(ApplicationMaster.class); + Assert.assertTrue(LOG_AM.isInfoEnabled()); + Assert.assertFalse(LOG_AM.isDebugEnabled()); - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10); - //After DS is finished, the log level should be DEBUG - Assert.assertTrue(LOG_Client.isInfoEnabled()); - Assert.assertTrue(LOG_Client.isDebugEnabled()); - Assert.assertTrue(LOG_AM.isInfoEnabled()); - Assert.assertTrue(LOG_AM.isDebugEnabled()); + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. 
Result=" + result); + Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10); + // After DS is finished, the log level should be DEBUG + Assert.assertTrue(LOG_Client.isInfoEnabled()); + Assert.assertTrue(LOG_Client.isDebugEnabled()); + Assert.assertTrue(LOG_AM.isInfoEnabled()); + Assert.assertTrue(LOG_AM.isDebugEnabled()); + } finally { + // Reset log4j after this test + + PrintWriter fileWriter = new PrintWriter(customLogProperty); + // set the output to DEBUG level + fileWriter.write("log4j.rootLogger=info,stdout"); + fileWriter.close(); + String[] args = + { "--jar", APPMASTER_JAR, "--num_containers", "3", "--shell_command", + "echo", "--shell_args", "HADOOP", "--log_properties", + customLogProperty.getAbsolutePath(), "--master_memory", "512", + "--master_vcores", "2", "--container_memory", "128", + "--container_vcores", "1" }; + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + boolean runSuccess = client.run(); + Assert.assertTrue(runSuccess); + + // after run the DS, the default the log level is INFO + final Log LOG_Client = LogFactory.getLog(Client.class); + Assert.assertTrue(LOG_Client.isInfoEnabled()); + Assert.assertFalse(LOG_Client.isDebugEnabled()); + final Log LOG_AM = LogFactory.getLog(ApplicationMaster.class); + Assert.assertTrue(LOG_AM.isInfoEnabled()); + Assert.assertFalse(LOG_AM.isDebugEnabled()); + } } public void testDSShellWithCommands() throws Exception { @@ -757,13 +853,15 @@ public void testDSShellWithInvalidArgs() throws Exception { } } - protected static void waitForNMToRegister(NodeManager nm) - throws Exception { - int attempt = 60; - ContainerManagerImpl cm = - ((ContainerManagerImpl) nm.getNMContext().getContainerManager()); - while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) { - Thread.sleep(2000); + protected void waitForNMsToRegister() throws Exception { + int sec = 60; + while (sec >= 0) { + if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() + >= numNodeManager) { + break; + } + Thread.sleep(1000); + sec--; } } @@ -892,5 +990,91 @@ private int verifyContainerLog(int containerNum, } return numOfWords; } + + @Test(timeout=90000) + public void testDSShellWithNodeLabelExpression() throws Exception { + Assume.assumeTrue("Only CapacityScheduler work with node label test", + SchedulerType.CAPACITY == schedulerType); + + // Start NMContainerMonitor + NMContainerMonitor mon = new NMContainerMonitor(); + Thread t = new Thread(mon); + t.start(); + + // Submit a job which will sleep for 60 sec + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "4", + "--shell_command", + "sleep", + "--shell_args", + "15", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--node_label_expression", + "x" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. 
Result=" + result); + + t.interrupt(); + + // Check maximum number of containers on each NMs + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + // Check no container allocated on NM[0] + Assert.assertEquals(0, maxRunningContainersOnNMs[0]); + // Check there're some containers allocated on NM[1] + Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + } + + /** + * Monitor containers running on NMs + */ + private class NMContainerMonitor implements Runnable { + // The interval of milliseconds of sampling (500ms) + final static int SAMPLING_INTERVAL_MS = 500; + + // The maximum number of containers running on each NMs + int[] maxRunningContainersOnNMs = new int[numNodeManager]; + + @Override + public void run() { + while (true) { + for (int i = 0; i < numNodeManager; i++) { + int nContainers = + yarnCluster.getNodeManager(i).getNMContext().getContainers() + .size(); + if (nContainers > maxRunningContainersOnNMs[i]) { + maxRunningContainersOnNMs[i] = nContainers; + } + } + try { + Thread.sleep(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + + public int[] getMaxRunningContainersReport() { + return maxRunningContainersOnNMs; + } + } }