commit de69d6e81128470dd5d2fd865d4b3a79188f740b
Author: Wangda Tan
Date:   Thu Apr 20 21:54:18 2017 -0700

    YARN-6363. Extending SLS: Synthetic Load Generator. (Carlo Curino via wangda)

diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
index 9aa6373bbae..6159f85fef1 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
@@ -23,21 +23,37 @@
   private final long bytesOut;
   private final int recsOut;
   private final long maxMemory;
+  private final long maxVcores;
   private final ResourceUsageMetrics metrics;
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1,
+        new ResourceUsageMetrics());
+  }
+
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
-      long maxMemory) {
-    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory,
+      long maxMemory, ResourceUsageMetrics
+      metrics) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics);
+  }
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory, long maxVcores) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
       new ResourceUsageMetrics());
   }
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
-      long maxMemory, ResourceUsageMetrics metrics) {
+      long maxMemory, long maxVcores, ResourceUsageMetrics
+      metrics) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;
     this.recsOut = recsOut;
     this.maxMemory = maxMemory;
+    this.maxVcores = maxVcores;
     this.metrics = metrics;
   }
 
@@ -79,6 +95,13 @@ public long getTaskMemory() {
   }
 
   /**
+   * @return Vcores used by the task.
+   */
+  public long getTaskVCores() {
+    return maxVcores;
+  }
+
+  /**
    * @return Resource usage metrics
    */
   public ResourceUsageMetrics getResourceUsageMetrics() {
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
index 3857e1fea90..64008403d7e 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -426,7 +426,7 @@ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
     LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
     if (loggedTask == null) {
       // TODO insert parameters
-      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
       return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
           taskNumber, locality);
     }
@@ -473,7 +473,7 @@ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
     LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
     if (loggedTask == null) {
       // TODO insert parameters
-      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
       return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
           taskNumber, locality);
     }
@@ -639,7 +639,7 @@ private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
   private TaskInfo getTaskInfo(LoggedTask loggedTask) {
     if (loggedTask == null) {
-      return new TaskInfo(0, 0, 0, 0, 0);
+      return new TaskInfo(0, 0, 0, 0, 0, 0);
     }
     List attempts = loggedTask.getAttempts();
@@ -688,9 +688,10 @@ private TaskInfo getTaskInfo(LoggedTask loggedTask) {
       break;
     }
 
+    //note: hardcoding vCores, as they are not collected
     TaskInfo taskInfo =
         new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
-            (int) outputRecords, (int) heapMegabytes,
+            (int) outputRecords, (int) heapMegabytes, 1,
             metrics);
     return taskInfo;
   }
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 0d4ef588cb4..8fb57f389ac 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -132,6 +132,9 @@
             src/test/resources/simulate.html.template
             src/test/resources/simulate.info.html.template
             src/test/resources/track.html.template
+            src/test/resources/syn.json
+            src/test/resources/inputsls.json
+            src/test/resources/nodes.json
diff --git a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
index 5f8d9fcee9c..cbc5bc9500b 100644
--- a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
+++ b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
@@ -16,7 +16,9 @@ function hadoop_usage()
 {
   echo "Usage: slsrun.sh "
-  echo "         --input-rumen= | --input-sls="
+  echo "         --tracetype="
+  echo "         --tracelocation="
+  echo "         (deprecated --input-rumen= | --input-sls=)"
   echo "         --output-dir="
   echo "         [--nodes=]"
   echo "         [--track-jobs=]"
@@ -33,6 +35,12 @@ function parse_args()
       --input-sls=*)
         inputsls=${i#*=}
       ;;
+      --tracetype=*)
+        tracetype=${i#*=}
+      ;;
+      --tracelocation=*)
+        tracelocation=${i#*=}
+      ;;
       --output-dir=*)
         outputdir=${i#*=}
       ;;
@@ -52,14 +60,12 @@
     esac
   done
 
-  if [[ -z "${inputrumen}" && -z "${inputsls}" ]] ; then
-    hadoop_error "ERROR: Either --input-rumen or --input-sls must be specified."
- hadoop_exit_with_usage 1 + if [[ -z "${inputrumen}" && -z "${inputsls}" && -z "${tracetype}" ]] ; then + hadoop_error "ERROR: Either --input-rumen, --input-sls, or --tracetype (with --tracelocation) must be specified." fi - if [[ -n "${inputrumen}" && -n "${inputsls}" ]] ; then - hadoop_error "ERROR: Only specify one of --input-rumen or --input-sls." - hadoop_exit_with_usage 1 + if [[ -n "${inputrumen}" && -n "${inputsls}" && -n "${tracetype}" ]] ; then + hadoop_error "ERROR: Only specify one of --input-rumen, --input-sls, or --tracetype (with --tracelocation)" fi if [[ -z "${outputdir}" ]] ; then @@ -74,11 +80,17 @@ function calculate_classpath } function run_simulation() { - if [[ "${inputsls}" == "" ]] ; then - hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}" - else - hadoop_add_param args -inputsls "-inputsls ${inputsls}" - fi + + if [[ "${inputsls}" != "" ]] ; then + hadoop_add_param args -inputsls "-inputsls ${inputsls}" + fi + if [[ "${inputrumen}" != "" ]] ; then + hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}" + fi + if [[ "${tracetype}" != "" ]] ; then + hadoop_add_param args -tracetype "-tracetype ${tracetype}" + hadoop_add_param args -tracelocation "-tracelocation ${tracelocation}" + fi hadoop_add_param args -output "-output ${outputdir}" diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java new file mode 100644 index 00000000000..7c10a57b1df --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Simple support class, used to create reservation requests. + */ +public final class ReservationClientUtil { + + private ReservationClientUtil(){ + //avoid instantiation + } + + /** + * Creates a request that envelopes a MR jobs, picking max number of maps and + * reducers, max durations, and max resources per container. 
+ * + * @param reservationId the id of the reservation + * @param name the name of a reservation + * @param maxMapRes maximum resources used by any mapper + * @param numberMaps number of mappers + * @param maxMapDur maximum duration of any mapper + * @param maxRedRes maximum resources used by any reducer + * @param numberReduces number of reducers + * @param maxRedDur maximum duration of any reducer + * @param arrival start time of valid range for reservation + * @param deadline deadline for this reservation + * @param queueName queue to submit to + * @return a submission request + */ + @SuppressWarnings("checkstyle:parameternumber") + public static ReservationSubmissionRequest createMRReservation( + ReservationId reservationId, String name, Resource maxMapRes, + int numberMaps, long maxMapDur, Resource maxRedRes, int numberReduces, + long maxRedDur, long arrival, long deadline, String queueName) { + + ReservationRequest mapRR = ReservationRequest.newInstance(maxMapRes, + numberMaps, numberMaps, maxMapDur); + ReservationRequest redRR = ReservationRequest.newInstance(maxRedRes, + numberReduces, numberReduces, maxRedDur); + + List listResReq = new ArrayList(); + listResReq.add(mapRR); + listResReq.add(redRR); + + ReservationRequests reservationRequests = ReservationRequests + .newInstance(listResReq, ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationDefinition resDef = ReservationDefinition.newInstance(arrival, + deadline, reservationRequests, name); + + // outermost request + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(resDef, queueName, reservationId); + + return request; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index ba43816de19..523d22a90e5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -41,17 +41,25 @@ import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -67,25 +75,27 @@ import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.*; +import 
org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @Private @Unstable -public class SLSRunner { +public class SLSRunner extends Configured implements Tool { // RM, Runner private ResourceManager rm; private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; - private Configuration conf; private Map queueAppNumMap; - + // NM simulator private HashMap nmMap; private int nmMemoryMB, nmVCores; private String nodeFile; - + // AM simulator private int AM_ID; private Map amMap; @@ -106,43 +116,64 @@ // logger public final static Logger LOG = Logger.getLogger(SLSRunner.class); - // input traces, input-rumen or input-sls - private boolean isSLS; - - public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile, - String outputDir, Set trackedApps, - boolean printsimulation) - throws IOException, ClassNotFoundException { - this.isSLS = isSLS; - this.inputTraces = inputTraces.clone(); - this.nodeFile = nodeFile; - this.trackedApps = trackedApps; - this.printSimulation = printsimulation; - metricsOutputDir = outputDir; - + /** + * The type of trace in input. + */ + public enum TraceType { + SLS, RUMEN, SYNTH + } + + private TraceType inputType; + private SynthTraceJobProducer stjp; + + public SLSRunner() throws ClassNotFoundException { + Configuration tempConf = new Configuration(false); + init(tempConf); + } + + public SLSRunner(Configuration tempConf) throws ClassNotFoundException { + init(tempConf); + } + + private void init(Configuration tempConf) throws ClassNotFoundException { nmMap = new HashMap<>(); queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); - + // runner configuration - conf = new Configuration(false); - conf.addResource("sls-runner.xml"); + tempConf.addResource("sls-runner.xml"); + super.setConf(tempConf); + // runner - int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); // map - for (Map.Entry e : conf) { + for (Map.Entry e : tempConf) { String key = e.getKey().toString(); if (key.startsWith(SLSConfiguration.AM_TYPE)) { String amType = key.substring(SLSConfiguration.AM_TYPE.length()); - amClassMap.put(amType, Class.forName(conf.get(key))); + amClassMap.put(amType, Class.forName(tempConf.get(key))); } } } - - public void start() throws Exception { + + public void setSimulationParams(TraceType inType, String[] inTraces, + String nodes, String outDir, Set trackApps, + boolean printsimulation) throws IOException, ClassNotFoundException { + + this.inputType = inType; + this.inputTraces = inTraces.clone(); + this.nodeFile = nodes; + this.trackedApps = trackApps; + this.printSimulation = printsimulation; + metricsOutputDir = outDir; + + } + + public void start() throws IOException, ClassNotFoundException, YarnException, + InterruptedException { // start resource manager startRM(); // start node managers @@ -151,9 +182,9 @@ public void start() throws Exception { startAM(); // set queue & tracked apps information ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setQueueSet(this.queueAppNumMap.keySet()); + .setQueueSet(this.queueAppNumMap.keySet()); 
((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setTrackedAppSet(this.trackedApps); + .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -162,23 +193,23 @@ public void start() throws Exception { runner.start(); } - private void startRM() throws Exception { - Configuration rmConf = new YarnConfiguration(); + private void startRM() throws ClassNotFoundException, YarnException { + Configuration rmConf = new YarnConfiguration(getConf()); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); // For CapacityScheduler we use a sub-classing instead of wrapping // to allow scheduler-specific invocations from monitors to work // this can be used for other schedulers as well if we care to // exercise/track behaviors that are not common to the scheduler api - if(Class.forName(schedulerClass) == CapacityScheduler.class) { + if (Class.forName(schedulerClass) == CapacityScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSCapacityScheduler.class.getName()); } else if (Class.forName(schedulerClass) == FairScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSFairScheduler.class.getName()); - } else if (Class.forName(schedulerClass) == FifoScheduler.class){ + } else if (Class.forName(schedulerClass) == FifoScheduler.class) { // TODO add support for FifoScheduler - throw new Exception("Fifo Scheduler is not supported yet."); + throw new YarnException("Fifo Scheduler is not supported yet."); } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); @@ -196,37 +227,47 @@ protected ApplicationMasterLauncher createAMLauncher() { private void startNM() throws YarnException, IOException { // nm configuration - nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); - nmVCores = conf.getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - int heartbeatInterval = conf.getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, + nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES, + SLSConfiguration.NM_VCORES_DEFAULT); + int heartbeatInterval = + getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); // nm information (fetch from topology file, or from sls/rumen json file) Set nodeSet = new HashSet(); if (nodeFile.isEmpty()) { - if (isSLS) { - for (String inputTrace : inputTraces) { + for (String inputTrace : inputTraces) { + + switch (inputType) { + case SLS: nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace)); - } - } else { - for (String inputTrace : inputTraces) { + break; + case RUMEN: nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace)); + break; + case SYNTH: + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(), + stjp.getNodesPerRack())); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } } - } else { nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); } + // create NM simulators Random random = new Random(); Set rackSet = new HashSet(); for (String hostName : nodeSet) { // we randomize the heartbeat start time from zero to 1 interval NMSimulator nm = new NMSimulator(); - nm.init(hostName, nmMemoryMB, nmVCores, - random.nextInt(heartbeatInterval), 
heartbeatInterval, rm); + nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval), + heartbeatInterval, rm); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); rackSet.add(nm.getNode().getRackName()); @@ -241,39 +282,50 @@ private void waitForNodesRunning() throws InterruptedException { int numRunningNodes = 0; for (RMNode node : rm.getRMContext().getRMNodes().values()) { if (node.getState() == NodeState.RUNNING) { - numRunningNodes ++; + numRunningNodes++; } } if (numRunningNodes == numNMs) { break; } - LOG.info(MessageFormat.format("SLSRunner is waiting for all " + - "nodes RUNNING. {0} of {1} NMs initialized.", - numRunningNodes, numNMs)); + LOG.info(MessageFormat.format( + "SLSRunner is waiting for all " + + "nodes RUNNING. {0} of {1} NMs initialized.", + numRunningNodes, numNMs)); Thread.sleep(1000); } LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.", - (System.currentTimeMillis() - startTimeMS))); + (System.currentTimeMillis() - startTimeMS))); } @SuppressWarnings("unchecked") private void startAM() throws YarnException, IOException { // application/container configuration - int heartbeatInterval = conf.getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + int heartbeatInterval = + getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB, + int containerMemoryMB = + getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); - int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES, - SLSConfiguration.CONTAINER_VCORES_DEFAULT); + int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, + SLSConfiguration.CONTAINER_VCORES_DEFAULT); Resource containerResource = - BuilderUtils.newResource(containerMemoryMB, containerVCores); + BuilderUtils.newResource(containerMemoryMB, containerVCores); // application workload - if (isSLS) { + switch (inputType) { + case SLS: startAMFromSLSTraces(containerResource, heartbeatInterval); - } else { + break; + case RUMEN: startAMFromRumenTraces(containerResource, heartbeatInterval); + break; + case SYNTH: + startAMFromSynthGenerator(heartbeatInterval); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } numAMs = amMap.size(); remainingApps = numAMs; @@ -284,7 +336,7 @@ private void startAM() throws YarnException, IOException { */ @SuppressWarnings("unchecked") private void startAMFromSLSTraces(Resource containerResource, - int heartbeatInterval) throws IOException { + int heartbeatInterval) throws IOException { // parse from sls traces JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); @@ -292,26 +344,28 @@ private void startAMFromSLSTraces(Resource containerResource, Reader input = new InputStreamReader(new FileInputStream(inputTrace), "UTF-8"); try { - Iterator i = mapper.readValues(jsonF.createParser(input), - Map.class); + Iterator i = + mapper.readValues(jsonF.createParser(input), Map.class); while (i.hasNext()) { Map jsonJob = i.next(); // load job information - long jobStartTime = Long.parseLong( - jsonJob.get("job.start.ms").toString()); - long jobFinishTime = Long.parseLong( - jsonJob.get("job.end.ms").toString()); + long jobStartTime = + Long.parseLong(jsonJob.get("job.start.ms").toString()); + long jobFinishTime = + Long.parseLong(jsonJob.get("job.end.ms").toString()); String 
user = (String) jsonJob.get("job.user"); - if (user == null) user = "default"; + if (user == null) { + user = "default"; + } String queue = jsonJob.get("job.queue.name").toString(); String oldAppId = jsonJob.get("job.id").toString(); boolean isTracked = trackedApps.contains(oldAppId); - int queueSize = queueAppNumMap.containsKey(queue) ? - queueAppNumMap.get(queue) : 0; - queueSize ++; + int queueSize = + queueAppNumMap.containsKey(queue) ? queueAppNumMap.get(queue) : 0; + queueSize++; queueAppNumMap.put(queue, queueSize); // tasks List tasks = (List) jsonJob.get("job.tasks"); @@ -319,45 +373,45 @@ private void startAMFromSLSTraces(Resource containerResource, continue; } List containerList = - new ArrayList(); + new ArrayList(); for (Object o : tasks) { Map jsonTask = (Map) o; String hostname = jsonTask.get("container.host").toString(); - long taskStart = Long.parseLong( - jsonTask.get("container.start.ms").toString()); - long taskFinish = Long.parseLong( - jsonTask.get("container.end.ms").toString()); + long taskStart = + Long.parseLong(jsonTask.get("container.start.ms").toString()); + long taskFinish = + Long.parseLong(jsonTask.get("container.end.ms").toString()); long lifeTime = taskFinish - taskStart; // Set memory and vcores from job trace file Resource res = Resources.clone(containerResource); if (jsonTask.containsKey("container.memory")) { - int containerMemory = Integer.parseInt( - jsonTask.get("container.memory").toString()); + int containerMemory = + Integer.parseInt(jsonTask.get("container.memory").toString()); res.setMemorySize(containerMemory); } if (jsonTask.containsKey("container.vcores")) { - int containerVCores = Integer.parseInt( - jsonTask.get("container.vcores").toString()); + int containerVCores = + Integer.parseInt(jsonTask.get("container.vcores").toString()); res.setVirtualCores(containerVCores); } - int priority = Integer.parseInt( - jsonTask.get("container.priority").toString()); + int priority = + Integer.parseInt(jsonTask.get("container.priority").toString()); String type = jsonTask.get("container.type").toString(); - containerList.add(new ContainerSimulator(res, - lifeTime, hostname, priority, type)); + containerList.add(new ContainerSimulator(res, lifeTime, hostname, + priority, type)); } // create a new AM String amType = jsonJob.get("am.type").toString(); - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(amType), new Configuration()); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(amType), new Configuration()); if (amSim != null) { - amSim.init(AM_ID++, heartbeatInterval, containerList, rm, - this, jobStartTime, jobFinishTime, user, queue, - isTracked, oldAppId); + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId, + null, runner.getStartTimeMS()); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTime); numTasks += containerList.size(); @@ -375,22 +429,21 @@ private void startAMFromSLSTraces(Resource containerResource, */ @SuppressWarnings("unchecked") private void startAMFromRumenTraces(Resource containerResource, - int heartbeatInterval) - throws IOException { + int heartbeatInterval) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; for (String inputTrace : inputTraces) { File fin = new File(inputTrace); - JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf); + JobTraceReader reader = + new 
JobTraceReader(new Path(fin.getAbsolutePath()), conf); try { LoggedJob job = null; while ((job = reader.getNext()) != null) { // only support MapReduce currently String jobType = "mapreduce"; - String user = job.getUser() == null ? - "default" : job.getUser().getValue(); + String user = + job.getUser() == null ? "default" : job.getUser().getValue(); String jobQueue = job.getQueue().getValue(); String oldJobId = job.getJobID().toString(); long jobStartTimeMS = job.getSubmitTime(); @@ -407,48 +460,48 @@ private void startAMFromRumenTraces(Resource containerResource, } boolean isTracked = trackedApps.contains(oldJobId); - int queueSize = queueAppNumMap.containsKey(jobQueue) ? - queueAppNumMap.get(jobQueue) : 0; - queueSize ++; + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? queueAppNumMap.get(jobQueue) : 0; + queueSize++; queueAppNumMap.put(jobQueue, queueSize); List containerList = - new ArrayList(); + new ArrayList(); // map tasks - for(LoggedTask mapTask : job.getMapTasks()) { + for (LoggedTask mapTask : job.getMapTasks()) { if (mapTask.getAttempts().size() == 0) { continue; } - LoggedTaskAttempt taskAttempt = mapTask.getAttempts() - .get(mapTask.getAttempts().size() - 1); + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + long containerLifeTime = + taskAttempt.getFinishTime() - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 10, "map")); + containerLifeTime, hostname, 10, "map")); } // reduce tasks - for(LoggedTask reduceTask : job.getReduceTasks()) { + for (LoggedTask reduceTask : job.getReduceTasks()) { if (reduceTask.getAttempts().size() == 0) { continue; } LoggedTaskAttempt taskAttempt = reduceTask.getAttempts() - .get(reduceTask.getAttempts().size() - 1); + .get(reduceTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + long containerLifeTime = + taskAttempt.getFinishTime() - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 20, "reduce")); + containerLifeTime, hostname, 20, "reduce")); } // create a new AM - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), conf); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), conf); if (amSim != null) { - amSim.init(AM_ID ++, heartbeatInterval, containerList, - rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId); + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, + oldJobId, null, runner.getStartTimeMS()); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); @@ -460,34 +513,168 @@ private void startAMFromRumenTraces(Resource containerResource, } } } - + + /** + * parse workload information from synth-generator trace files. 
+ */ + @SuppressWarnings("unchecked") + private void startAMFromSynthGenerator(int heartbeatInterval) + throws IOException { + Configuration localConf = new Configuration(); + localConf.set("fs.defaultFS", "file:///"); + long baselineTimeMS = 0; + + // reservations use wall clock time, so need to have a reference for that + UTCClock clock = new UTCClock(); + long now = clock.getTime(); + + try { + + // if we use the nodeFile this could have been not initialized yet. + if (stjp == null) { + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } + + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String jobType = "mapreduce"; + String user = job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); + + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = -1L; + + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job " + oldJobId + " start time to 0."); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + boolean isTracked = trackedApps.contains(oldJobId); + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? queueAppNumMap.get(jobQueue) : 0; + queueSize++; + queueAppNumMap.put(jobQueue, queueSize); + + List containerList = + new ArrayList(); + ArrayList keyAsArray = new ArrayList(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + + Resource maxMapRes = Resource.newInstance(0, 0); + long maxMapDur = 0; + // map tasks + for (int i = 0; i < job.getNumberMaps(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 10, "map")); + maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); + maxMapDur = + containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; + + } + + Resource maxRedRes = Resource.newInstance(0, 0); + long maxRedDur = 0; + // reduce tasks + for (int i = 0; i < job.getNumberReduces(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 20, "reduce")); + maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); + maxRedDur = + containerLifeTime > maxRedDur ? 
containerLifeTime : maxRedDur; + + } + + // generating reservations for the jobs that require them + + ReservationSubmissionRequest rr = null; + if (job.hasDeadline()) { + ReservationId reservationId = + ReservationId.newInstance(this.rm.getStartTime(), AM_ID); + + rr = ReservationClientUtil.createMRReservation(reservationId, + "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, + maxRedRes, job.getNumberReduces(), maxRedDur, + now + jobStartTimeMS, now + job.getDeadline(), + job.getQueueName()); + + } + // create a new AM + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), localConf); + if (amSim != null) { + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, + oldJobId, rr, runner.getStartTimeMS()); + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); + numTasks += containerList.size(); + amMap.put(oldJobId, amSim); + } + } + } finally { + stjp.close(); + } + + } + private void printSimulationInfo() { if (printSimulation) { // node LOG.info("------------------------------------"); - LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity " + - "of each node {2} MB memory and {3} vcores.", - numNMs, numRacks, nmMemoryMB, nmVCores)); + LOG.info(MessageFormat.format( + "# nodes = {0}, # racks = {1}, capacity " + + "of each node {2} MB memory and {3} vcores.", + numNMs, numRacks, nmMemoryMB, nmVCores)); LOG.info("------------------------------------"); // job - LOG.info(MessageFormat.format("# applications = {0}, # total " + - "tasks = {1}, average # tasks per application = {2}", - numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)))); + LOG.info(MessageFormat.format( + "# applications = {0}, # total " + + "tasks = {1}, average # tasks per application = {2}", + numAMs, numTasks, (int) (Math.ceil((numTasks + 0.0) / numAMs)))); LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks"); for (Map.Entry entry : amMap.entrySet()) { AMSimulator am = entry.getValue(); - LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + "\t" + am.getDuration() + "\t" + am.getNumTasks()); } LOG.info("------------------------------------"); // queue - LOG.info(MessageFormat.format("number of queues = {0} average " + - "number of apps = {1}", queueAppNumMap.size(), - (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); + LOG.info(MessageFormat.format( + "number of queues = {0} average " + "number of apps = {1}", + queueAppNumMap.size(), + (int) (Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); LOG.info("------------------------------------"); // runtime - LOG.info(MessageFormat.format("estimated simulation time is {0}" + - " seconds", (long)(Math.ceil(maxRuntime / 1000.0)))); + LOG.info( + MessageFormat.format("estimated simulation time is {0}" + " seconds", + (long) (Math.ceil(maxRuntime / 1000.0)))); LOG.info("------------------------------------"); } // package these information in the simulateInfoMap used by other places @@ -510,69 +697,121 @@ private void printSimulationInfo() { return nmMap; } - public static TaskRunner getRunner() { - return runner; - } - public static void decreaseRemainingApps() { - remainingApps --; + remainingApps--; if (remainingApps == 0) { LOG.info("SLSRunner tears down."); - System.exit(0); } } - public static void main(String args[]) throws Exception { + public void stop() throws InterruptedException { + 
rm.stop(); + runner.stop(); + } + + public int run(final String[] argv) throws IOException, InterruptedException, + ParseException, ClassNotFoundException, YarnException { + Options options = new Options(); + + // Left for compatibility options.addOption("inputrumen", true, "input rumen files"); options.addOption("inputsls", true, "input sls files"); + + // New more general format + options.addOption("tracetype", true, "the type of trace"); + options.addOption("tracelocation", true, "input trace files"); + options.addOption("nodes", true, "input topology"); options.addOption("output", true, "output directory"); options.addOption("trackjobs", true, - "jobs to be tracked during simulating"); + "jobs to be tracked during simulating"); options.addOption("printsimulation", false, - "print out simulation information"); - + "print out simulation information"); + CommandLineParser parser = new GnuParser(); - CommandLine cmd = parser.parse(options, args); + CommandLine cmd = parser.parse(options, argv); - String inputRumen = cmd.getOptionValue("inputrumen"); - String inputSLS = cmd.getOptionValue("inputsls"); - String output = cmd.getOptionValue("output"); - - if ((inputRumen == null && inputSLS == null) || output == null) { - System.err.println(); - System.err.println("ERROR: Missing input or output file"); - System.err.println(); - System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " + - "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + - "[-printsimulation]"); - System.err.println(); - System.exit(1); + String traceType = null; + String traceLocation = null; + + // compatibility with old commandline + if (cmd.hasOption("inputrumen")) { + traceType = "RUMEN"; + traceLocation = cmd.getOptionValue("inputrumen"); + } + if (cmd.hasOption("inputsls")) { + traceType = "SLS"; + traceLocation = cmd.getOptionValue("inputsls"); + } + + if (cmd.hasOption("tracetype")) { + traceType = cmd.getOptionValue("tracetype"); + traceLocation = cmd.getOptionValue("tracelocation"); } - + + String output = cmd.getOptionValue("output"); + File outputFile = new File(output); - if (! outputFile.exists() - && ! outputFile.mkdirs()) { + if (!outputFile.exists() && !outputFile.mkdirs()) { System.err.println("ERROR: Cannot create output directory " - + outputFile.getAbsolutePath()); - System.exit(1); + + outputFile.getAbsolutePath()); + throw new YarnException("Cannot create output directory"); } - + Set trackedJobSet = new HashSet(); if (cmd.hasOption("trackjobs")) { String trackjobs = cmd.getOptionValue("trackjobs"); String jobIds[] = trackjobs.split(","); trackedJobSet.addAll(Arrays.asList(jobIds)); } - - String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; - boolean isSLS = inputSLS != null; - String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(","); - SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output, + String tempNodeFile = + cmd.hasOption("nodes") ? 
cmd.getOptionValue("nodes") : ""; + + TraceType tempTraceType = TraceType.SLS; + switch (traceType) { + case "SLS": + tempTraceType = TraceType.SLS; + break; + case "RUMEN": + tempTraceType = TraceType.RUMEN; + break; + + case "SYNTH": + tempTraceType = TraceType.SYNTH; + break; + default: + printUsage(); + throw new YarnException("Misconfigured input"); + } + + String[] inputFiles = traceLocation.split(","); + + setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, trackedJobSet, cmd.hasOption("printsimulation")); - sls.start(); + + start(); + + return 0; } + + public static void main(String[] argv) throws Exception { + ToolRunner.run(new Configuration(), new SLSRunner(), argv); + } + + static void printUsage() { + System.err.println(); + System.err.println("ERROR: Wrong tracetype"); + System.err.println(); + System.err.println( + "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " + + "(deprecated alternative options --inputsls FILE, FILE,... " + + " | --inputrumen FILE,FILE,...)" + + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + + "[-printsimulation]"); + System.err.println(); + } + } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index a62f2b60240..45a3c072bc9 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.text.MessageFormat; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; @@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -97,6 +100,7 @@ // am type protected String amtype; // job start/end time + private long baselineTimeMS; protected long traceStartTimeMS; protected long traceFinishTimeMS; protected long simulateStartTimeMS; @@ -117,25 +121,30 @@ private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + private ReservationSubmissionRequest reservationRequest; + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } - public void init(int id, int heartbeatInterval, - List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { - super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval, - heartbeatInterval); 
- this.user = user; - this.rm = rm; - this.se = se; - this.user = user; - this.queue = queue; - this.oldAppId = oldAppId; - this.isTracked = isTracked; - this.traceStartTimeMS = traceStartTime; - this.traceFinishTimeMS = traceFinishTime; + @SuppressWarnings("checkstyle:parameternumber") + public void init(int id, int heartbeatInterval, + List containerList, ResourceManager resourceManager, + SLSRunner slsRunnner, long startTime, long finishTime, String simUser, + String simQueue, boolean tracked, String oldApp, + ReservationSubmissionRequest rr, long baseTimeMS) { + super.init(startTime, startTime + 1000000L * heartbeatInterval, + heartbeatInterval); + this.user = simUser; + this.rm = resourceManager; + this.se = slsRunnner; + this.queue = simQueue; + this.oldAppId = oldApp; + this.isTracked = tracked; + this.baselineTimeMS = baseTimeMS; + this.traceStartTimeMS = startTime; + this.traceFinishTimeMS = finishTime; + this.reservationRequest = rr; } /** @@ -143,11 +152,21 @@ public void init(int id, int heartbeatInterval, */ @Override public void firstStep() throws Exception { - simulateStartTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS; + + ReservationId reservationId = null; + + // submit a reservation if one is required, exceptions naturally happen + // when the reservation does not fit, catch, log, and move on running job + // without reservation. + try { + reservationId = submitReservationWhenSpecified(); + } catch (UndeclaredThrowableException y) { + LOG.warn("Unable to place reservation: " + y.getMessage()); + } // submit application, waiting until ACCEPTED - submitApp(); + submitApp(reservationId); // track app metrics trackApp(); @@ -161,6 +180,26 @@ public synchronized void notifyAMContainerLaunched(Container masterContainer) isAMContainerRunning = true; } + private ReservationId submitReservationWhenSpecified() + throws IOException, InterruptedException { + if (reservationRequest != null) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws YarnException, IOException { + rm.getClientRMService().submitReservation(reservationRequest); + LOG.info("RESERVATION SUCCESSFULLY SUBMITTED " + + reservationRequest.getReservationId()); + return null; + + } + }); + return reservationRequest.getReservationId(); + } else { + return null; + } + } + @Override public void middleStep() throws Exception { if (isAMContainerRunning) { @@ -217,14 +256,13 @@ public Object run() throws Exception { } }); - simulateFinishTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS; // record job running information SchedulerMetrics schedulerMetrics = - ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); if (schedulerMetrics != null) { schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, - simulateStartTimeMS, simulateFinishTimeMS); + simulateStartTimeMS, simulateFinishTimeMS); } } @@ -261,7 +299,7 @@ protected AllocateRequest createAllocateRequest(List ask) { protected abstract void checkStop(); - private void submitApp() + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = @@ -291,6 
+329,11 @@ private void submitApp() appSubContext.setResource(Resources .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, MR_AM_CONTAINER_RESOURCE_VCORES)); + + if(reservationId != null) { + appSubContext.setReservationID(reservationId); + } + subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction() { diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index e726b09a26d..de6d19deec3 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -27,13 +27,13 @@ import java.util.List; import java.util.Map; -import org.apache.avro.Protocol; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -114,13 +113,15 @@ scheduled when all maps have finished (not support slow-start currently). 
public final Logger LOG = Logger.getLogger(MRAMSimulator.class); + @SuppressWarnings("checkstyle:parameternumber") public void init(int id, int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { + boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, + long baselineStartTimeMS) { super.init(id, heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId); + isTracked, oldAppId, rr, baselineStartTimeMS); amtype = "mapreduce"; // get map/reduce tasks diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 20cf3e5fd91..b4ffb617c65 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -65,6 +65,11 @@ protected void serviceStart() throws Exception { // Do nothing } + @Override + protected void serviceStop() throws Exception { + // Do nothing + } + private void setupAMRMToken(RMAppAttempt appAttempt) { // Setup AMRMToken Token amrmToken = diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 7c37465d7b6..56190df4326 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.codahale.metrics.Timer; @@ -96,16 +94,6 @@ public void setConf(Configuration conf) { } catch (Exception e) { e.printStackTrace(); } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override public void run() { - try { - schedulerMetrics.tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); } } @@ -344,7 +332,6 @@ private void initQueueMetrics(CSQueue queue) { initQueueMetrics(child); } } - @Override public void serviceInit(Configuration configuration) throws Exception { super.serviceInit(configuration); @@ -354,6 +341,17 @@ public void serviceInit(Configuration configuration) throws Exception { } } + @Override + public void serviceStop() throws Exception { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + super.serviceStop(); + } + + public SchedulerMetrics getSchedulerMetrics() { return schedulerMetrics; } diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index 572dacfc55d..f740f5a170a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -90,16 +88,6 @@ public void setConf(Configuration conf) { } catch (Exception e) { e.printStackTrace(); } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override public void run() { - try { - schedulerMetrics.tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); } } @@ -335,5 +323,15 @@ public void serviceInit(Configuration conf) throws Exception { initQueueMetrics(getQueueManager().getRootQueue()); } } + + @Override + public void serviceStop() throws Exception { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + super.serviceStop(); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java index d35290428c7..19cfe88d1ab 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.IOException; import java.text.MessageFormat; import java.util.Queue; import java.util.concurrent.DelayQueue; @@ -27,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.exceptions.YarnException; @Private @Unstable @@ -148,8 +146,8 @@ public void setQueueSize(int threadPoolSize) { @SuppressWarnings("unchecked") public void start() { - if (executor != null) { - throw new IllegalStateException("Already started"); + if (executor != null && !executor.isTerminated()) { + throw new IllegalStateException("Executor already running"); } DelayQueue preStartQueue = queue; @@ -164,8 +162,9 @@ public void start() { } } - public void stop() { + public void stop() throws InterruptedException { executor.shutdownNow(); + executor.awaitTermination(20, TimeUnit.SECONDS); } @SuppressWarnings("unchecked") diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java new file mode 100644 index 00000000000..3ed81e1f2cd --- /dev/null +++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskStatus.State; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.*; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME; + +/** + * Generates random task data for a synthetic job. + */ +public class SynthJob implements JobStory { + + @SuppressWarnings("StaticVariableName") + private static Log LOG = LogFactory.getLog(SynthJob.class); + + private final Configuration conf; + private final int id; + + @SuppressWarnings("ConstantName") + private static final AtomicInteger sequence = new AtomicInteger(0); + private final String name; + private final String queueName; + private final SynthJobClass jobClass; + + // job timing + private final long submitTime; + private final long duration; + private final long deadline; + + private final int numMapTasks; + private final int numRedTasks; + private final long mapMaxMemory; + private final long reduceMaxMemory; + private final long mapMaxVcores; + private final long reduceMaxVcores; + private final long[] mapRuntime; + private final float[] reduceRuntime; + private long totMapRuntime; + private long totRedRuntime; + + public SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthJobClass jobClass, long actualSubmissionTime) { + + this.conf = conf; + this.jobClass = jobClass; + + this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); + this.numMapTasks = jobClass.getMtasks(); + this.numRedTasks = jobClass.getRtasks(); + + // sample memory distributions, correct for sub-minAlloc sizes + long tempMapMaxMemory = jobClass.getMapMaxMemory(); + this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB + ? 
MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory; + long tempReduceMaxMemory = jobClass.getReduceMaxMemory(); + this.reduceMaxMemory = + tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB + ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory; + + // sample vcores distributions, correct for sub-minAlloc sizes + long tempMapMaxVCores = jobClass.getMapMaxVcores(); + this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES + ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores; + long tempReduceMaxVcores = jobClass.getReduceMaxVcores(); + this.reduceMaxVcores = + tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES + ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores; + + if (numMapTasks > 0) { + conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); + conf.set(MRJobConfig.MAP_JAVA_OPTS, + "-Xmx" + (this.mapMaxMemory - 100) + "m"); + } + + if (numRedTasks > 0) { + conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); + conf.set(MRJobConfig.REDUCE_JAVA_OPTS, + "-Xmx" + (this.reduceMaxMemory - 100) + "m"); + } + + boolean hasDeadline = + (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + + LogNormalDistribution deadlineFactor = + SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, + jobClass.jobClass.deadline_factor_stddev); + + double deadlineFactorSample = + (deadlineFactor != null) ? deadlineFactor.sample() : -1; + + this.queueName = jobClass.workload.getQueueName(); + + this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); + + this.deadline = + hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS) + + (long) Math.ceil(deadlineFactorSample * duration) : -1; + + conf.set(QUEUE_NAME, queueName); + + // name and initialize job randomness + final long seed = rand.nextLong(); + rand.setSeed(seed); + id = sequence.getAndIncrement(); + + name = String.format(jobClass.getClassName() + "_%06d", id); + LOG.debug(name + " (" + seed + ")"); + + LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime + + " deadline:" + deadline + " duration:" + duration + + " deadline-submission: " + (deadline - submitTime)); + + // generate map and reduce runtimes + mapRuntime = new long[numMapTasks]; + for (int i = 0; i < numMapTasks; i++) { + mapRuntime[i] = jobClass.getMapTimeSample(); + totMapRuntime += mapRuntime[i]; + } + reduceRuntime = new float[numRedTasks]; + for (int i = 0; i < numRedTasks; i++) { + reduceRuntime[i] = jobClass.getReduceTimeSample(); + totRedRuntime += (long) Math.ceil(reduceRuntime[i]); + } + } + + public boolean hasDeadline() { + return deadline > 0; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getUser() { + return jobClass.getUserName(); + } + + @Override + public JobID getJobID() { + return new JobID("job_mock_" + name, id); + } + + @Override + public Values getOutcome() { + return Values.SUCCESS; + } + + @Override + public long getSubmissionTime() { + return submitTime; + } + + @Override + public int getNumberMaps() { + return numMapTasks; + } + + @Override + public int getNumberReduces() { + return numRedTasks; + } + + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + switch (taskType) { + case MAP: + return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); + case REDUCE: + return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); + default: + throw new IllegalArgumentException("Not interested"); + } + } + + @Override + public InputSplit[] 
getInputSplits() { + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, + int taskAttemptNumber) { + switch (taskType) { + case MAP: + return new MapTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); + + case REDUCE: + // We assume uniform split between pull/sort/reduce + // aligned with naive progress reporting assumptions + return new ReduceTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) Math.round((reduceRuntime[taskNumber] / 3)), null); + + default: + break; + } + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, + int taskAttemptNumber, int locality) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.hadoop.mapred.JobConf getJobConf() { + return new JobConf(conf); + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public String toString() { + return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() + + "\n" + " jobClass=" + + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n" + + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name + + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n" + + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" + + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks + + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" + + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" + + " queueName=" + queueName + "\n" + "]"; + } + + public SynthJobClass getJobClass() { + return jobClass; + } + + public long getTotalSlotTime() { + return totMapRuntime + totRedRuntime; + } + + public long getDuration() { + return duration; + } + + public long getDeadline() { + return deadline; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJob)) { + return false; + } + SynthJob o = (SynthJob) other; + return Arrays.equals(mapRuntime, o.mapRuntime) + && Arrays.equals(reduceRuntime, o.reduceRuntime) + && submitTime == o.submitTime && numMapTasks == o.numMapTasks + && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory + && reduceMaxMemory == o.reduceMaxMemory + && mapMaxVcores == o.mapMaxVcores + && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) + && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime + && totRedRuntime == o.totRedRuntime; + } + + @Override + public int hashCode() { + // could have a bad distr; investigate if a relevant use case exists + return jobClass.hashCode() * (int) submitTime; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java new file mode 100644 index 00000000000..439698f8a45 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.AbstractRealDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +/** + * This is a class that represent a class of Jobs. It is used to generate an + * individual job, by picking random durations, task counts, container size, + * etc. + */ +public class SynthJobClass { + + private final JDKRandomGenerator rand; + private final LogNormalDistribution dur; + private final LogNormalDistribution mapRuntime; + private final LogNormalDistribution redRuntime; + private final LogNormalDistribution mtasks; + private final LogNormalDistribution rtasks; + private final LogNormalDistribution mapMem; + private final LogNormalDistribution redMem; + private final LogNormalDistribution mapVcores; + private final LogNormalDistribution redVcores; + + private final Trace trace; + @SuppressWarnings("VisibilityModifier") + protected final SynthWorkload workload; + @SuppressWarnings("VisibilityModifier") + protected final JobClass jobClass; + + public SynthJobClass(JDKRandomGenerator rand, Trace trace, + SynthWorkload workload, int classId) { + + this.trace = trace; + this.workload = workload; + this.rand = new JDKRandomGenerator(); + this.rand.setSeed(rand.nextLong()); + jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); + + this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, + jobClass.dur_stddev); + this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, + jobClass.mtime_stddev); + this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, + jobClass.rtime_stddev); + this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, + jobClass.mtasks_stddev); + this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, + jobClass.rtasks_stddev); + + this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, + jobClass.map_max_memory_stddev); + this.redMem = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); + this.mapVcores = SynthUtils.getLogNormalDist(rand, + jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); + this.redVcores = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); + } + + public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { + return new SynthJob(rand, conf, this, actualSubmissionTime); + } + + @Override + public String toString() { + return "SynthJobClass [workload=" + workload.getName() + ", class=" + + jobClass.class_name + " job_count=" + 
jobClass.class_weight + ", dur=" + + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" + + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) + + ", redRuntime=" + + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) + + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) + + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0) + + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n"; + + } + + public double getClassWeight() { + return jobClass.class_weight; + } + + public long getDur() { + return genLongSample(dur); + } + + public int getMtasks() { + return genIntSample(mtasks); + } + + public int getRtasks() { + return genIntSample(rtasks); + } + + public long getMapMaxMemory() { + return genLongSample(mapMem); + } + + public long getReduceMaxMemory() { + return genLongSample(redMem); + } + + public long getMapMaxVcores() { + return genLongSample(mapVcores); + } + + public long getReduceMaxVcores() { + return genLongSample(redVcores); + } + + public SynthWorkload getWorkload() { + return workload; + } + + public int genIntSample(AbstractRealDistribution dist) { + if (dist == null) { + return 0; + } + double baseSample = dist.sample(); + if (baseSample < 0) { + baseSample = 0; + } + return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample)); + } + + public long genLongSample(AbstractRealDistribution dist) { + return dist != null ? (long) Math.ceil(dist.sample()) : 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJobClass)) { + return false; + } + SynthJobClass o = (SynthJobClass) other; + return workload.equals(o.workload); + } + + @Override + public int hashCode() { + return workload.hashCode() * workload.getId(); + } + + public String getClassName() { + return jobClass.class_name; + } + + public long getMapTimeSample() { + return genLongSample(mapRuntime); + } + + public long getReduceTimeSample() { + return genLongSample(redRuntime); + } + + public String getUserName() { + return jobClass.user_name; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java new file mode 100644 index 00000000000..3d2ec947fb6 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES; +import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES; + +/** + * This is a JobStoryProducer that operates from distribution of different + * workloads. The .json input file is used to determine how many jobs, which + * size, number of maps/reducers and their duration, as well as the temporal + * distributed of submissions. For each parameter we control avg and stdev, and + * generate values via normal or log-normal distributions. + */ +public class SynthTraceJobProducer implements JobStoryProducer { + + @SuppressWarnings("StaticVariableName") + private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class); + + private final Configuration conf; + private final AtomicInteger numJobs; + private final Trace trace; + private final long seed; + + private int totalWeight; + private final List weightList; + private final Map workloads; + + private final Queue listStoryParams; + + private final JDKRandomGenerator rand; + + public static final String SLS_SYNTHETIC_TRACE_FILE = + "sls.synthetic" + ".trace_file"; + + public SynthTraceJobProducer(Configuration conf) throws IOException { + this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE))); + } + + public SynthTraceJobProducer(Configuration conf, Path path) + throws IOException { + + LOG.info("SynthTraceJobProducer"); + + this.conf = conf; + this.rand = new JDKRandomGenerator(); + workloads = new HashMap(); + weightList = new ArrayList(); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + FileSystem ifs = path.getFileSystem(conf); + FSDataInputStream fileIn = ifs.open(path); + + this.trace = mapper.readValue(fileIn, Trace.class); + seed = trace.rand_seed; + rand.setSeed(seed); + + this.numJobs = new AtomicInteger(trace.num_jobs); + + for (int workloadId = 0; workloadId < trace.workloads + .size(); workloadId++) { + SynthWorkload workload = new SynthWorkload(workloadId, trace); + for (int classId = + 0; classId < trace.workloads.get(workloadId).job_classes + .size(); classId++) { + SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId); + workload.add(cls); + } + workloads.put(workloadId, workload); + } + + for (int i = 0; i < workloads.size(); i++) { + double w = workloads.get(i).getWorkloadWeight(); + totalWeight += w; + weightList.add(w); + } + + // create priority 
queue to keep start-time sorted + listStoryParams = + new PriorityQueue(10, new Comparator() { + @Override + public int compare(StoryParams o1, StoryParams o2) { + return Math + .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime); + } + }); + + // initialize it + createStoryParams(); + LOG.info("Generated " + listStoryParams.size() + " deadlines for " + + this.numJobs.get() + " jobs "); + } + + public long getSeed() { + return seed; + } + + public int getNodesPerRack() { + return trace.nodes_per_rack; + } + + public int getNumNodes() { + return trace.num_nodes; + } + + /** + * Class used to parse a trace configuration file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + @XmlRootElement + public static class Trace { + @JsonProperty("description") + String description; + @JsonProperty("num_nodes") + int num_nodes; + @JsonProperty("nodes_per_rack") + int nodes_per_rack; + @JsonProperty("num_jobs") + int num_jobs; + + // in sec (selects a portion of time_distribution + @JsonProperty("rand_seed") + long rand_seed; + @JsonProperty("workloads") + List workloads; + + } + + /** + * Class used to parse a workload from file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class Workload { + @JsonProperty("workload_name") + String workload_name; + // used to change probability this workload is picked for each job + @JsonProperty("workload_weight") + double workload_weight; + @JsonProperty("queue_name") + String queue_name; + @JsonProperty("job_classes") + List job_classes; + @JsonProperty("time_distribution") + List time_distribution; + } + + /** + * Class used to parse a job class from file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class JobClass { + + @JsonProperty("class_name") + String class_name; + @JsonProperty("user_name") + String user_name; + + // used to change probability this class is chosen + @JsonProperty("class_weight") + double class_weight; + + // reservation related params + @JsonProperty("chance_of_reservation") + double chance_of_reservation; + @JsonProperty("deadline_factor_avg") + double deadline_factor_avg; + @JsonProperty("deadline_factor_stddev") + double deadline_factor_stddev; + + // durations in sec + @JsonProperty("dur_avg") + double dur_avg; + @JsonProperty("dur_stddev") + double dur_stddev; + @JsonProperty("mtime_avg") + double mtime_avg; + @JsonProperty("mtime_stddev") + double mtime_stddev; + @JsonProperty("rtime_avg") + double rtime_avg; + @JsonProperty("rtime_stddev") + double rtime_stddev; + + // number of tasks + @JsonProperty("mtasks_avg") + double mtasks_avg; + @JsonProperty("mtasks_stddev") + double mtasks_stddev; + @JsonProperty("rtasks_avg") + double rtasks_avg; + @JsonProperty("rtasks_stddev") + double rtasks_stddev; + + // memory in MB + @JsonProperty("map_max_memory_avg") + long map_max_memory_avg; + @JsonProperty("map_max_memory_stddev") + double map_max_memory_stddev; + @JsonProperty("reduce_max_memory_avg") + long reduce_max_memory_avg; + @JsonProperty("reduce_max_memory_stddev") + double reduce_max_memory_stddev; + + // vcores + @JsonProperty("map_max_vcores_avg") + long map_max_vcores_avg; + @JsonProperty("map_max_vcores_stddev") + double map_max_vcores_stddev; + @JsonProperty("reduce_max_vcores_avg") + long reduce_max_vcores_avg; + @JsonProperty("reduce_max_vcores_stddev") + double reduce_max_vcores_stddev; + + } + + /** + * This is used to define time-varying probability of a job start-time (e.g., + * to simulate 
daily patterns). + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class TimeSample { + // in sec + @JsonProperty("time") + int time; + @JsonProperty("weight") + double jobs; + } + + static class StoryParams { + private SynthJobClass pickedJobClass; + private long actualSubmissionTime; + + StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) { + this.pickedJobClass = pickedJobClass; + this.actualSubmissionTime = actualSubmissionTime; + } + } + + + void createStoryParams() { + + for (int i = 0; i < numJobs.get(); i++) { + int workload = SynthUtils.getWeighted(weightList, rand); + SynthWorkload pickedWorkload = workloads.get(workload); + long jobClass = + SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand); + SynthJobClass pickedJobClass = + pickedWorkload.getClassList().get((int) jobClass); + long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand); + // long actualSubmissionTime = (i + 1) * 10; + listStoryParams + .add(new StoryParams(pickedJobClass, actualSubmissionTime)); + } + } + + @Override + public JobStory getNextJob() throws IOException { + if (numJobs.decrementAndGet() < 0) { + return null; + } + StoryParams storyParams = listStoryParams.poll(); + return storyParams.pickedJobClass.getJobStory(conf, + storyParams.actualSubmissionTime); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs + + ", weightList=" + weightList + ", r=" + rand + ", totalWeight=" + + totalWeight + ", workloads=" + workloads + "]"; + } + + public int getNumJobs() { + return trace.num_jobs; + } + +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java new file mode 100644 index 00000000000..a7f8c7f1933 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; + +import java.util.Collection; +import java.util.Random; + +/** + * Utils for the Synthetic generator. 
+ */ +public final class SynthUtils { + + private SynthUtils(){ + //class is not meant to be instantiated + } + + public static int getWeighted(Collection weights, Random rr) { + + double totalWeight = 0; + for (Double i : weights) { + totalWeight += i; + } + + double rand = rr.nextDouble() * totalWeight; + + double cur = 0; + int ind = 0; + for (Double i : weights) { + cur += i; + if (cur > rand) { + break; + } + ind++; + } + + return ind; + } + + public static NormalDistribution getNormalDist(JDKRandomGenerator rand, + double average, double stdDev) { + + if (average <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = average / 6; + } + + NormalDistribution ret = new NormalDistribution(average, stdDev, + NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } + + public static LogNormalDistribution getLogNormalDist(JDKRandomGenerator rand, + double mean, double stdDev) { + + if (mean <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = mean / 6; + } + + // derive lognormal parameters for X = LogNormal(mu, sigma) + // sigma^2 = ln (1+Var[X]/(E[X])^2) + // mu = ln(E[X]) - 1/2 * sigma^2 + double var = stdDev * stdDev; + double sigmasq = Math.log1p(var / (mean * mean)); + double sigma = Math.sqrt(sigmasq); + double mu = Math.log(mean) - 0.5 * sigmasq; + + LogNormalDistribution ret = new LogNormalDistribution(mu, sigma, + LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java new file mode 100644 index 00000000000..9e5fd4ef742 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +import java.util.*; + +/** + * This class represent a workload (made up of multiple SynthJobClass(es)). It + * also stores the temporal distributions of jobs in this workload. 
+ */ +public class SynthWorkload { + + private final int id; + private final List classList; + private final Trace trace; + private final SortedMap timeWeights; + + public SynthWorkload(int identifier, Trace inTrace) { + classList = new ArrayList(); + this.id = identifier; + this.trace = inTrace; + timeWeights = new TreeMap(); + for (SynthTraceJobProducer.TimeSample ts : trace.workloads + .get(id).time_distribution) { + timeWeights.put(ts.time, ts.jobs); + } + } + + public boolean add(SynthJobClass s) { + return classList.add(s); + } + + public List getWeightList() { + ArrayList ret = new ArrayList(); + for (SynthJobClass s : classList) { + ret.add(s.getClassWeight()); + } + return ret; + } + + public int getId() { + return id; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthWorkload)) { + return false; + } + // assume ID determines job classes by construction + return getId() == ((SynthWorkload) other).getId(); + } + + @Override + public int hashCode() { + return getId(); + } + + @Override + public String toString() { + return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n" + + classList + "]\n"; + } + + public String getName() { + return trace.workloads.get(id).workload_name; + } + + public double getWorkloadWeight() { + return trace.workloads.get(id).workload_weight; + } + + public String getQueueName() { + return trace.workloads.get(id).queue_name; + } + + public long getBaseSubmissionTime(Random rand) { + + // pick based on weights the "bucket" for this start time + int position = SynthUtils.getWeighted(timeWeights.values(), rand); + + int[] time = new int[timeWeights.keySet().size()]; + int index = 0; + for (Integer i : timeWeights.keySet()) { + time[index++] = i; + } + + // uniformly pick a time between start and end time of this bucket + int startRange = time[position]; + int endRange = startRange; + // if there is no subsequent bucket pick startRange + if (position < timeWeights.keySet().size() - 1) { + endRange = time[position + 1]; + return startRange + rand.nextInt((endRange - startRange)); + } else { + return startRange; + } + } + + public List getClassList() { + return classList; + } + +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java new file mode 100644 index 00000000000..e06961069df --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes comprising the synthetic load generator for SLS. 
+ */ +package org.apache.hadoop.yarn.sls.synthetic; \ No newline at end of file diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index 085edc00856..eaa59ddfd0f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -149,4 +149,13 @@ } return nodeSet; } + + public static Set generateNodesFromSynth( + int numNodes, int nodesPerRack) { + Set nodeSet = new HashSet(); + for (int i = 0; i < numNodes; i++) { + nodeSet.add("/rack" + i % nodesPerRack + "/node" + i); + } + return nodeSet; + } } diff --git a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md index dfd872ca297..f0e3b8c4337 100644 --- a/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md +++ b/hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md @@ -27,9 +27,11 @@ Yarn Scheduler Load Simulator (SLS) * [Metrics](#Metrics) * [Real-time Tracking](#Real-time_Tracking) * [Offline Analysis](#Offline_Analysis) + * [Synthetic Load Generator](#SynthGen) * [Appendix](#Appendix) * [Resources](#Resources) * [SLS JSON input file format](#SLS_JSON_input_file_format) + * [SYNTH JSON input file format](#SYNTH_JSON_input_file_format) * [Simulator input topology file format](#Simulator_input_topology_file_format) Overview @@ -72,7 +74,7 @@ The following figure illustrates the implementation architecture of the simulato ![The architecture of the simulator](images/sls_arch.png) -The simulator takes input of workload traces, and fetches the cluster and applications information. For each NM and AM, the simulator builds a simulator to simulate their running. All NM/AM simulators run in a thread pool. The simulator reuses Yarn Resource Manager, and builds a wrapper out of the scheduler. The Scheduler Wrapper can track the scheduler behaviors and generates several logs, which are the outputs of the simulator and can be further analyzed. +The simulator takes as input workload traces or synthetic load distributions, and generates the cluster and applications information. For each NM and AM, the simulator builds a simulator to simulate their running. All NM/AM simulators run in a thread pool. The simulator reuses Yarn Resource Manager, and builds a wrapper out of the scheduler. The Scheduler Wrapper can track the scheduler behaviors and generates several logs, which are the outputs of the simulator and can be further analyzed. ### Usecases @@ -179,17 +181,30 @@ The simulator supports two types of input files: the rumen traces and its own in $ cd $HADOOP_ROOT/share/hadoop/tools/sls $ bin/slsrun.sh - --input-rumen |--input-sls= - --output-dir= [--nodes=] - [--track-jobs=] [--print-simulation] + Usage: slsrun.sh + --tracetype= + --tracelocation= + (deprecated --input-rumen= | --input-sls=) + --output-dir= + [--nodes=] + [--track-jobs=] + [--print-simulation] + * `--input-rumen`: The input rumen trace files. Users can input multiple files, separated by comma. One example trace is provided in `$HADOOP_ROOT/share/hadoop/tools/sls/sample-data/2jobs2min-rumen-jh.json`. + This is equivalent to `--tracetype=RUMEN --tracelocation=`. * `--input-sls`: Simulator its own file format. The simulator also provides a tool to convert rumen traces to sls traces (`rumen2sls.sh`).
Refer to appendix for an example of sls input json file. + This is equivalent to `--tracetype=SLS --tracelocation=`. + +* `--tracetype`: This is the new way to configure trace generation; it + takes the values RUMEN, SLS, or SYNTH, selecting one of the three types of load generation. + +* `--tracelocation`: Path to the input file, matching the tracetype above. * `--output-dir`: The output directory for generated running logs and metrics. @@ -279,12 +294,34 @@ After the simulator finishes, all logs are saved in the output directory specifi * Folder `metrics`: logs generated by the Metrics. +Users can also reproduce those real-time tracking charts in offline mode. Just upload the `realtimetrack.json` to `$HADOOP_ROOT/share/hadoop/tools/sls/html/showSimulationTrace.html`. Due to browser security restrictions, the files `realtimetrack.json` and `showSimulationTrace.html` must be placed in the same directory. + + +Synthetic Load Generator +------------------------ +The Synthetic Load Generator complements the extensive nature of SLS-native and RUMEN traces by providing +distribution-driven load generation. The load generator is organized as a JobStoryProducer +(compatible with rumen, and thus gridmix, for later integration). We seed the random number generator so +that results are randomized but deterministic, and hence reproducible. +We organize the generated jobs in a */workloads/job_class* hierarchy, which allows us to easily +group jobs with similar behaviors and categorize them (e.g., jobs with long-running containers, or map-only +computations). The user can control the average and standard deviation of many of the +important parameters, such as the number of mappers/reducers, the duration of mappers/reducers, the size +(memory/CPU) of containers, the chance of reservation, etc. We use weighted random sampling (whenever we +pick among a small number of options) and LogNormal distributions (to avoid negative values) when we +pick from wide ranges of values; see the appendix notes on LogNormal distributions. + +The SYNTH mode of SLS is very convenient for generating very large loads without the need for extensive input +files. It makes it easy to explore a wide range of use cases (e.g., simulate 100k jobs, and across runs +simply tune the average number of mappers or the average task duration) in an efficient and compact way. + Appendix -------- ### Resources [YARN-1021](https://issues.apache.org/jira/browse/YARN-1021) is the main JIRA that introduces Yarn Scheduler Load Simulator to Hadoop Yarn project. +[YARN-6363](https://issues.apache.org/jira/browse/YARN-6363) is the main JIRA that introduces the Synthetic Load Generator to SLS. ### SLS JSON input file format @@ -339,6 +376,77 @@ Here we provide an example format of the sls json file, which contains 2 jobs. T } ] } + +### SYNTH JSON input file format +Here we provide an example of the synthetic generator json file format. We use *(json-non-conforming)* inline comments to explain the use of each parameter.
+ + { + "description" : "tiny jobs workload", //description of the meaning of this collection of workloads + "num_nodes" : 10, //total nodes in the simulated cluster + "nodes_per_rack" : 4, //number of nodes in each simulated rack + "num_jobs" : 10, // total number of jobs being simulated + "rand_seed" : 2, //the random seed used for deterministic randomized runs + + // a list of “workloads”, each of which has job classes, and temporal properties + "workloads" : [ + { + "workload_name" : "tiny-test", // name of the workload + "workload_weight": 0.5, // used for weighted random selection of which workload to sample from + "queue_name" : "sls_queue_1", //queue the job will be submitted to + + //different classes of jobs for this workload + "job_classes" : [ + { + "class_name" : "class_1", //name of the class + "class_weight" : 1.0, //used for weighted random selection of class within workload + + //next group controls the average and standard deviation of a LogNormal distribution that + //determines the number of mappers and reducers for the job. + "mtasks_avg" : 5, + "mtasks_stddev" : 1, + "rtasks_avg" : 5, + "rtasks_stddev" : 1, + + //average and stdev input params of the LogNormal distribution controlling job duration + "dur_avg" : 60, + "dur_stddev" : 5, + + //average and stdev input params of the LogNormal distributions controlling mapper and reducer durations + "mtime_avg" : 10, + "mtime_stddev" : 2, + "rtime_avg" : 20, + "rtime_stddev" : 4, + + //average and stdev input params of the LogNormal distributions controlling memory and vcores for map and reduce + "map_max_memory_avg" : 1024, + "map_max_memory_stddev" : 0.001, + "reduce_max_memory_avg" : 2048, + "reduce_max_memory_stddev" : 0.001, + "map_max_vcores_avg" : 1, + "map_max_vcores_stddev" : 0.001, + "reduce_max_vcores_avg" : 2, + "reduce_max_vcores_stddev" : 0.001, + + //probability of running this job with a reservation + "chance_of_reservation" : 0.5, + //input parameters of the LogNormal distribution that determines the deadline slack (as a multiplier of job duration) + "deadline_factor_avg" : 10.0, + "deadline_factor_stddev" : 0.001 + } + ], + // for each workload, determines with what probability each time bucket is picked to choose the job start time. + // In the example below, jobs are twice as likely to start in the first minute as in the second minute + // of simulation, and have zero chance thereafter. + "time_distribution" : [ + { "time" : 1, "weight" : 66 }, + { "time" : 60, "weight" : 33 }, + { "time" : 120, "jobs" : 0 } + ] + } + ] + } + + ### Simulator input topology file format Here is an example input topology file which has 3 nodes organized in 1 rack. @@ -353,3 +461,9 @@ Here is an example input topology file which has 3 nodes organized in 1 rack. "node" : "node3" }] } + +### Notes on LogNormal distribution: +LogNormal distributions model well many of the parameters we see in practice (e.g., most jobs have +a small number of mappers, but a few might be very large, and a few very small, yet still greater than zero). It is +however worth noting that they can be tricky to use, as the average is typically to the right of the +peak (the most common value) of the distribution, because the distribution has a one-sided tail.
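The LogNormal note above is implemented by the `SynthUtils.getLogNormalDist` helper added in this patch. The following standalone sketch is not part of the patch; the class name `LogNormalParamsSketch` and the sample values are illustrative only. It shows how a user-facing average and standard deviation are mapped to the distribution's (mu, sigma) parameters, and why the mean sits to the right of the peak:

    // Not part of the patch: standalone sketch of the (mean, stddev) -> (mu, sigma)
    // mapping used by SynthUtils.getLogNormalDist. Values are illustrative.
    import org.apache.commons.math3.distribution.LogNormalDistribution;
    import org.apache.commons.math3.random.JDKRandomGenerator;

    public class LogNormalParamsSketch {
      public static void main(String[] args) {
        double mean = 60;   // e.g., dur_avg (seconds)
        double stdDev = 5;  // e.g., dur_stddev (SynthUtils defaults this to mean/6 when it is 0)

        // sigma^2 = ln(1 + Var[X]/E[X]^2),  mu = ln(E[X]) - sigma^2/2
        double var = stdDev * stdDev;
        double sigmaSq = Math.log1p(var / (mean * mean));
        double mu = Math.log(mean) - 0.5 * sigmaSq;

        JDKRandomGenerator rand = new JDKRandomGenerator();
        rand.setSeed(2); // same idea as "rand_seed": deterministic, reproducible runs
        LogNormalDistribution dist = new LogNormalDistribution(mu, Math.sqrt(sigmaSq),
            LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
        dist.reseedRandomGenerator(rand.nextLong());

        // The numerical mean recovers ~60, while the peak (mode) of the density sits
        // to its left, which is the asymmetry the note above warns about.
        System.out.println("mean=" + dist.getNumericalMean() + " sample=" + dist.sample());
      }
    }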
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java new file mode 100644 index 00000000000..8ef72abcdc9 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +/** + * This is a base class to ease the implementation of SLS-based tests. 
+ */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +@SuppressWarnings("VisibilityModifier") +public class BaseSLSRunnerTest { + + @Parameter(value = 0) + public String schedulerType; + + @Parameter(value = 1) + public String traceType; + + @Parameter(value = 2) + public String traceLocation; + + @Parameter(value = 3) + public String nodeFile; + + protected SLSRunner sls; + + @After + public void tearDown() throws InterruptedException { + sls.stop(); + } + + public void runSLS(Configuration conf, long timeout) throws Exception { + File tempDir = new File("target", UUID.randomUUID().toString()); + final List exceptionList = + Collections.synchronizedList(new ArrayList()); + + Thread.setDefaultUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + e.printStackTrace(); + exceptionList.add(e); + } + }); + + // start the simulator + File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/"); + + String[] args; + + switch (traceType) { + case "OLD_SLS": + args = new String[] {"-inputsls", traceLocation, "-output", + slsOutputDir.getAbsolutePath()}; + break; + case "OLD_RUMEN": + args = new String[] {"-inputrumen", traceLocation, "-output", + slsOutputDir.getAbsolutePath()}; + break; + default: + args = new String[] {"-tracetype", traceType, "-tracelocation", + traceLocation, "-output", slsOutputDir.getAbsolutePath()}; + } + + if (nodeFile != null) { + args = ArrayUtils.addAll(args, new String[] {"-nodes", nodeFile}); + } + + conf.set(YarnConfiguration.RM_SCHEDULER, schedulerType); + sls = new SLSRunner(conf); + sls.run(args); + + // wait for timeout seconds before stop, unless there is an uncaught + // exception in which + // case fail fast. + while (timeout >= 0) { + Thread.sleep(1000); + + if (!exceptionList.isEmpty()) { + sls.stop(); + Assert.fail("TestSLSRunner catched exception from child thread " + + "(TaskRunner.Task): " + exceptionList); + break; + } + timeout--; + } + } + +} diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index 9da8ef34a20..b2bc8d51bb1 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -18,53 +18,67 @@ package org.apache.hadoop.yarn.sls; -import org.junit.Assert; +import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.*; -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; -public class TestSLSRunner { +/** + * This test performs simple runs of the SLS with different trace types and + * schedulers. 
+ */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +public class TestSLSRunner extends BaseSLSRunnerTest { - @Test - @SuppressWarnings("all") - public void testSimulatorRunning() throws Exception { - File tempDir = new File("target", UUID.randomUUID().toString()); - final List exceptionList = - Collections.synchronizedList(new ArrayList()); + @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})") + public static Collection data() { - Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - exceptionList.add(e); - } - }); + String capScheduler = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler." + + "capacity.CapacityScheduler"; + String fairScheduler = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler." + + "fair.FairScheduler"; + String slsTraceFile = "src/test/resources/inputsls.json"; + String rumenTraceFile = "src/main/data/2jobs2min-rumen-jh.json"; + String synthTraceFile = "src/test/resources/syn.json"; + String nodeFile = "src/test/resources/nodes.json"; + + // Test with both schedulers, and all three load producers. + return Arrays.asList(new Object[][] { - // start the simulator - File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/"); - String args[] = new String[]{ - "-inputrumen", "src/main/data/2jobs2min-rumen-jh.json", - "-output", slsOutputDir.getAbsolutePath()}; - SLSRunner.main(args); + // covering old commandline in tests + {capScheduler, "OLD_RUMEN", rumenTraceFile, nodeFile }, + {capScheduler, "OLD_SLS", slsTraceFile, nodeFile }, - // wait for 20 seconds before stop - int count = 20; - while (count >= 0) { - Thread.sleep(1000); + // covering the no nodeFile case + {capScheduler, "SYNTH", synthTraceFile, null }, + {capScheduler, "RUMEN", rumenTraceFile, null }, + {capScheduler, "SLS", slsTraceFile, null }, - if (! exceptionList.isEmpty()) { - SLSRunner.getRunner().stop(); - Assert.fail("TestSLSRunner catched exception from child thread " + - "(TaskRunner.Task): " + exceptionList.get(0).getMessage()); - break; - } - count--; - } + // covering new commandline and CapacityScheduler + {capScheduler, "SYNTH", synthTraceFile, nodeFile }, + {capScheduler, "RUMEN", rumenTraceFile, nodeFile }, + {capScheduler, "SLS", slsTraceFile, nodeFile }, - SLSRunner.getRunner().stop(); + // covering FairScheduler + {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, + {fairScheduler, "RUMEN", rumenTraceFile, nodeFile }, + {fairScheduler, "SLS", slsTraceFile, nodeFile } + }); + } + + @Test(timeout = 60000) + @SuppressWarnings("all") + public void testSimulatorRunning() throws Exception { + Configuration conf = new Configuration(false); + long timeTillShutdownInsec = 20L; + runSLS(conf, timeTillShutdownInsec); } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java new file mode 100644 index 00000000000..2b1971a8ec5 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; + +/** + * Simple test class driving the {@code SynthTraceJobProducer}, and validating + * jobs produce are within expected range. + */ +public class TestSynthJobGeneration { + + public final static Logger LOG = + Logger.getLogger(TestSynthJobGeneration.class); + + @Test + public void test() throws IllegalArgumentException, IOException { + + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + SynthJob js = (SynthJob) stjp.getNextJob(); + + int jobCount = 0; + + while (js != null) { + LOG.info((jobCount++) + " " + js.getQueueName() + " -- " + + js.getJobClass().getClassName() + " (conf: " + + js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + " submission: " + + js.getSubmissionTime() + ", " + " duration: " + js.getDuration() + + " numMaps: " + js.getNumberMaps() + " numReduces: " + + js.getNumberReduces()); + + validateJob(js); + js = (SynthJob) stjp.getNextJob(); + } + + Assert.assertEquals(stjp.getNumJobs(), jobCount); + } + + private void validateJob(SynthJob js) { + + assertTrue(js.getSubmissionTime() > 0); + assertTrue(js.getDuration() > 0); + assertTrue(js.getNumberMaps() >= 0); + assertTrue(js.getNumberReduces() >= 0); + assertTrue(js.getNumberMaps() + js.getNumberReduces() > 0); + assertTrue(js.getTotalSlotTime() >= 0); + + for (int i = 0; i < js.getNumberMaps(); i++) { + TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0); + assertTrue(tai.getRuntime() > 0); + } + + for (int i = 0; i < js.getNumberReduces(); i++) { + TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0); + assertTrue(tai.getRuntime() > 0); + } + + if (js.hasDeadline()) { + assertTrue(js.getDeadline() > js.getSubmissionTime() + js.getDuration()); + } + + } +} diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index ca3d1958a3b..56aa2199a23 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -134,7 +134,7 @@ public void testAMSimulator() throws Exception { String queue = "default"; List containers = new ArrayList<>(); app.init(1, 1000, 
containers, rm, null, 0, 1000000L, "user1", queue, - true, appId); + true, appId, null, 0); app.firstStep(); verifySchedulerMetrics(appId); diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/scheduler/TestTaskRunner.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/scheduler/TestTaskRunner.java index 23f2bb61677..ce6c1b30b65 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/scheduler/TestTaskRunner.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/scheduler/TestTaskRunner.java @@ -35,7 +35,7 @@ public void setUp() { } @After - public void cleanUp() { + public void cleanUp() throws InterruptedException { runner.stop(); } diff --git a/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml b/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml index 61be96ae6d4..1762265f6d8 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml +++ b/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml @@ -39,6 +39,16 @@ + yarn.scheduler.capacity.root.sls_queue_1.reservable + true + + + + yarn.scheduler.capacity.root.sls_queue_1.show-reservations-as-queues + true + + + yarn.scheduler.capacity.root.sls_queue_2.capacity 25 diff --git a/hadoop-tools/hadoop-sls/src/test/resources/fair-scheduler.xml b/hadoop-tools/hadoop-sls/src/test/resources/fair-scheduler.xml index fa10359c501..7c46767737f 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/fair-scheduler.xml +++ b/hadoop-tools/hadoop-sls/src/test/resources/fair-scheduler.xml @@ -21,6 +21,7 @@ --> + drf yarn.sls.nm.memory.mb - 10240 + 100240 yarn.sls.nm.vcores - 10 + 100 yarn.sls.nm.heartbeat.interval.ms @@ -77,5 +77,5 @@ org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler org.apache.hadoop.yarn.sls.scheduler.CapacitySchedulerMetrics - + diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn.json b/hadoop-tools/hadoop-sls/src/test/resources/syn.json new file mode 100644 index 00000000000..8479d23c318 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/resources/syn.json @@ -0,0 +1,53 @@ +{ + "description": "tiny jobs workload", + "num_nodes": 20, + "nodes_per_rack": 4, + "num_jobs": 10, + "rand_seed": 2, + "workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 0.5, + "description": "Sort jobs", + "queue_name": "sls_queue_1", + "job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "mtasks_avg": 5, + "mtasks_stddev": 1, + "rtasks_avg": 5, + "rtasks_stddev": 1, + "dur_avg": 60, + "dur_stddev": 5, + "mtime_avg": 10, + "mtime_stddev": 2, + "rtime_avg": 20, + "rtime_stddev": 4, + "map_max_memory_avg": 1024, + "map_max_memory_stddev": 0.001, + "reduce_max_memory_avg": 2048, + "reduce_max_memory_stddev": 0.001, + "map_max_vcores_avg": 1, + "map_max_vcores_stddev": 0.001, + "reduce_max_vcores_avg": 2, + "reduce_max_vcores_stddev": 0.001, + "chance_of_reservation": 0.5, + "deadline_factor_avg": 10.0, + "deadline_factor_stddev": 0.001 + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 60, + "jobs": 0 + } + ] + } + ] +} diff --git a/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml b/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml index 78aa6f2dd7a..7b2e674c8ff 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml +++ b/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml @@ -17,7 +17,7 @@ yarn.resourcemanager.scheduler.class - 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler + org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler @@ -79,4 +79,12 @@ yarn.scheduler.fair.assignmultiple true + + + + Enable reservation system. + yarn.resourcemanager.reservation-system.enable + true + +
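To close, a minimal programmatic sketch of the new synthetic producer, condensed from the `TestSynthJobGeneration` test in this patch; it is not part of the patch itself, and the class name `SynthTraceSketch` and the printed fields are illustrative. It reads the `syn.json` test resource added above and iterates the generated job stories:

    // Not part of the patch: condensed from TestSynthJobGeneration.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.rumen.JobStory;
    import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
    import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

    public class SynthTraceSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE,
            "src/test/resources/syn.json");

        SynthTraceJobProducer producer = new SynthTraceJobProducer(conf);

        // getNextJob() returns null once "num_jobs" stories have been produced.
        JobStory story;
        while ((story = producer.getNextJob()) != null) {
          SynthJob job = (SynthJob) story;
          System.out.println(job.getName() + " queue=" + job.getQueueName()
              + " submit=" + job.getSubmissionTime()
              + " maps=" + job.getNumberMaps()
              + " reduces=" + job.getNumberReduces()
              + (job.hasDeadline() ? " deadline=" + job.getDeadline() : ""));
        }
        producer.close();
      }
    }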