diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 6f75bd17c6f..62da99c4f4b 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -87,7 +87,6 @@ import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; -import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; @@ -105,7 +104,7 @@ public class SLSRunner extends Configured implements Tool { // RM, Runner private ResourceManager rm; - private static TaskRunner runner = new TaskRunner(); + private SLSTaskRunners slsTaskRunners; private String[] inputTraces; private Map queueAppNumMap; private int poolSize; @@ -185,10 +184,6 @@ private void init(Configuration tempConf) throws ClassNotFoundException { // runner configuration setConf(tempConf); - // runner - poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); - SLSRunner.runner.setQueueSize(poolSize); // map for (Map.Entry e : tempConf) { String key = e.getKey().toString(); @@ -198,6 +193,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException { } } + slsTaskRunners = new SLSTaskRunners(tempConf); + poolSize = slsTaskRunners.getPoolSize(); nodeManagerResource = getNodeManagerResource(); } @@ -263,8 +260,8 @@ public void start() throws IOException, ClassNotFoundException, YarnException, printSimulationInfo(); // blocked until all nodes RUNNING waitForNodesRunning(); - // starting the runner once everything is ready to go, - runner.start(); + // starting the runners once everything is ready to go, + slsTaskRunners.startRunners(); } /** @@ -391,7 +388,7 @@ private void startNM() throws YarnException, IOException, random.nextInt(heartbeatInterval), heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); nmMap.put(nm.getNode().getNodeID(), nm); - runner.schedule(nm); + slsTaskRunners.scheduleNMTask(nm); rackSet.add(nm.getNode().getRackName()); } catch (IOException | YarnException e) { LOG.error("Got an error while adding node", e); @@ -862,15 +859,15 @@ private void runNewAM(String jobType, String user, AM_ID++; amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, labelExpr, params, - appIdAMSim); + slsTaskRunners.getAMTaskRunner().getStartTimeMS(), + amContainerResource, labelExpr, params, appIdAMSim); if(reservationId != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) UTCClock clock = new UTCClock(); amSim.initReservation(reservationId, deadline, clock.getTime()); } - runner.schedule(amSim); + slsTaskRunners.scheduleAMTask(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); amMap.put(oldJobId, amSim); @@ -941,7 +938,7 @@ public static void decreaseRemainingApps() { public void stop() throws InterruptedException { rm.stop(); - runner.stop(); + slsTaskRunners.stopRunners(); } public int run(final String[] argv) throws IOException, InterruptedException, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java new file mode 100644 index 00000000000..fd195a01fab --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSTaskRunners.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner.Task; + +/** + * A wrapper class for SLS Task Runners (AM and NM). + */ +public class SLSTaskRunners { + private static TaskRunner amRunner = new TaskRunner(); + private static TaskRunner nmRunner = new TaskRunner(); + private static TaskRunner runner = new TaskRunner(); + private int nmPoolSize, amPoolSize, poolSize; + private boolean runnersConfigured = false; + + /** + * This Constructor sets thread pools for each NM and AM. If both the values + * are not set, the default value is set to both NM and AM thread pools. + * @param conf + */ + public SLSTaskRunners(Configuration conf){ + + if(conf.get(SLSConfiguration.NM_RUNNER_POOL_SIZE) == null && + conf.get(SLSConfiguration.AM_RUNNER_POOL_SIZE) == null){ + // Values are not configured. Using the default runner to retain backward + // comparability. + poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + runner.setQueueSize(poolSize); + } else { + nmPoolSize = conf.getInt(SLSConfiguration.NM_RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + amPoolSize = conf.getInt(SLSConfiguration.AM_RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + + nmRunner.setQueueSize(nmPoolSize); + amRunner.setQueueSize(amPoolSize); + // Setting poolSize to nmPoolSize as it is used to create nmSimulators + // thread pool in SLSRunner + poolSize = nmPoolSize; + runnersConfigured = true; + } + } + + public TaskRunner getNMTaskRunner(){ + if(runnersConfigured) { + return nmRunner; + } else { + return runner; + } + } + + public TaskRunner getAMTaskRunner(){ + if(runnersConfigured) { + return amRunner; + } else { + return runner; + } + } + + public int getPoolSize(){ + return poolSize; + } + + public int getAmPoolSize(){ + if(runnersConfigured) { + return amPoolSize; + } + return poolSize; + } + + public int getNmPoolSize(){ + if(runnersConfigured) { + return nmPoolSize; + } + return poolSize; + } + + public void startRunners(){ + if(runnersConfigured) { + amRunner.start(); + nmRunner.start(); + } else { + runner.start(); + } + } + + public void scheduleNMTask(Task task){ + if(runnersConfigured) { + nmRunner.schedule(task); + } else { + runner.schedule(task); + } + } + + public void scheduleAMTask(Task task){ + if(runnersConfigured) { + amRunner.schedule(task); + } else { + runner.schedule(task); + } + } + + public void stopRunners() throws InterruptedException { + if(runnersConfigured) { + amRunner.stop(); + nmRunner.stop(); + } else { + runner.stop(); + } + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 119960c92a4..e601ca5a28c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -34,6 +34,10 @@ // runner public static final String RUNNER_PREFIX = PREFIX + "runner."; public static final String RUNNER_POOL_SIZE = RUNNER_PREFIX + "pool.size"; + public static final String AM_RUNNER_POOL_SIZE = RUNNER_PREFIX + + "am.pool.size"; + public static final String NM_RUNNER_POOL_SIZE = RUNNER_PREFIX + + "nm.pool.size"; public static final int RUNNER_POOL_SIZE_DEFAULT = 10; // scheduler public static final String SCHEDULER_PREFIX = PREFIX + "scheduler."; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java new file mode 100644 index 00000000000..eb5c00c7e26 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSTaskRunners.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests are performed with the default configuration to support backward + * compatibility. + */ +public class TestSLSTaskRunners { + private SLSTaskRunners slsTaskRunners; + private Integer amTestPoolSize = 101; + private Integer nmTestPoolSize = 102; + + @Test + public void testDefaultRunnerConf() { + Configuration conf = new Configuration(); + slsTaskRunners = new SLSTaskRunners(conf); + + // Since, AM_RUNNER_POOL_SIZE and NM_RUNNER_POOL_SIZE are not set, + // default runner is returned for both NM and AM. + Assert.assertEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(slsTaskRunners.getPoolSize(), + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + } + + @Test + public void testAMRunnerConf() { + Configuration newConf = new Configuration(); + newConf.set(SLSConfiguration.AM_RUNNER_POOL_SIZE, amTestPoolSize.toString()); + slsTaskRunners = new SLSTaskRunners(newConf); + + Assert.assertNotEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(amTestPoolSize.intValue(), + slsTaskRunners.getAmPoolSize()); + } + + @Test + public void testNMRunnerConf() { + Configuration newConf = new Configuration(); + newConf.set(SLSConfiguration.NM_RUNNER_POOL_SIZE, nmTestPoolSize.toString()); + slsTaskRunners = new SLSTaskRunners(newConf); + + Assert.assertNotEquals(slsTaskRunners.getAMTaskRunner(), + slsTaskRunners.getNMTaskRunner()); + + Assert.assertEquals(nmTestPoolSize.intValue(), + slsTaskRunners.getNmPoolSize()); + } +}