diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 95dbddc..5c33b88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.Vector; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -223,6 +224,8 @@ private int containerVirtualCores = 1; // Priority of the request private int requestPriority; + // Blacklist Nodes that should not be allocated + private List blacklist = null; // Counter for completed containers ( complete denotes successful or failed ) private AtomicInteger numCompletedContainers = new AtomicInteger(); @@ -371,6 +374,7 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. 
Default 0"); + opts.addOption("blacklist", true, "Nodes that should not be allocated"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -508,6 +512,12 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + String opt_blacklist = cliParser.getOptionValue("blacklist", null); + if (opt_blacklist != null) { + blacklist = new ArrayList(Arrays.asList(opt_blacklist.split(","))); + } + return true; } @@ -608,6 +618,8 @@ public void run() throws YarnException, IOException, InterruptedException { containerVirtualCores = maxVCores; } + amRMClient.updateBlacklist(blacklist, null); + List previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index e864ad2..ab88bde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -154,6 +154,7 @@ // No. 
of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; + private String blacklist = null; // log4j.properties file // if available, add to local resources and set into classpath @@ -288,6 +289,7 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("blacklist", true, "Nodes that should not be allocated"); } /** @@ -410,6 +412,8 @@ public boolean init(String[] args) throws ParseException { nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); + blacklist = cliParser.getOptionValue("blacklist", null); + clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); attemptFailuresValidityInterval = @@ -630,6 +634,9 @@ public boolean run() throws IOException, YarnException { if (null != nodeLabelExpression) { appContext.setNodeLabelExpression(nodeLabelExpression); } + if (null != blacklist) { + vargs.add("--blacklist " + blacklist); + } vargs.add("--priority " + String.valueOf(shellCmdPriority)); for (Map.Entry entry : shellEnv.entrySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 3197875..3ee0545 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -72,7 +72,7 @@ public void setup() throws Exception { setupInternal(NUM_NMS); } - protected void setupInternal(int numNodeManager) throws Exception { + protected void setupInternal(int numNodeManager, boolean usePortForNodeName) throws Exception { LOG.info("Starting up YARN cluster"); @@ -84,6 +84,8 @@ protected void setupInternal(int numNodeManager) throws Exception { conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); conf.set("mapreduce.jobhistory.address", "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + usePortForNodeName); if (yarnCluster == null) { yarnCluster = @@ -129,6 +131,10 @@ protected void setupInternal(int numNodeManager) throws Exception { } } + protected void setupInternal(int numNodeManager) throws Exception { + this.setupInternal(numNodeManager, false); + } + @After public void tearDown() throws IOException { if (yarnCluster != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithBlacklist.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithBlacklist.java new file mode 100644 index 0000000..54b98a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithBlacklist.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Set; + + +public class TestDistributedShellWithBlacklist { + private static final Log LOG = + LogFactory.getLog(TestDistributedShellWithBlacklist.class); + + static final int NUM_NMS = 2; + static final int NUM_CONTAINERS = 4; + TestDistributedShell distShellTest; + + @Before + public void setup() throws Exception { + distShellTest = new TestDistributedShell(); + distShellTest.setupInternal(NUM_NMS, true); + } + + @Test(timeout=90000) + public void testDSShellWithBlacklist() throws Exception { + + // Fetch node-ids from yarn cluster + NodeId[] nodeIds = new NodeId[NUM_NMS]; + for (int i = 0; i < NUM_NMS; i++) { + NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i); + nodeIds[i] = mgr.getNMContext().getNodeId(); + } + + NodeId blacklistedNodeId = nodeIds[0]; + String blacklistedNodeName = 
blacklistedNodeId.toString(); + + // Start NMContainerMonitor + NMContainerMonitor mon = new NMContainerMonitor(); + Thread t = new Thread(mon); + t.start(); + + // Submit a job which will sleep for 15 seconds + String[] args = { + "--jar", + TestDistributedShell.APPMASTER_JAR, + "--num_containers", + String.valueOf(NUM_CONTAINERS), + "--shell_command", + "sleep", + "--shell_args", + "15", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--blacklist", + blacklistedNodeName + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(distShellTest.yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + + t.interrupt(); + + // Check the maximum number of containers seen on each NM + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + // Check no container allocated on NM[0] + Assert.assertEquals(0, maxRunningContainersOnNMs[0]); + // Check some containers were allocated on NM[1] + Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + } + + /** + * Monitor containers running on NMs + */ + class NMContainerMonitor implements Runnable { + // The sampling interval in milliseconds (500ms) + final static int SAMPLING_INTERVAL_MS = 500; + + // The maximum number of containers observed running on each NM + int[] maxRunningContainersOnNMs = new int[NUM_NMS]; + + @Override + public void run() { + while (true) { + for (int i = 0; i < NUM_NMS; i++) { + + Set containerIds = distShellTest.yarnCluster.getNodeManager(i).getNMContext() + .getContainers().keySet(); + + int nContainers = containerIds.size(); + + // The AM container should be excluded from the count + for (ContainerId containerid : containerIds) { + if (containerid.getContainerId() == 1) { + nContainers -= 1; + } + } + + if (nContainers
> maxRunningContainersOnNMs[i]) { + maxRunningContainersOnNMs[i] = nContainers; + } + } + try { + Thread.sleep(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + + public int[] getMaxRunningContainersReport() { + return maxRunningContainersOnNMs; + } + } +}