diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 6f75bd17c6f..663d2626260 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -105,7 +105,7 @@ public class SLSRunner extends Configured implements Tool { // RM, Runner private ResourceManager rm; - private static TaskRunner runner = new TaskRunner(); + private SLSTaskRunners slsTaskRunners; private String[] inputTraces; private Map queueAppNumMap; private int poolSize; @@ -185,10 +185,6 @@ private void init(Configuration tempConf) throws ClassNotFoundException { // runner configuration setConf(tempConf); - // runner - poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); - SLSRunner.runner.setQueueSize(poolSize); // map for (Map.Entry e : tempConf) { String key = e.getKey().toString(); @@ -198,6 +194,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException { } } + slsTaskRunners = new SLSTaskRunners(tempConf); + poolSize = slsTaskRunners.getPoolSize(); nodeManagerResource = getNodeManagerResource(); } @@ -263,8 +261,8 @@ public void start() throws IOException, ClassNotFoundException, YarnException, printSimulationInfo(); // blocked until all nodes RUNNING waitForNodesRunning(); - // starting the runner once everything is ready to go, - runner.start(); + // starting the runners once everything is ready to go, + slsTaskRunners.startRunners(); } /** @@ -391,7 +389,7 @@ private void startNM() throws YarnException, IOException, random.nextInt(heartbeatInterval), heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); nmMap.put(nm.getNode().getNodeID(), nm); - runner.schedule(nm); + slsTaskRunners.scheduleNMTask(nm); rackSet.add(nm.getNode().getRackName()); } catch (IOException | YarnException e) { LOG.error("Got an error while adding node", e); @@ -862,15 +860,15 @@ private void runNewAM(String jobType, String user, AM_ID++; amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, labelExpr, params, - appIdAMSim); + slsTaskRunners.getAMTaskRunner().getStartTimeMS(), + amContainerResource, labelExpr, params, appIdAMSim); if(reservationId != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) UTCClock clock = new UTCClock(); amSim.initReservation(reservationId, deadline, clock.getTime()); } - runner.schedule(amSim); + slsTaskRunners.scheduleAMTask(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); amMap.put(oldJobId, amSim); @@ -941,7 +939,7 @@ public static void decreaseRemainingApps() { public void stop() throws InterruptedException { rm.stop(); - runner.stop(); + slsTaskRunners.stopRunners(); } public int run(final String[] argv) throws IOException, InterruptedException, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java new file mode 100644 index 00000000000..5deac191afb --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner.Task; + +/** + * A wrapper class for SLS Task Runners (AM and NM). + */ +public class SLSTaskRunners { + private static TaskRunner amRunner = new TaskRunner(); + private static TaskRunner nmRunner = new TaskRunner(); + private static TaskRunner runner = new TaskRunner(); + int nmPoolSize, amPoolSize, poolSize; + boolean runnersConfigured = false; + + /** + * This Constructor sets thread pools for each NM and AM. If either of the + * values are not set, the default value is set to both NM and AM thread + * pools. + * @param conf + */ + public SLSTaskRunners(Configuration conf){ + + if(conf.get(SLSConfiguration.NM_RUNNER_POOL_SIZE) == null && + conf.get(SLSConfiguration.AM_RUNNER_POOL_SIZE) == null){ + // Values are not configured. Using the default runner to retain backward + // comparability. + poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + runner.setQueueSize(poolSize); + } else { + nmPoolSize = conf.getInt(SLSConfiguration.NM_RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + amPoolSize = conf.getInt(SLSConfiguration.AM_RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + + nmRunner.setQueueSize(nmPoolSize); + amRunner.setQueueSize(amPoolSize); + // Setting poolSize to nmPoolSize as it is used to create nmSimulators + // thread pool in SLSRunner + poolSize = nmPoolSize; + runnersConfigured = true; + } + } + + TaskRunner getNMTaskRunner(){ + if(runnersConfigured) { + return nmRunner; + } else { + return runner; + } + } + + TaskRunner getAMTaskRunner(){ + if(runnersConfigured) { + return amRunner; + } else { + return runner; + } + } + + int getPoolSize(){ + return poolSize; + } + + void startRunners(){ + if(runnersConfigured) { + amRunner.start(); + nmRunner.start(); + } else { + runner.start(); + } + } + + void scheduleNMTask(Task task){ + if(runnersConfigured) { + nmRunner.schedule(task); + } else { + runner.schedule(task); + } + } + + void scheduleAMTask(Task task){ + if(runnersConfigured) { + amRunner.schedule(task); + } else { + runner.schedule(task); + } + } + + void stopRunners() throws InterruptedException { + if(runnersConfigured) { + amRunner.stop(); + nmRunner.stop(); + } else { + runner.stop(); + } + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 119960c92a4..e601ca5a28c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -34,6 +34,10 @@ // runner public static final String RUNNER_PREFIX = PREFIX + "runner."; public static final String RUNNER_POOL_SIZE = RUNNER_PREFIX + "pool.size"; + public static final String AM_RUNNER_POOL_SIZE = RUNNER_PREFIX + + "am.pool.size"; + public static final String NM_RUNNER_POOL_SIZE = RUNNER_PREFIX + + "nm.pool.size"; public static final int RUNNER_POOL_SIZE_DEFAULT = 10; // scheduler public static final String SCHEDULER_PREFIX = PREFIX + "scheduler."; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java new file mode 100644 index 00000000000..64feefa8d3b --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests are performed with the default configuration to support backward + * compatibility. + */ +public class TestSLSTaskRunners { + Configuration conf; + SLSTaskRunners slsTaskRunners; + Integer AM_TEST_POOL_SIZE = 101; + Integer NM_TEST_POOL_SIZE = 102; + + @Before + public void setup() { + Integer poolSize = 99; + conf = new Configuration(); + conf.set(SLSConfiguration.AM_RUNNER_POOL_SIZE, poolSize.toString()); + conf.set(SLSConfiguration.NM_RUNNER_POOL_SIZE, poolSize.toString()); + } + @Test + public void testDefaultRunnerConf() { + Configuration conf = new Configuration(); + slsTaskRunners = new SLSTaskRunners(conf); + + Assert.assertEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(slsTaskRunners.getPoolSize(), + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + } + + @Test + public void testAMRunnerConf() { + conf = new Configuration(); + conf.set(SLSConfiguration.AM_RUNNER_POOL_SIZE, AM_TEST_POOL_SIZE.toString()); + slsTaskRunners = new SLSTaskRunners(conf); + + Assert.assertNotEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(AM_TEST_POOL_SIZE.intValue(), + slsTaskRunners.amPoolSize); + } + + @Test + public void testNMRunnerConf() { + conf = new Configuration(); + conf.set(SLSConfiguration.NM_RUNNER_POOL_SIZE, NM_TEST_POOL_SIZE.toString()); + slsTaskRunners = new SLSTaskRunners(conf); + + Assert.assertNotEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(NM_TEST_POOL_SIZE.intValue(), + slsTaskRunners.nmPoolSize); + } +}