From e0d5e5860213f3e0675ca2dcd0299912d25839b9 Mon Sep 17 00:00:00 2001
From: Prabhu Joseph
Date: Sun, 3 Feb 2019 15:03:37 +0530
Subject: [PATCH] YARN-9253

---
 .../TestDSWithMultipleNodeManager.java          | 216 +++++++++++++++++++++
 .../distributedshell/TestDistributedShell.java  |   5 +-
 .../TestDistributedShellWithNodeLabels.java     | 165 ----------------
 3 files changed, 219 insertions(+), 167 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
 delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
new file mode 100644
index 0000000..a8b3be2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestDSWithMultipleNodeManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
+
+  static final int NUM_NMS = 2;
+  TestDistributedShell distShellTest;
+
+  @Before
+  public void setup() throws Exception {
+    distShellTest = new TestDistributedShell();
+    distShellTest.setupInternal(NUM_NMS);
+  }
+
+  private void initializeNodeLabels() throws IOException {
+    RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
+
+    // Setup node labels
+    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+    Set<String> labels = new HashSet<String>();
+    labels.add("x");
+    labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels);
+
+    // Setup queue access to node labels
+    distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x");
+    distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity",
+        "100");
+    distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x");
+    distShellTest.conf.set(PREFIX
+        + "root.default.accessible-node-labels.x.capacity", "100");
+
+    rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
+
+    // Fetch node-ids from yarn cluster
+    NodeId[] nodeIds = new NodeId[NUM_NMS];
+    for (int i = 0; i < NUM_NMS; i++) {
+      NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Set label x to NM[1]
+    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+  }
+
+  @Test(timeout=90000)
+  public void testDSShellWithNodeLabelExpression() throws Exception {
+    initializeNodeLabels();
+
+    // Start NMContainerMonitor
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread t = new Thread(mon);
+    t.start();
+
+    // Submit a job which will sleep for 15 sec
+    String[] args = {
+        "--jar",
+        TestDistributedShell.APPMASTER_JAR,
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "sleep",
+        "--shell_args",
+        "15",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--node_label_expression",
+        "x"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+
+    t.interrupt();
+
+    // Check the maximum number of containers on each NM
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    // Check no container was allocated on NM[0]
+    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+    // Check there are some containers allocated on NM[1]
+    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+  }
+
+  @Test(timeout = 90000)
+  public void testDistributedShellWithPlacementConstraint()
+      throws Exception {
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread t = new Thread(mon);
+    t.start();
+
+    String[] args = {
+        "--jar",
+        distShellTest.APPMASTER_JAR,
+        "1",
+        "--shell_command",
+        distShellTest.getSleepCommand(15),
+        "--placement_spec",
+        "zk=1,NOTIN,NODE,zk:spark=1,NOTIN,NODE,zk"
+    };
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+
+    t.interrupt();
+
+    ConcurrentMap<ApplicationId, RMApp> apps = distShellTest.yarnCluster.
+        getResourceManager().getRMContext().getRMApps();
+    RMApp app = apps.values().iterator().next();
+    RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
+    NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
+    NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0);
+
+    int expectedNM1Count = 1;
+    int expectedNM2Count = 1;
+    if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
+      expectedNM1Count++;
+    } else {
+      expectedNM2Count++;
+    }
+
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
+    Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+  }
+
+  /**
+   * Monitor containers running on NMs.
+   */
+  class NMContainerMonitor implements Runnable {
+    // The sampling interval in milliseconds (500ms)
+    final static int SAMPLING_INTERVAL_MS = 500;
+
+    // The maximum number of containers running on each NM
+    int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+
+    @Override
+    public void run() {
+      while (true) {
+        for (int i = 0; i < NUM_NMS; i++) {
+          int nContainers =
+              distShellTest.yarnCluster.getNodeManager(i).getNMContext()
+                  .getContainers().size();
+          if (nContainers > maxRunningContainersOnNMs[i]) {
+            maxRunningContainersOnNMs[i] = nContainers;
+          }
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+
+    public int[] getMaxRunningContainersReport() {
+      return maxRunningContainersOnNMs;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index e67e541..b4eaa9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -179,7 +179,8 @@ private void setupInternal(int numNodeManager, float timelineVersion)
         true);
     conf.setBoolean(
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     // ATS version specific settings
     if (timelineVersion == 1.0f) {
       conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
@@ -782,7 +783,7 @@ private boolean checkIPs(String hostname, String localIP, String appIP)
 
   }
 
-  private String getSleepCommand(int sec) {
+  protected String getSleepCommand(int sec) {
     // Windows doesn't have a sleep command, ping -n does the trick
     return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
         : "sleep " + sec;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
deleted file mode 100644
index e9ba6e8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.applications.distributedshell;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDistributedShellWithNodeLabels {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestDistributedShellWithNodeLabels.class);
-
-  static final int NUM_NMS = 2;
-  TestDistributedShell distShellTest;
-
-  @Before
-  public void setup() throws Exception {
-    distShellTest = new TestDistributedShell();
-    distShellTest.setupInternal(NUM_NMS);
-  }
-
-  private void initializeNodeLabels() throws IOException {
-    RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
-
-    // Setup node labels
-    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
-    Set<String> labels = new HashSet<String>();
-    labels.add("x");
-    labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels);
-
-    // Setup queue access to node labels
-    distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x");
-    distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity",
-        "100");
-    distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x");
-    distShellTest.conf.set(PREFIX
-        + "root.default.accessible-node-labels.x.capacity", "100");
-
-    rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
-
-    // Fetch node-ids from yarn cluster
-    NodeId[] nodeIds = new NodeId[NUM_NMS];
-    for (int i = 0; i < NUM_NMS; i++) {
-      NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
-      nodeIds[i] = mgr.getNMContext().getNodeId();
-    }
-
-    // Set label x to NM[1]
-    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
-  }
-
-  @Test(timeout=90000)
-  public void testDSShellWithNodeLabelExpression() throws Exception {
-    initializeNodeLabels();
-
-    // Start NMContainerMonitor
-    NMContainerMonitor mon = new NMContainerMonitor();
-    Thread t = new Thread(mon);
-    t.start();
-
-    // Submit a job which will sleep for 60 sec
-    String[] args = {
-        "--jar",
-        TestDistributedShell.APPMASTER_JAR,
-        "--num_containers",
-        "4",
-        "--shell_command",
-        "sleep",
-        "--shell_args",
-        "15",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1",
-        "--node_label_expression",
-        "x"
-    };
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-
-    t.interrupt();
-
-    // Check maximum number of containers on each NMs
-    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
-    // Check no container allocated on NM[0]
-    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
-    // Check there're some containers allocated on NM[1]
-    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
-  }
-
-  /**
-   * Monitor containers running on NMs
-   */
-  class NMContainerMonitor implements Runnable {
-    // The interval of milliseconds of sampling (500ms)
-    final static int SAMPLING_INTERVAL_MS = 500;
-
-    // The maximum number of containers running on each NMs
-    int[] maxRunningContainersOnNMs = new int[NUM_NMS];
-
-    @Override
-    public void run() {
-      while (true) {
-        for (int i = 0; i < NUM_NMS; i++) {
-          int nContainers =
-              distShellTest.yarnCluster.getNodeManager(i).getNMContext()
-                  .getContainers().size();
-          if (nContainers > maxRunningContainersOnNMs[i]) {
-            maxRunningContainersOnNMs[i] = nContainers;
-          }
-        }
-        try {
-          Thread.sleep(SAMPLING_INTERVAL_MS);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-          break;
-        }
-      }
-    }
-
-    public int[] getMaxRunningContainersReport() {
-      return maxRunningContainersOnNMs;
-    }
-  }
-}
-- 
2.7.4 (Apple Git-66)
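
Note (not part of the patch): the placement spec passed to the new test,
"zk=1,NOTIN,NODE,zk:spark=1,NOTIN,NODE,zk", requests one container tagged "zk" and one
tagged "spark", each constrained NOTIN at NODE scope with respect to the tag "zk". With
NUM_NMS = 2 the two task containers should therefore land on different NodeManagers,
which is what the per-NM maximum container counts assert (plus one extra container on
whichever node hosts the AM). The snippet below is a rough, illustrative equivalent of
that constraint built with the YARN PlacementConstraints API; the class name
PlacementSpecExample is invented for the example and this is a sketch, not code from
the patch.

    import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
    import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;

    // Hypothetical illustration class, not part of YARN-9253.
    public class PlacementSpecExample {
      public static void main(String[] args) {
        // "NOTIN,NODE,zk" expressed programmatically: do not place the container
        // on a node that already runs a container with allocation tag "zk".
        PlacementConstraint notOnZkNode =
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("zk")));
        // In the test the same target tag is used for both the "zk" and the
        // "spark" requests, so the two containers end up on different nodes.
        System.out.println(notOnZkNode);
      }
    }

The test relies on the setupInternal() change in TestDistributedShell.java, which turns
on the placement-processor handler via YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
so the ResourceManager actually enforces these constraints during scheduling.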