diff --git a/hadoop-tools/hadoop-sls/src/main/bin/slsrunForRealRM.sh b/hadoop-tools/hadoop-sls/src/main/bin/slsrunForRealRM.sh
new file mode 100644
index 0000000..fac46b0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/bin/slsrunForRealRM.sh
@@ -0,0 +1,144 @@
+#!/usr/bin/env bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+
+function hadoop_usage()
+{
+  echo "Usage: slsrunForRealRM.sh <OPTIONS>"
+  echo "                 --tracetype=<SYNTH | SLS | RUMEN>"
+  echo "                 --tracelocation=<FILE>"
+  echo "                 (deprecated --input-rumen=<FILE> | --input-sls=<FILE>)"
+  echo "                 --output-dir=<DIRECTORY>"
+  echo "                 [--nodes=<FILE>]"
+  echo "                 [--track-jobs=<JobId1,JobId2,...>]"
+  echo "                 [--print-simulation]"
+}
+
+function parse_args()
+{
+  for i in "$@"; do
+    case $i in
+      --input-rumen=*)
+        inputrumen=${i#*=}
+      ;;
+      --input-sls=*)
+        inputsls=${i#*=}
+      ;;
+      --tracetype=*)
+        tracetype=${i#*=}
+      ;;
+      --tracelocation=*)
+        tracelocation=${i#*=}
+      ;;
+      --output-dir=*)
+        outputdir=${i#*=}
+      ;;
+      --nodes=*)
+        nodes=${i#*=}
+      ;;
+      --track-jobs=*)
+        trackjobs=${i#*=}
+      ;;
+      --print-simulation)
+        printsimulation="true"
+      ;;
+      *)
+        hadoop_error "ERROR: Invalid option ${i}"
+        hadoop_exit_with_usage 1
+      ;;
+    esac
+  done
+
+  if [[ -z "${inputrumen}" && -z "${inputsls}" && -z "${tracetype}" ]] ; then
+    hadoop_error "ERROR: Either --input-rumen, --input-sls, or --tracetype (with --tracelocation) must be specified."
+    hadoop_exit_with_usage 1
+  fi
+
+  if [[ -n "${inputrumen}" && -n "${inputsls}" && -n "${tracetype}" ]] ; then
+    hadoop_error "ERROR: Only specify one of --input-rumen, --input-sls, or --tracetype (with --tracelocation)"
+    hadoop_exit_with_usage 1
+  fi
+
+  if [[ -z "${outputdir}" ]] ; then
+    hadoop_error "ERROR: The output directory --output-dir must be specified."
+    hadoop_exit_with_usage 1
+  fi
+}
+
+function calculate_classpath
+{
+  hadoop_add_to_classpath_tools hadoop-sls
+}
+
+function run_simulation() {
+
+  local args
+
+  if [[ "${inputsls}" != "" ]] ; then
+    hadoop_add_param args -inputsls "-inputsls ${inputsls}"
+  fi
+  if [[ "${inputrumen}" != "" ]] ; then
+    hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}"
+  fi
+  if [[ "${tracetype}" != "" ]] ; then
+    hadoop_add_param args -tracetype "-tracetype ${tracetype}"
+    hadoop_add_param args -tracelocation "-tracelocation ${tracelocation}"
+  fi
+
+  hadoop_add_param args -output "-output ${outputdir}"
+
+  if [[ -n "${nodes}" ]] ; then
+    hadoop_add_param args -nodes "-nodes ${nodes}"
+  fi
+
+  if [[ -n "${trackjobs}" ]] ; then
+    hadoop_add_param args -trackjobs "-trackjobs ${trackjobs}"
+  fi
+
+  if [[ "${printsimulation}" == "true" ]] ; then
+    hadoop_add_param args -printsimulation "-printsimulation"
+  fi
+
+  hadoop_add_client_opts
+
+  hadoop_finalize
+  # shellcheck disable=SC2086
+  hadoop_java_exec sls org.apache.hadoop.yarn.sls.SLSRunnerForRealRM ${args}
+}
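+
+# A typical invocation against a real cluster might look like the following
+# (illustrative only -- the trace and output paths are placeholders):
+#
+#   slsrunForRealRM.sh --input-sls=/tmp/sls-jobs.json \
+#     --output-dir=/tmp/sls-run-1 --print-simulation
+#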
+if [[ -n "${HADOOP_HOME}" ]]; then + HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec" +else + this="${BASH_SOURCE-$0}" + bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P) + HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../../../../../libexec" +fi + +HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}" +# shellcheck disable=SC2034 +HADOOP_NEW_CONFIG=true +if [[ -f "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then + # shellcheck disable=SC1090 + . "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" +else + echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh." 2>&1 + exit 1 +fi + +if [[ $# = 0 ]]; then + hadoop_exit_with_usage 1 +fi + +parse_args "${@}" +calculate_classpath +run_simulation diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java new file mode 100644 index 0000000..94f042d --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java
new file mode 100644
index 0000000..94f042d
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunnerForRealRM.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulatorForRealRM;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulatorForRealRM;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+@Private
+@Unstable
+public class SLSRunnerForRealRM {
+  // RM, Runner
+  private static TaskRunner amRunner = new TaskRunner();
+  private static TaskRunner nmRunner = new TaskRunner();
+  private String[] inputTraces;
+  private Configuration conf;
+  private Configuration yarnConf;
+  private Map<String, Integer> queueAppNumMap;
+
+  // NM simulator
+  private HashMap<NodeId, NMSimulatorForRealRM> nmMap;
+  private int nmMemoryMB, nmVCores;
+  private String nodeFile;
+
+  // AM simulator
+  private int AM_ID;
+  private Map<String, AMSimulatorForRealRM> amMap;
+  private Set<String> trackedApps;
+  private Map<String, Class> amClassMap;
+  private static AtomicInteger remainingApps = new AtomicInteger(0);
+
+  // metrics
+  private String metricsOutputDir;
+  private boolean printSimulation;
+
+  // other simulation information
+  private int numNMs, numRacks, numAMs, numTasks;
+  private long maxRuntime;
+  public final static Map<String, Object> simulateInfoMap =
+      new HashMap<String, Object>();
+
+  public BlockingQueue<AMSimulatorForRealRM> noRunningAM =
+      new LinkedBlockingQueue<AMSimulatorForRealRM>();
+
+  private YarnClient rmClient;
+
+  // logger
+  public final static Logger LOG = Logger.getLogger(SLSRunnerForRealRM.class);
+
+  // input traces, input-rumen or input-sls
+  private boolean isSLS;
+
+  private int submitAppIntervalMs = 1000 * 60;
+  private int submitAppIntervalCount = 250;
+  private int parallelAppMaxCount = 1000;
+
+  public SLSRunnerForRealRM(boolean isSLS, String[] inputTraces,
+      String nodeFile, String outputDir, Set<String> trackedApps,
+      boolean printsimulation)
+      throws IOException, ClassNotFoundException {
+    this.isSLS = isSLS;
+    this.inputTraces = inputTraces.clone();
+    this.nodeFile = nodeFile;
+    this.trackedApps = trackedApps;
+    this.printSimulation = printsimulation;
+    metricsOutputDir = outputDir;
+
+    nmMap = new HashMap<NodeId, NMSimulatorForRealRM>();
+    queueAppNumMap = new HashMap<String, Integer>();
+    amMap = new HashMap<String, AMSimulatorForRealRM>();
+    amClassMap = new HashMap<String, Class>();
+
+    // runner configuration
+    conf = new Configuration(false);
+    conf.addResource("sls-runner.xml");
+    yarnConf = new YarnConfiguration();
+
+    // init YARN client
+    initYARNClient();
+
+    // runner
+    int amPoolSize = conf.getInt(SLSConfiguration.RUNNER_AM_POOL_SIZE,
+        SLSConfiguration.RUNNER_AM_POOL_SIZE_DEFAULT);
+    int nmPoolSize = conf.getInt(SLSConfiguration.RUNNER_NM_POOL_SIZE,
+        SLSConfiguration.RUNNER_NM_POOL_SIZE_DEFAULT);
+
+    submitAppIntervalMs = conf.getInt(SLSConfiguration.AM_SUBMIT_INTERVAL_MS,
+        SLSConfiguration.AM_SUBMIT_INTERVAL_MS_DEFAULT);
+    submitAppIntervalCount =
+        conf.getInt(SLSConfiguration.AM_SUBMIT_INTERVAL_COUNT,
+            SLSConfiguration.AM_SUBMIT_INTERVAL_COUNT_DEFAULT);
+    parallelAppMaxCount =
+        conf.getInt(SLSConfiguration.AM_MAX_PARALLEL_NUM, 1000);
+
+    LOG.info("submitAppIntervalMs: " + submitAppIntervalMs);
+    LOG.info("submitAppIntervalCount: " + submitAppIntervalCount);
+    LOG.info("parallelAppMaxCount: " + parallelAppMaxCount);
+
+    LOG.info("amPoolSize: " + amPoolSize);
+    LOG.info("nmPoolSize: " + nmPoolSize);
+    SLSRunnerForRealRM.amRunner.setQueueSize(amPoolSize);
+    SLSRunnerForRealRM.nmRunner.setQueueSize(nmPoolSize);
+    // map AM type to its simulator class
+    for (Map.Entry<String, String> e : conf) {
+      String key = e.getKey().toString();
+      if (key.startsWith(SLSConfiguration.AM_TYPE)) {
+        String amType = key.substring(SLSConfiguration.AM_TYPE.length());
+        amClassMap.put(amType, Class.forName(conf.get(key)));
+      }
+    }
+  }
+
+  private void initYARNClient() {
+    YarnConfiguration yarnConfiguration = new YarnConfiguration(this.yarnConf);
+    rmClient = YarnClient.createYarnClient();
+    rmClient.init(yarnConfiguration);
+    rmClient.start();
+  }
+
+  public void start() throws Exception {
+    // start node managers
+    startNM();
+    nmRunner.start();
+    // start application masters
+    startAM();
+    // print out simulation info
+    printSimulationInfo();
+    // block until all nodes are RUNNING, then start the AM runner
+    // once everything is ready to go
+    waitForNodesRunning();
+    amRunner.start();
+    continuousSubmitApp();
+  }
+
+  private void continuousSubmitApp() {
+    Thread submitAppThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("running continuousSubmitApp thread, AMs not yet running: "
+            + noRunningAM.size());
+        Set<String> appTypes = new HashSet<String>();
+        appTypes.add("YARN");
+        int amPoolSize = conf.getInt(SLSConfiguration.RUNNER_AM_POOL_SIZE,
+            SLSConfiguration.RUNNER_AM_POOL_SIZE_DEFAULT);
+        try {
+          while (noRunningAM.size() > 0) {
+            Thread.sleep(submitAppIntervalMs);
+            LOG.info("ready to submit up to " + submitAppIntervalCount
+                + " more apps");
+            int count = 0;
+            int currentParallelAppCount = 0;
+            long startTimeForGetApplications = System.currentTimeMillis();
+            List<ApplicationReport> report = rmClient.getApplications();
+
+            for (ApplicationReport app : report) {
+              if (app.getFinalApplicationStatus().equals(
+                  FinalApplicationStatus.UNDEFINED)) {
+                currentParallelAppCount++;
+              }
+            }
+            long getParallelAppsCostTime =
+                System.currentTimeMillis() - startTimeForGetApplications;
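+            // Throttling note: a new batch is scheduled only while (a) the
+            // per-interval cap (submitAppIntervalCount) is not reached,
+            // (b) the number of apps the real RM reports as unfinished stays
+            // below parallelAppMaxCount, and (c) the AM runner queue has
+            // headroom; the thresholds come from sls-runner.xml.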
+            while (count < submitAppIntervalCount
+                && currentParallelAppCount < parallelAppMaxCount
+                && SLSRunnerForRealRM.getAMRunner().getQueueSize()
+                    < parallelAppMaxCount - amPoolSize) {
+              AMSimulatorForRealRM am = noRunningAM.poll();
+              if (am != null) {
+                LOG.info("continuousSubmitApp: " + noRunningAM.size()
+                    + " AMs not yet running, scheduling am_id: " + am.getId()
+                    + ", current_thread: " + Thread.currentThread());
+                SLSRunnerForRealRM.getAMRunner().schedule(am);
+              }
+              count++;
+            }
+            LOG.info("continuously submitted " + count
+                + " apps, currentParallelAppCount: " + currentParallelAppCount
+                + ", getParallelAppsCostTime(ms): " + getParallelAppsCostTime);
+          }
+        } catch (Exception ex) {
+          LOG.error("continuousSubmitApp failed", ex);
+        }
+      }
+    });
+    submitAppThread.setDaemon(true);
+    submitAppThread.start();
+  }
+
+  private void startNM()
+      throws YarnException, IOException, InterruptedException {
+    // nm configuration
+    LOG.info("startNM begin ---------");
+    nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
+        SLSConfiguration.NM_MEMORY_MB_DEFAULT);
+    nmVCores = conf.getInt(SLSConfiguration.NM_VCORES,
+        SLSConfiguration.NM_VCORES_DEFAULT);
+    int heartbeatInterval = conf.getInt(
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    // nm information (fetch from topology file, or from sls/rumen json file)
+    Set<String> nodeSet = new HashSet<String>();
+    if (nodeFile.isEmpty()) {
+      if (isSLS) {
+        for (String inputTrace : inputTraces) {
+          nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
+        }
+      } else {
+        for (String inputTrace : inputTraces) {
+          nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
+        }
+      }
+    } else {
+      nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
+    }
+    // create NM simulators
+    Random random = new Random();
+    Set<String> rackSet = new HashSet<String>();
+    for (String hostName : nodeSet) {
+      // we randomize the heartbeat start time from zero to 1 interval
+      NMSimulatorForRealRM nm = new NMSimulatorForRealRM();
+      nm.init(hostName, nmMemoryMB, nmVCores,
+          random.nextInt(heartbeatInterval), heartbeatInterval, yarnConf);
+      nmMap.put(nm.getNode().getNodeID(), nm);
+      rackSet.add(nm.getNode().getRackName());
+      nmRunner.schedule(nm);
+    }
+    numRacks = rackSet.size();
+    numNMs = nmMap.size();
+    LOG.info("startNM end ---------");
+  }
+
+  private void waitForNodesRunning()
+      throws InterruptedException, YarnException, IOException {
+    long startTimeMS = System.currentTimeMillis();
+    while (true) {
+      List<NodeReport> nodes = rmClient.getNodeReports(NodeState.RUNNING);
+      int numRunningNodes = nodes == null ? 0 : nodes.size();
+      if (numRunningNodes >= numNMs) {
+        break;
+      }
+      LOG.info(MessageFormat.format("SLSRunner is waiting for all "
+          + "nodes RUNNING. {0} of {1} NMs initialized.",
+          numRunningNodes, numNMs));
+      Thread.sleep(1000);
+    }
+    LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.",
+        (System.currentTimeMillis() - startTimeMS)));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void startAM() throws YarnException, IOException {
+    // application/container configuration
+    LOG.info("startAM begin --------------");
+    int heartbeatInterval = conf.getInt(
+        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+
+    int parallelInitNum =
+        this.conf.getInt(SLSConfiguration.AM_INIT_PARALLEL_NUM, 1000);
+
+    int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
+        SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
+    int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES,
+        SLSConfiguration.CONTAINER_VCORES_DEFAULT);
+    Resource containerResource =
+        BuilderUtils.newResource(containerMemoryMB, containerVCores);
+
+    // application workload
+    if (isSLS) {
+      startAMFromSLSTraces(containerResource, heartbeatInterval,
+          parallelInitNum);
+    } else {
+      startAMFromRumenTraces(containerResource, heartbeatInterval);
+    }
+    numAMs = amMap.size();
+    remainingApps.set(numAMs);
+    LOG.info("startAM end --------------");
+  }
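+
+  // A job record in an --input-sls trace provides the fields read below; a
+  // minimal sketch (illustrative values only) looks like:
+  //   {
+  //     "am.type" : "mapreduce", "job.start.ms" : 0, "job.end.ms" : 95375,
+  //     "job.id" : "job_1", "job.user" : "default",
+  //     "job.queue.name" : "sls_queue_1",
+  //     "job.tasks" : [ { "container.host" : "/default-rack/node1",
+  //       "container.start.ms" : 6664, "container.end.ms" : 23707,
+  //       "container.priority" : 20, "container.type" : "map" } ]
+  //   }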
+
+  /**
+   * Parse workload information from SLS trace files.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromSLSTraces(Resource containerResource,
+      int heartbeatInterval, int parallelInitNum) throws IOException {
+    // parse from sls traces
+    JsonFactory jsonF = new JsonFactory();
+    ObjectMapper mapper = new ObjectMapper();
+    for (String inputTrace : inputTraces) {
+      Reader input = new FileReader(inputTrace);
+      try {
+        Iterator<Map> i = mapper.readValues(jsonF.createJsonParser(input),
+            Map.class);
+        while (i.hasNext()) {
+          Map jsonJob = i.next();
+
+          // load job information
+          long jobStartTime = Long.parseLong(
+              jsonJob.get("job.start.ms").toString());
+          long jobFinishTime = Long.parseLong(
+              jsonJob.get("job.end.ms").toString());
+
+          String user = (String) jsonJob.get("job.user");
+          if (user == null) {
+            user = "default";
+          }
+          String queue = jsonJob.get("job.queue.name").toString();
+
+          String oldAppId = jsonJob.get("job.id").toString();
+          boolean isTracked = trackedApps.contains(oldAppId);
+          int queueSize = queueAppNumMap.containsKey(queue) ?
+              queueAppNumMap.get(queue) : 0;
+          queueSize++;
+          queueAppNumMap.put(queue, queueSize);
+          // tasks
+          List tasks = (List) jsonJob.get("job.tasks");
+          if (tasks == null || tasks.size() == 0) {
+            continue;
+          }
+          List<ContainerSimulator> containerList =
+              new ArrayList<ContainerSimulator>();
+          for (Object o : tasks) {
+            Map jsonTask = (Map) o;
+            String hostname = jsonTask.get("container.host").toString();
+            long taskStart = Long.parseLong(
+                jsonTask.get("container.start.ms").toString());
+            long taskFinish = Long.parseLong(
+                jsonTask.get("container.end.ms").toString());
+            long lifeTime = taskFinish - taskStart;
+            int priority = Integer.parseInt(
+                jsonTask.get("container.priority").toString());
+            String type = jsonTask.get("container.type").toString();
+            containerList.add(new ContainerSimulator(containerResource,
+                lifeTime, hostname, priority, type));
+          }
+
+          // create a new AM
+          String amType = jsonJob.get("am.type").toString();
+          AMSimulatorForRealRM amSim =
+              (AMSimulatorForRealRM) ReflectionUtils.newInstance(
+                  amClassMap.get(amType), new Configuration());
+          if (amSim != null) {
+            amSim.init(AM_ID++, heartbeatInterval, containerList, yarnConf,
+                this, jobStartTime, jobFinishTime, user, queue,
+                isTracked, oldAppId);
+            if (this.AM_ID <= parallelInitNum) {
+              amRunner.schedule(amSim);
+            } else {
+              this.noRunningAM.add(amSim);
+            }
+            maxRuntime = Math.max(maxRuntime, jobFinishTime);
+            numTasks += containerList.size();
+            amMap.put(oldAppId, amSim);
+          }
+        }
+      } finally {
+        input.close();
+      }
+    }
+  }
+
+  /**
+   * Parse workload information from Rumen trace files.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromRumenTraces(Resource containerResource,
+      int heartbeatInterval) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
+    long baselineTimeMS = 0;
+    for (String inputTrace : inputTraces) {
+      File fin = new File(inputTrace);
+      JobTraceReader reader = new JobTraceReader(
+          new Path(fin.getAbsolutePath()), conf);
+      try {
+        LoggedJob job = null;
+        while ((job = reader.getNext()) != null) {
+          // only MapReduce is supported currently
+          String jobType = "mapreduce";
+          String user = job.getUser() == null ?
+              "default" : job.getUser().getValue();
+          String jobQueue = job.getQueue().getValue();
+          String oldJobId = job.getJobID().toString();
+          long jobStartTimeMS = job.getSubmitTime();
+          long jobFinishTimeMS = job.getFinishTime();
+          if (baselineTimeMS == 0) {
+            baselineTimeMS = jobStartTimeMS;
+          }
+          jobStartTimeMS -= baselineTimeMS;
+          jobFinishTimeMS -= baselineTimeMS;
+          if (jobStartTimeMS < 0) {
+            LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
+            jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+            jobStartTimeMS = 0;
+          }
+
+          boolean isTracked = trackedApps.contains(oldJobId);
+          int queueSize = queueAppNumMap.containsKey(jobQueue) ?
+              queueAppNumMap.get(jobQueue) : 0;
+          queueSize++;
+          queueAppNumMap.put(jobQueue, queueSize);
+
+          List<ContainerSimulator> containerList =
+              new ArrayList<ContainerSimulator>();
+          // map tasks
+          for (LoggedTask mapTask : job.getMapTasks()) {
+            LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
+                .get(mapTask.getAttempts().size() - 1);
+            String hostname = taskAttempt.getHostName().getValue();
+            long containerLifeTime = taskAttempt.getFinishTime()
+                - taskAttempt.getStartTime();
+            containerList.add(new ContainerSimulator(containerResource,
+                containerLifeTime, hostname, 10, "map"));
+          }
+
+          // reduce tasks
+          for (LoggedTask reduceTask : job.getReduceTasks()) {
+            LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
+                .get(reduceTask.getAttempts().size() - 1);
+            String hostname = taskAttempt.getHostName().getValue();
+            long containerLifeTime = taskAttempt.getFinishTime()
+                - taskAttempt.getStartTime();
+            containerList.add(new ContainerSimulator(containerResource,
+                containerLifeTime, hostname, 20, "reduce"));
+          }
+
+          // create a new AM
+          AMSimulatorForRealRM amSim =
+              (AMSimulatorForRealRM) ReflectionUtils.newInstance(
+                  amClassMap.get(jobType), conf);
+          if (amSim != null) {
+            amSim.init(AM_ID++, heartbeatInterval, containerList,
+                null, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
+                isTracked, oldJobId);
+            amRunner.schedule(amSim);
+            maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
+            numTasks += containerList.size();
+            amMap.put(oldJobId, amSim);
+          }
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  private void printSimulationInfo() {
+    if (printSimulation) {
+      // node
+      LOG.info("------------------------------------");
+      LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity "
+          + "of each node {2} MB memory and {3} vcores.",
+          numNMs, numRacks, nmMemoryMB, nmVCores));
+      LOG.info("------------------------------------");
+      // job
+      LOG.info(MessageFormat.format("# applications = {0}, # total "
+          + "tasks = {1}, average # tasks per application = {2}",
+          numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs))));
+      LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
+      for (Map.Entry<String, AMSimulatorForRealRM> entry : amMap.entrySet()) {
+        AMSimulatorForRealRM am = entry.getValue();
+        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
+            + "\t" + am.getDuration() + "\t" + am.getNumTasks());
+      }
+      LOG.info("------------------------------------");
+      // queue
+      LOG.info(MessageFormat.format("number of queues = {0}, average "
+          + "number of apps = {1}", queueAppNumMap.size(),
+          (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
+      LOG.info("------------------------------------");
+      // runtime
+      LOG.info(MessageFormat.format("estimated simulation time is {0}"
+          + " seconds", (long)(Math.ceil(maxRuntime / 1000.0))));
+      LOG.info("------------------------------------");
+    }
+    // package this information into the simulateInfoMap used elsewhere
+    simulateInfoMap.put("Number of racks", numRacks);
+    simulateInfoMap.put("Number of nodes", numNMs);
+    simulateInfoMap.put("Node memory (MB)", nmMemoryMB);
+    simulateInfoMap.put("Node VCores", nmVCores);
+    simulateInfoMap.put("Number of applications", numAMs);
+    simulateInfoMap.put("Number of tasks", numTasks);
+    simulateInfoMap.put("Average tasks per application",
+        (int)(Math.ceil((numTasks + 0.0) / numAMs)));
+    simulateInfoMap.put("Number of queues", queueAppNumMap.size());
+    simulateInfoMap.put("Average applications per queue",
+        (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())));
+    simulateInfoMap.put("Estimated simulate time (s)",
+        (long)(Math.ceil(maxRuntime / 1000.0)));
+  }
+
+  public HashMap<NodeId, NMSimulatorForRealRM> getNmMap() {
+    return nmMap;
+  }
+
+  public static TaskRunner getAMRunner() {
+    return amRunner;
+  }
+
+  public static void decreaseRemainingApps() {
+    remainingApps.decrementAndGet();
+    if (remainingApps.get() <= 0) {
+      LOG.info("All apps finished, so SLSRunner tears down.");
+      System.exit(0);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("inputrumen", true, "input rumen files");
+    options.addOption("inputsls", true, "input sls files");
+    options.addOption("nodes", true, "input topology");
+    options.addOption("output", true, "output directory");
+    options.addOption("trackjobs", true,
+        "jobs to be tracked during simulating");
+    options.addOption("printsimulation", false,
+        "print out simulation information");
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    String inputRumen = cmd.getOptionValue("inputrumen");
+    String inputSLS = cmd.getOptionValue("inputsls");
+    String output = cmd.getOptionValue("output");
+
+    if ((inputRumen == null && inputSLS == null) || output == null) {
+      System.err.println();
+      System.err.println("ERROR: Missing input or output file");
+      System.err.println();
+      System.err.println("Options: -inputrumen|-inputsls FILE,FILE... "
+          + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] "
+          + "[-printsimulation]");
+      System.err.println();
+      System.exit(1);
+    }
+
+    File outputFile = new File(output);
+    if (!outputFile.exists() && !outputFile.mkdirs()) {
+      System.err.println("ERROR: Cannot create output directory "
+          + outputFile.getAbsolutePath());
+      System.exit(1);
+    }
+
+    Set<String> trackedJobSet = new HashSet<String>();
+    if (cmd.hasOption("trackjobs")) {
+      String trackjobs = cmd.getOptionValue("trackjobs");
+      String[] jobIds = trackjobs.split(",");
+      trackedJobSet.addAll(Arrays.asList(jobIds));
+    }
+
+    String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
+
+    boolean isSLS = inputSLS != null;
+    String[] inputFiles = isSLS ? inputSLS.split(",") : inputRumen.split(",");
+    SLSRunnerForRealRM sls = new SLSRunnerForRealRM(isSLS, inputFiles,
+        nodeFile, output, trackedJobSet, cmd.hasOption("printsimulation"));
+    sls.start();
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulatorForRealRM.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulatorForRealRM.java
new file mode 100644
index 0000000..763c523
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulatorForRealRM.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.sls.SLSRunnerForRealRM;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+@Private
+@Unstable
+public abstract class AMSimulatorForRealRM extends TaskRunner.Task {
+  // no in-process ResourceManager here: this simulator submits to a real RM
+  // main
+  protected SLSRunnerForRealRM se;
+  // application
+  protected ApplicationId appId;
+  protected ApplicationAttemptId appAttemptId;
+  protected String oldAppId; // jobId from the jobhistory file
+  // record factory
+  protected final static RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  // response queue
+  protected final BlockingQueue<AllocateResponse> responseQueue;
+  protected int RESPONSE_ID = 1;
+  // user name
+  protected String user;
+  // queue name
+  protected String queue;
+  // am type
+  protected String amtype;
+  // job start/end time
+  protected long traceStartTimeMS;
+  protected long traceFinishTimeMS;
+  protected long simulateStartTimeMS;
+  protected long simulateFinishTimeMS;
+  // whether tracked in Metrics
+  protected boolean isTracked;
+  // progress
+  protected int totalContainers;
+  protected int finishedContainers;
+
+  protected int id;
+
+  protected final Logger LOG = Logger.getLogger(AMSimulatorForRealRM.class);
+
+  private static final long AM_STATE_WAIT_TIMEOUT_MS = 60 * 60 * 1000;
+
+  protected YarnClient yarnClient;
+
+  protected ApplicationMasterProtocol amRMClient;
+
+  protected UserGroupInformation ugi = null;
+
+  protected UserGroupInformation clientUGI = null;
+
+  protected Configuration conf = null;
+  protected Configuration amConf = null;
+
+  protected boolean isAMRegistered = false;
+
+  public AMSimulatorForRealRM() {
+    this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+  }
+
+  public void init(int id, int heartbeatInterval,
+      List<ContainerSimulator> containerList, final Configuration conf,
+      SLSRunnerForRealRM se, long traceStartTime, long traceFinishTime,
+      String user, String queue, boolean isTracked, String oldAppId) {
+    super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval,
+        heartbeatInterval);
+    this.id = id;
+    this.user = user;
+    this.se = se;
+    this.queue = queue;
+    this.oldAppId = oldAppId;
+    this.isTracked = isTracked;
+    this.traceStartTimeMS = traceStartTime;
+    this.traceFinishTimeMS = traceFinishTime;
+
+    this.conf = conf;
+
+    this.amConf = new Configuration(this.conf);
+    amConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+
+    clientUGI = UserGroupInformation.createRemoteUser(user);
+    try {
+      clientUGI.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() {
+          yarnClient = YarnClient.createYarnClient();
+          yarnClient.init(conf);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("create yarn client failed", e);
+    }
+  }
+
+  public int getId() {
+    return this.id;
+  }
+
+  /**
+   * Register with the RM: submit the application and wait until ACCEPTED.
+   */
+  @Override
+  public void firstStep() throws Exception {
+    simulateStartTimeMS = System.currentTimeMillis()
+        - SLSRunnerForRealRM.getAMRunner().getStartTimeMS();
+
+    // submit application, waiting until ACCEPTED
+    try {
+      submitApp();
+    } catch (Exception e) {
+      LOG.error("submitApp failed", e);
+      SLSRunnerForRealRM.decreaseRemainingApps();
+      throw e;
+    }
+  }
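+
+  // Lifecycle note: the AMRM proxy is created asynchronously after submission
+  // (see AMRMClientCreater below), so early heartbeats only try to register
+  // the AM; once registered, each beat processes responses, sends container
+  // requests, and checks for completion.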
+
+  @Override
+  public void middleStep() throws Exception {
+    try {
+      if (!this.isAMRegistered) {
+        // register the application master once the AMRM proxy is ready
+        registerAM();
+      } else {
+        // process responses in the queue
+        processResponseQueue();
+
+        // send out request
+        sendContainerRequest();
+
+        // check whether finished
+        checkStop();
+      }
+    } catch (Exception e) {
+      LOG.error("middleStep failed", e);
+      SLSRunnerForRealRM.decreaseRemainingApps();
+      throw e;
+    }
+  }
+
+  @Override
+  public void lastStep() throws Exception {
+    try {
+      LOG.info(MessageFormat.format("Application {0} is shutting down.",
+          appId));
+      // unregister application master
+      final FinishApplicationMasterRequest finishAMRequest =
+          recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+      finishAMRequest.setFinalApplicationStatus(
+          FinalApplicationStatus.SUCCEEDED);
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          amRMClient.finishApplicationMaster(finishAMRequest);
+          return null;
+        }
+      });
+      // Monitor the application for end state
+      ApplicationReport appReport =
+          monitorApplication(appId, EnumSet.of(YarnApplicationState.KILLED,
+              YarnApplicationState.FAILED, YarnApplicationState.FINISHED));
+      LOG.info("app finished: " + appId + ", final status: "
+          + appReport.getFinalApplicationStatus());
+      simulateFinishTimeMS = System.currentTimeMillis()
+          - SLSRunnerForRealRM.getAMRunner().getStartTimeMS();
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+      if (amRMClient != null) {
+        RPC.stopProxy(this.amRMClient);
+      }
+      LOG.info("app " + appId + ": all RPC clients are stopped");
+      SLSRunnerForRealRM.decreaseRemainingApps();
+    } catch (Exception e) {
+      LOG.error("lastStep failed", e);
+      SLSRunnerForRealRM.decreaseRemainingApps();
+      throw e;
+    }
+  }
+
+  protected ResourceRequest createResourceRequest(Resource resource,
+      String host, int priority, int numContainers) {
+    ResourceRequest request =
+        recordFactory.newRecordInstance(ResourceRequest.class);
+    request.setCapability(resource);
+    request.setResourceName(host);
+    request.setNumContainers(numContainers);
+    Priority prio = recordFactory.newRecordInstance(Priority.class);
+    prio.setPriority(priority);
+    request.setPriority(prio);
+    return request;
+  }
+
+  protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
+      List<ContainerId> toRelease) {
+    AllocateRequest allocateRequest =
+        recordFactory.newRecordInstance(AllocateRequest.class);
+    allocateRequest.setResponseId(RESPONSE_ID++);
+    allocateRequest.setAskList(ask);
+    allocateRequest.setReleaseList(toRelease);
+    return allocateRequest;
+  }
+
+  protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) {
+    return createAllocateRequest(ask, new ArrayList<ContainerId>());
+  }
+
+  protected abstract void processResponseQueue() throws Exception;
+
+  protected abstract void sendContainerRequest() throws Exception;
+
+  protected abstract void checkStop();
+
+  private void submitApp()
+      throws YarnException, InterruptedException, IOException {
+    // ask for a new application
+    clientUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        yarnClient.start();
+        return null;
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AM_ID: " + id + ", yarn client started.");
+    }
+    final ApplicationSubmissionContext appSubContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    appId = appSubContext.getApplicationId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AM_ID: " + id + ", created application successfully, appID: "
+          + appId);
+    }
+    // submit the application
+    appSubContext.setMaxAppAttempts(1);
+    appSubContext.setQueue(queue);
+    appSubContext.setPriority(Priority.newInstance(0));
+    ContainerLaunchContext conLauContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    conLauContext
+        .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
+    conLauContext.setCommands(new ArrayList<String>());
+    conLauContext.setEnvironment(new HashMap<String, String>());
+    conLauContext.setLocalResources(new HashMap<String, LocalResource>());
+    conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
+    appSubContext.setAMContainerSpec(conLauContext);
+    appSubContext.setUnmanagedAM(true);
+
+    clientUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws YarnException, IOException {
+        yarnClient.submitApplication(appSubContext);
+        return null;
+      }
+    });
+
+    LOG.info(MessageFormat.format("Submit a new application {0}", appId));
+
+    // wait until the application is ACCEPTED (or already terminal)
+    ApplicationReport appReport =
+        monitorApplication(appId,
+            EnumSet.of(YarnApplicationState.ACCEPTED,
+                YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+                YarnApplicationState.FINISHED));
+
+    if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+      LOG.info(MessageFormat.format("application {0} is ACCEPTED", appId));
+      // Monitor the application attempt to wait for launch state
+      ApplicationAttemptReport attemptReport =
+          monitorCurrentAppAttempt(appId, YarnApplicationAttemptState.LAUNCHED);
+      ApplicationAttemptId attemptId = attemptReport.getApplicationAttemptId();
+      LOG.info("Launching AM with application attempt id " + attemptId);
+      // generate credentials ugi
+      Token<AMRMTokenIdentifier> token =
+          yarnClient.getAMRMToken(attemptId.getApplicationId());
+      ugi = UserGroupInformation.createRemoteUser(user);
+      Credentials credentials = new Credentials();
+      credentials.addToken(token.getService(), token);
+      ugi.addCredentials(credentials);
+      new Thread(new AMRMClientCreater(), appId + "-AMRMClientCreater").start();
+    }
+  }
+
+  public class AMRMClientCreater implements Runnable {
+
+    @Override
+    public void run() {
+      ApplicationMasterProtocol client = null;
+      while (true) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(MessageFormat.format(
+                "application {0}: creating AM proxy", appId));
+          }
+          client = ugi.doAs(
+              new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+                @Override
+                public ApplicationMasterProtocol run() throws Exception {
+                  return ClientRMProxy.createRMProxy(amConf,
+                      ApplicationMasterProtocol.class);
+                }
+              });
+          amRMClient = client;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(MessageFormat.format(
+                "application {0}: created AM proxy successfully", appId));
+          }
+          break;
+        } catch (IOException e) {
+          LOG.warn("create AM proxy failed, retrying", e);
+        } catch (InterruptedException e) {
+          LOG.warn("interrupted while creating AM proxy", e);
+        }
+      }
+    }
+  }
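+
+  // Note: each simulated AM polls the RM via the client API once per second in
+  // the monitor helpers below; with many simulated AMs this polling adds load
+  // on the real RM beyond the allocate heartbeats themselves.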
+
+  /**
+   * Monitor the submitted application until it reaches one of the given
+   * states.
+   *
+   * @param appId Application Id of application to be monitored
+   * @return the application report once a monitored state is reached
+   * @throws YarnException
+   * @throws IOException
+   */
+  private ApplicationReport monitorApplication(ApplicationId appId,
+      Set<YarnApplicationState> finalState) throws YarnException, IOException {
+
+    while (true) {
+      // Check app status every 1 second.
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.debug("Thread sleep in monitoring loop interrupted");
+      }
+
+      // Get application report for the appId we are interested in
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      if (finalState.contains(state)) {
+        LOG.info("Got application report from ASM for" + ", appId="
+            + appId.getId() + ", appAttemptId="
+            + report.getCurrentApplicationAttemptId() + ", clientToAMToken="
+            + report.getClientToAMToken() + ", appDiagnostics="
+            + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+            + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+            + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+            + ", yarnAppState=" + report.getYarnApplicationState().toString()
+            + ", distributedFinalState="
+            + report.getFinalApplicationStatus().toString()
+            + ", appTrackingUrl=" + report.getTrackingUrl()
+            + ", appUser=" + report.getUser());
+
+        return report;
+      }
+    }
+  }
+
+  private ApplicationAttemptReport monitorCurrentAppAttempt(
+      ApplicationId appId, YarnApplicationAttemptState attemptState)
+      throws YarnException, IOException {
+    long startTime = System.currentTimeMillis();
+    ApplicationAttemptId attemptId = null;
+    while (true) {
+      if (attemptId == null) {
+        attemptId = yarnClient.getApplicationReport(appId)
+            .getCurrentApplicationAttemptId();
+      }
+      ApplicationAttemptReport attemptReport = null;
+      if (attemptId != null) {
+        try {
+          attemptReport = yarnClient.getApplicationAttemptReport(attemptId);
+        } catch (Exception e) {
+          LOG.debug("failed to get attempt report of " + attemptId, e);
+        }
+
+        if (attemptReport != null && attemptState
+            .equals(attemptReport.getYarnApplicationAttemptState())) {
+          return attemptReport;
+        }
+      }
+      if (attemptReport != null) {
+        LOG.info("Current attempt state of " + appId + " is "
+            + attemptReport.getYarnApplicationAttemptState()
+            + ", waiting for current attempt to reach " + attemptState);
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for current attempt of " + appId
+            + " to reach " + attemptState);
+      }
+      if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
+        String errmsg = "Timeout for waiting current attempt of " + appId
+            + " to reach " + attemptState;
+        LOG.error(errmsg);
+        throw new RuntimeException(errmsg);
+      }
+    }
+  }
+
+  private void registerAM()
+      throws YarnException, IOException, InterruptedException {
+    // register the application master; skip until the AMRM proxy is created
+    if (amRMClient == null) {
+      return;
+    }
+
+    final RegisterApplicationMasterRequest amRegisterRequest =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    amRegisterRequest.setHost("localhost");
+    amRegisterRequest.setRpcPort(-1);
+    amRegisterRequest.setTrackingUrl("localhost:-1");
+    LOG.info(MessageFormat.format(
+        "Registering the application master for application {0}", appId));
+    ugi.doAs(
+        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+          @Override
+          public RegisterApplicationMasterResponse run() throws Exception {
+            return amRMClient.registerApplicationMaster(amRegisterRequest);
+          }
+        });
+    LOG.info(MessageFormat.format(
+        "Registered the application master for application {0}", appId));
+    this.isAMRegistered = true;
+  }
+
+  protected List<ResourceRequest> packageRequests(
+      List<ContainerSimulator> csList, int priority) {
+    // create requests
+    Map<String, ResourceRequest> rackLocalRequestMap =
+        new HashMap<String, ResourceRequest>();
+    Map<String, ResourceRequest> nodeLocalRequestMap =
+        new HashMap<String, ResourceRequest>();
+    ResourceRequest anyRequest = null;
+    for (ContainerSimulator cs : csList) {
+      String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
+      // check rack local
+      String rackname = rackHostNames[0];
+      if (rackLocalRequestMap.containsKey(rackname)) {
+        rackLocalRequestMap.get(rackname).setNumContainers(
+            rackLocalRequestMap.get(rackname).getNumContainers() + 1);
+      } else {
+        ResourceRequest request =
+            createResourceRequest(cs.getResource(), rackname, priority, 1);
+        rackLocalRequestMap.put(rackname, request);
+      }
+      // check node local
+      String hostname = rackHostNames[1];
+      if (nodeLocalRequestMap.containsKey(hostname)) {
+        nodeLocalRequestMap.get(hostname).setNumContainers(
+            nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
+      } else {
+        ResourceRequest request =
+            createResourceRequest(cs.getResource(), hostname, priority, 1);
+        nodeLocalRequestMap.put(hostname, request);
+      }
+      // any
+      if (anyRequest == null) {
+        anyRequest = createResourceRequest(cs.getResource(),
+            ResourceRequest.ANY, priority, 1);
+      } else {
+        anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
+      }
+    }
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ask.addAll(nodeLocalRequestMap.values());
+    ask.addAll(rackLocalRequestMap.values());
+    if (anyRequest != null) {
+      ask.add(anyRequest);
+    }
+    return ask;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public String getAMType() {
+    return amtype;
+  }
+
+  public long getDuration() {
+    return simulateFinishTimeMS - simulateStartTimeMS;
+  }
+
+  public int getNumTasks() {
+    return totalContainers;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/UnManagedMRAMSimulatorForRealRM.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/UnManagedMRAMSimulatorForRealRM.java
new file mode 100644
index 0000000..b60cbdf
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/UnManagedMRAMSimulatorForRealRM.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.SLSRunnerForRealRM;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+@Private
+@Unstable
+public class UnManagedMRAMSimulatorForRealRM extends AMSimulatorForRealRM {
+  /*
+   * Vocabulary used:
+   *   pending   -> requests which are NOT yet sent to the RM.
+   *   scheduled -> requests which are sent to the RM but not yet assigned.
+   *   assigned  -> requests which are assigned to a container.
+   *   completed -> requests whose corresponding containers have completed.
+   *
+   * Maps are scheduled as soon as their requests are received. Reduces are
+   * scheduled when all maps have finished (slow-start is not supported
+   * currently).
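+   *
+   * Failed containers are parked in the pending-failed queues and
+   * re-requested only after the corresponding scheduled queue drains (see
+   * sendContainerRequest below).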
+   */
+
+  private static final int PRIORITY_REDUCE = 10;
+  private static final int PRIORITY_MAP = 20;
+
+  // pending maps
+  private LinkedList<ContainerSimulator> pendingMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // pending failed maps
+  private LinkedList<ContainerSimulator> pendingFailedMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // scheduled maps
+  private LinkedList<ContainerSimulator> scheduledMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // assigned maps
+  private Map<ContainerId, ContainerSimulator> assignedMaps =
+      new HashMap<ContainerId, ContainerSimulator>();
+
+  // reduces which are not yet scheduled
+  private LinkedList<ContainerSimulator> pendingReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // pending failed reduces
+  private LinkedList<ContainerSimulator> pendingFailedReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // scheduled reduces
+  private LinkedList<ContainerSimulator> scheduledReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // assigned reduces
+  private Map<ContainerId, ContainerSimulator> assignedReduces =
+      new HashMap<ContainerId, ContainerSimulator>();
+
+  // all maps & reduces
+  private LinkedList<ContainerSimulator> allMaps =
+      new LinkedList<ContainerSimulator>();
+  private LinkedList<ContainerSimulator> allReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // counters
+  private int mapFinished = 0;
+  private int mapTotal = 0;
+  private int reduceFinished = 0;
+  private int reduceTotal = 0;
+  // finished
+  private boolean isFinished = false;
+
+  public final Logger LOG =
+      Logger.getLogger(UnManagedMRAMSimulatorForRealRM.class);
+
+  public void init(int id, int heartbeatInterval,
+      List<ContainerSimulator> containerList, Configuration conf,
+      SLSRunnerForRealRM se, long traceStartTime, long traceFinishTime,
+      String user, String queue, boolean isTracked, String oldAppId) {
+    super.init(id, heartbeatInterval, containerList, conf, se, traceStartTime,
+        traceFinishTime, user, queue, isTracked, oldAppId);
+    amtype = "mapreduce";
+
+    // get map/reduce tasks
+    for (ContainerSimulator cs : containerList) {
+      if (cs.getType().equals("map")) {
+        cs.setPriority(PRIORITY_MAP);
+        pendingMaps.add(cs);
+      } else if (cs.getType().equals("reduce")) {
+        cs.setPriority(PRIORITY_REDUCE);
+        pendingReduces.add(cs);
+      }
+    }
+    allMaps.addAll(pendingMaps);
+    allReduces.addAll(pendingReduces);
+    mapTotal = pendingMaps.size();
+    reduceTotal = pendingReduces.size();
+    totalContainers = mapTotal + reduceTotal;
+  }
+
+  @Override
+  public void firstStep() throws Exception {
+    super.firstStep();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void processResponseQueue()
+      throws InterruptedException, YarnException, IOException {
+
+    while (!responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (!response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+            if (assignedMaps.containsKey(containerId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(MessageFormat.format(
+                    "Application {0} has one mapper finished ({1}).",
+                    appId, containerId));
+              }
+              assignedMaps.remove(containerId);
+              mapFinished++;
+              finishedContainers++;
+            } else if (assignedReduces.containsKey(containerId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(MessageFormat.format(
+                    "Application {0} has one reducer finished ({1}).",
+                    appId, containerId));
+              }
+              assignedReduces.remove(containerId);
+              reduceFinished++;
+              finishedContainers++;
+            }
+          } else {
+            // container was killed or failed
+            if (assignedMaps.containsKey(containerId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(MessageFormat.format(
+                    "Application {0} has one mapper ({1}) exitcode ({2}).",
+                    appId, containerId, cs.getExitStatus()));
+              }
+              pendingFailedMaps.add(assignedMaps.remove(containerId));
+            } else if (assignedReduces.containsKey(containerId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(MessageFormat.format(
+                    "Application {0} has one reducer ({1}) exitcode ({2}).",
+                    appId, containerId, cs.getExitStatus()));
+              }
+              pendingFailedReduces.add(assignedReduces.remove(containerId));
+            }
+          }
+        }
+      }
+
+      // check whether the job is finished
+      if ((mapFinished == mapTotal)
+          && (reduceFinished == reduceTotal)) {
+        isFinished = true;
+        return;
+      }
+
+      // check allocated containers
+      for (Container container : response.getAllocatedContainers()) {
+        if (!scheduledMaps.isEmpty()) {
+          ContainerSimulator cs = scheduledMaps.remove();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(MessageFormat.format(
+                "Application {0} starts to launch a mapper ({1}).",
+                appId, container.getId()));
+          }
+          assignedMaps.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId()).addNewContainer(container,
+              cs.getLifeTime());
+        } else if (!this.scheduledReduces.isEmpty()) {
+          ContainerSimulator cs = scheduledReduces.remove();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(MessageFormat.format(
+                "Application {0} starts to launch a reducer ({1}).",
+                appId, container.getId()));
+          }
+          assignedReduces.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId()).addNewContainer(container,
+              cs.getLifeTime());
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void sendContainerRequest()
+      throws YarnException, IOException, InterruptedException {
+    if (isFinished) {
+      return;
+    }
+    // send out request
+    List<ResourceRequest> ask = null;
+    if (mapFinished != mapTotal) {
+      // map phase
+      if (!pendingMaps.isEmpty()) {
+        ask = packageRequests(pendingMaps, PRIORITY_MAP);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "Application {0} sends out request for {1} mappers.",
+              appId, pendingMaps.size()));
+        }
+        scheduledMaps.addAll(pendingMaps);
+        pendingMaps.clear();
+      } else if (!pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
+        ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "Application {0} sends out requests for {1} failed mappers.",
+              appId, pendingFailedMaps.size()));
+        }
+        scheduledMaps.addAll(pendingFailedMaps);
+        pendingFailedMaps.clear();
+      }
+    } else if (reduceFinished != reduceTotal) {
+      // reduce phase
+      if (!pendingReduces.isEmpty()) {
+        ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "Application {0} sends out requests for {1} reducers.",
+              appId, pendingReduces.size()));
+        }
+        scheduledReduces.addAll(pendingReduces);
+        pendingReduces.clear();
+      } else if (!pendingFailedReduces.isEmpty()
+          && scheduledReduces.isEmpty()) {
+        ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "Application {0} sends out request for {1} failed reducers.",
+              appId, pendingFailedReduces.size()));
+        }
+        scheduledReduces.addAll(pendingFailedReduces);
+        pendingFailedReduces.clear();
+      }
+    }
+
+    if (ask == null) {
+      ask = new ArrayList<ResourceRequest>();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+    // retry the allocate call until it succeeds
+    while (true) {
+      try {
+        AllocateResponse response =
+            ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
+              @Override
+              public AllocateResponse run() throws Exception {
+                return amRMClient.allocate(request);
+              }
+            });
+        if (response != null) {
+          responseQueue.put(response);
+        }
+        break;
+      } catch (Exception e) {
+        LOG.warn("allocate to RM failed, retrying", e);
+      }
+    }
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index 7fc2a3c..0590aa4 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -37,7 +37,7 @@
   public static final String RM_SCHEDULER = SCHEDULER_PREFIX + "class";
   // metrics
   public static final String METRICS_PREFIX = PREFIX + "metrics.";
-  public static final String METRICS_SWITCH = METRICS_PREFIX + "switch"; 
+  public static final String METRICS_SWITCH = METRICS_PREFIX + "switch";
   public static final String METRICS_WEB_ADDRESS_PORT = METRICS_PREFIX
       + "web.address.port";
   public static final String METRICS_OUTPUT_DIR = METRICS_PREFIX + "output";
@@ -82,6 +82,22 @@
   public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
   public static final int CONTAINER_VCORES_DEFAULT = 1;
 
+  public static final String RUNNER_AM_POOL_SIZE = RUNNER_PREFIX + "pool.am.size";
+  public static final int RUNNER_AM_POOL_SIZE_DEFAULT = 10;
+
+  public static final String RUNNER_NM_POOL_SIZE = RUNNER_PREFIX + "pool.nm.size";
+  public static final int RUNNER_NM_POOL_SIZE_DEFAULT = 50;
+
+  public static final String AM_INIT_PARALLEL_NUM = AM_PREFIX + "parallel.init-num";
+  public static final String AM_MAX_PARALLEL_NUM = AM_PREFIX + "parallel.max-num";
+
+  public static final String AM_SUBMIT_INTERVAL_MS = AM_PREFIX + "submit.interval.ms";
+  public static final int AM_SUBMIT_INTERVAL_MS_DEFAULT = 1000 * 60;
+
+  public static final String AM_SUBMIT_INTERVAL_COUNT = AM_PREFIX + "submit.interval.count";
+  public static final int AM_SUBMIT_INTERVAL_COUNT_DEFAULT = 250;
+
+
   public static Resource getAMContainerResource(Configuration conf) {
     return Resource.newInstance(
         conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
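
Note: the new keys above are read by SLSRunnerForRealRM from sls-runner.xml. A
minimal sketch (assuming the existing "yarn.sls.runner." and "yarn.sls.am."
prefixes of SLSConfiguration; the values are this patch's defaults):

  <property>
    <name>yarn.sls.runner.pool.am.size</name>
    <value>10</value>
  </property>
  <property>
    <name>yarn.sls.am.submit.interval.count</name>
    <value>250</value>
  </property>
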
+  public static final String RUNNER_AM_POOL_SIZE = RUNNER_PREFIX + "pool.am.size";
+  public static final int RUNNER_AM_POOL_SIZE_DEFAULT = 10;
+
+  public static final String RUNNER_NM_POOL_SIZE = RUNNER_PREFIX + "pool.nm.size";
+  public static final int RUNNER_NM_POOL_SIZE_DEFAULT = 50;
+
+  public static final String AM_INIT_PARALLEL_NUM = AM_PREFIX + "parallel.init-num";
+  public static final String AM_MAX_PARALLEL_NUM = AM_PREFIX + "parallel.max-num";
+
+  public static final String AM_SUBMIT_INTERVAL_MS = AM_PREFIX + "submit.interval.ms";
+  public static final int AM_SUBMIT_INTERVAL_MS_DEFAULT = 1000 * 60;
+
+  public static final String AM_SUBMIT_INTERVAL_COUNT = AM_PREFIX + "submit.interval.count";
+  public static final int AM_SUBMIT_INTERVAL_COUNT_DEFAULT = 250;
+
   public static Resource getAMContainerResource(Configuration conf) {
     return Resource.newInstance(
         conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulatorForRealRM.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulatorForRealRM.java
new file mode 100644
index 0000000..75df5f9
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulatorForRealRM.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.nodemanager;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public class NMSimulatorForRealRM extends TaskRunner.Task {
+  // node resource
+  private RMNode node;
+  // NM token master key
+  private MasterKey masterKey;
+  // container token master key
+  private MasterKey containerKey = null;
+  // containers in various states
+  private List<ContainerId> completedContainerList;
+  private List<ContainerId> releasedContainerList;
+  private DelayQueue<ContainerSimulator> containerQueue;
+  private Map<ContainerId, ContainerSimulator> runningContainers;
+  private List<ContainerId> amContainerList;
+
+  // heartbeat response id
+  private int responseId = 1;
+  private Configuration conf = null;
+  private ResourceTracker resourceTracker = null;
+  private UserGroupInformation ugi;
+
+  private static final Logger LOG =
+      Logger.getLogger(NMSimulatorForRealRM.class);
+
+  public void init(String nodeIdStr, int memory, int cores, int dispatchTime,
+      int heartBeatInterval, final Configuration conf)
+      throws IOException, YarnException, InterruptedException {
+    super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
+        heartBeatInterval);
+    // create resource
+    String[] rackHostName = SLSUtils.getRackHostName(nodeIdStr);
+    this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
+        BuilderUtils.newResource(memory, cores));
+    this.conf = conf;
+    // init data structures
+    completedContainerList =
+        Collections.synchronizedList(new ArrayList<ContainerId>());
+    releasedContainerList =
+        Collections.synchronizedList(new ArrayList<ContainerId>());
+    containerQueue = new DelayQueue<ContainerSimulator>();
+    amContainerList =
+        Collections.synchronizedList(new ArrayList<ContainerId>());
+    runningContainers =
+        new ConcurrentHashMap<ContainerId, ContainerSimulator>();
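+
+    // Unlike NMSimulator, which hands its status to an in-process RM, this
+    // simulator registers and heartbeats over RPC, so build a real
+    // ResourceTracker proxy for the RM named in the configuration.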
+    ugi = UserGroupInformation.createRemoteUser(this.node.toString());
+    resourceTracker =
+        ugi.doAs(new PrivilegedExceptionAction<ResourceTracker>() {
+          @Override
+          public ResourceTracker run() throws Exception {
+            return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
+          }
+        });
+  }
+
+  @Override
+  public void firstStep() {
+    // register NM with RM
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    req.setNodeId(node.getNodeID());
+    req.setResource(node.getTotalCapability());
+    req.setHttpPort(80);
+
+    RegisterNodeManagerResponse response;
+    try {
+      response = resourceTracker.registerNodeManager(req);
+    } catch (Exception e) {
+      LOG.error("NMSimulatorForRealRM failed to register with the RM!", e);
+      return;
+    }
+    masterKey = response.getNMTokenMasterKey();
+    containerKey = response.getContainerTokenMasterKey();
+  }
+
+  @Override
+  public void middleStep() throws Exception {
+    long begin = System.currentTimeMillis();
+
+    // move every running container whose lifetime has expired to the
+    // completed list
+    ContainerSimulator cs;
+    synchronized (completedContainerList) {
+      while ((cs = containerQueue.poll()) != null) {
+        runningContainers.remove(cs.getId());
+        completedContainerList.add(cs.getId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "node ({0}) Container {1} has completed",
+              this.node.getNodeID(), cs.getId()));
+        }
+      }
+    }
+
+    // send heartbeat with the current container statuses
+    NodeHeartbeatRequest beatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    beatRequest.setLastKnownNMTokenMasterKey(masterKey);
+    beatRequest.setLastKnownContainerTokenMasterKey(containerKey);
+    NodeStatus ns = Records.newRecord(NodeStatus.class);
+
+    ns.setContainersStatuses(generateContainerStatusList());
+    ns.setNodeId(node.getNodeID());
+    ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
+    ns.setResponseId(responseId++);
+    ns.setNodeHealthStatus(
+        NodeHealthStatus.newInstance(true, "", System.currentTimeMillis()));
+    beatRequest.setNodeStatus(ns);
+
+    NodeHeartbeatResponse beatResponse =
+        resourceTracker.nodeHeartbeat(beatRequest);
+    updateMasterKeys(beatResponse);
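+
+    // The RM can order cleanup of containers in the heartbeat response, e.g.
+    // when a job finishes or a container is preempted. AM containers are only
+    // dropped from the bookkeeping list; task containers are also pulled out
+    // of the delay queue and later reported back as released (ABORTED).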
+    if (!beatResponse.getContainersToCleanup().isEmpty()) {
+      // remove from queue
+      synchronized (releasedContainerList) {
+        for (ContainerId containerId : beatResponse.getContainersToCleanup()) {
+          if (amContainerList.contains(containerId)) {
+            // AM container (not killed, only released)
+            synchronized (amContainerList) {
+              amContainerList.remove(containerId);
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(MessageFormat.format(
+                  "NodeManager {0} releases an AM ({1}).",
+                  node.getNodeID(), containerId));
+            }
+          } else {
+            cs = runningContainers.remove(containerId);
+            containerQueue.remove(cs);
+            releasedContainerList.add(containerId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(MessageFormat.format(
+                  "NodeManager {0} releases a container ({1}).",
+                  node.getNodeID(), containerId));
+            }
+          }
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(this.node.getNodeID().toString()
+          + " middleStep cost time(ms): "
+          + (System.currentTimeMillis() - begin));
+    }
+    if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) {
+      lastStep();
+    }
+  }
+
+  private void updateMasterKeys(NodeHeartbeatResponse response) {
+    // see if the container token master key has rolled over
+    MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
+    if (updatedMasterKey != null) {
+      // will be non-null only on a roll-over on the RM side
+      containerKey = updatedMasterKey;
+    }
+
+    // same check for the NM token master key
+    updatedMasterKey = response.getNMTokenMasterKey();
+    if (updatedMasterKey != null) {
+      masterKey = updatedMasterKey;
+    }
+  }
+
+  @Override
+  public void lastStep() {
+    if (this.resourceTracker != null) {
+      RPC.stopProxy(this.resourceTracker);
+    }
+  }
+
+  /**
+   * Collect the status of all containers located on the current node.
+   */
+  private ArrayList<ContainerStatus> generateContainerStatusList() {
+    ArrayList<ContainerStatus> csList = new ArrayList<ContainerStatus>();
+    // add running containers
+    for (ContainerSimulator container : runningContainers.values()) {
+      csList.add(newContainerStatus(container.getId(), ContainerState.RUNNING,
+          ContainerExitStatus.SUCCESS));
+    }
+    synchronized (amContainerList) {
+      for (ContainerId cId : amContainerList) {
+        csList.add(newContainerStatus(cId, ContainerState.RUNNING,
+            ContainerExitStatus.SUCCESS));
+      }
+    }
+    // add completed containers
+    synchronized (completedContainerList) {
+      for (ContainerId cId : completedContainerList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "NodeManager {0} completed container ({1}).",
+              node.getNodeID(), cId));
+        }
+        csList.add(newContainerStatus(cId, ContainerState.COMPLETE,
+            ContainerExitStatus.SUCCESS));
+      }
+      completedContainerList.clear();
+    }
+    // add released containers
+    synchronized (releasedContainerList) {
+      for (ContainerId cId : releasedContainerList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format(
+              "NodeManager {0} released container ({1}).",
+              node.getNodeID(), cId));
+        }
+        csList.add(newContainerStatus(cId, ContainerState.COMPLETE,
+            ContainerExitStatus.ABORTED));
+      }
+      releasedContainerList.clear();
+    }
+    return csList;
+  }
+
+  private ContainerStatus newContainerStatus(ContainerId cId,
+      ContainerState state, int exitState) {
+    ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+    cs.setContainerId(cId);
+    cs.setState(state);
+    cs.setExitStatus(exitState);
+    return cs;
+  }
+
+  public RMNode getNode() {
+    return node;
+  }
+
+  /**
+   * Launch a new container with the given lifetime.
+   */
+  public void addNewContainer(Container container, long lifeTimeMS) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(MessageFormat.format(
+          "NodeManager {0} launches a new container ({1}).",
+          node.getNodeID(), container.getId()));
+    }
+    if (lifeTimeMS != -1) {
+      // normal task container
+      ContainerSimulator cs =
+          new ContainerSimulator(container.getId(), container.getResource(),
+              lifeTimeMS + System.currentTimeMillis(), lifeTimeMS);
+      containerQueue.add(cs);
+      runningContainers.put(cs.getId(), cs);
+    } else {
+      // a lifetime of -1 marks an AM container, which has no fixed duration
+      synchronized (amContainerList) {
+        amContainerList.add(container.getId());
+      }
+    }
+  }
+
+  /**
+   * Clean up an AM container and add it to the completed list.
+   *
+   * @param containerId id of the container to be cleaned
+   */
+  public void cleanupContainer(ContainerId containerId) {
+    synchronized (amContainerList) {
+      amContainerList.remove(containerId);
+    }
+    synchronized (completedContainerList) {
+      completedContainerList.add(containerId);
+    }
+  }
+
+  @VisibleForTesting
+  Map<ContainerId, ContainerSimulator> getRunningContainers() {
+    return runningContainers;
+  }
+
+  @VisibleForTesting
+  List<ContainerId> getAMContainers() {
+    return amContainerList;
+  }
+
+  @VisibleForTesting
+  List<ContainerId> getCompletedContainers() {
+    return completedContainerList;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
index 19cfe88..3c52520 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
@@ -144,22 +144,19 @@
   public void setQueueSize(int threadPoolSize) {
     this.threadPoolSize = threadPoolSize;
   }
 
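+  /**
+   * @return the number of tasks still waiting in the delay queue. Exposed so
+   * callers (e.g. the AM submission pacing added in this patch) can observe
+   * the backlog.
+   */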
+  public int getQueueSize() {
+    return this.queue.size();
+  }
+
   @SuppressWarnings("unchecked")
   public void start() {
     if (executor != null && !executor.isTerminated()) {
       throw new IllegalStateException("Executor already running");
     }
-    DelayQueue preStartQueue = queue;
-
-    queue = new DelayQueue();
     executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0,
         TimeUnit.MILLISECONDS, queue);
     executor.prestartAllCoreThreads();
-
-    startTimeMS = System.currentTimeMillis();
-    for (Object d : preStartQueue) {
-      schedule((Task) d, startTimeMS);
-    }
   }
 
   public void stop() throws InterruptedException {
diff --git a/hadoop-tools/hadoop-sls/src/main/sample-conf/sls-runner.xml b/hadoop-tools/hadoop-sls/src/main/sample-conf/sls-runner.xml
index d7acc98..de9077f 100644
--- a/hadoop-tools/hadoop-sls/src/main/sample-conf/sls-runner.xml
+++ b/hadoop-tools/hadoop-sls/src/main/sample-conf/sls-runner.xml
@@ -21,6 +21,34 @@
     <name>yarn.sls.runner.pool.size</name>
     <value>100</value>
   </property>
+
+  <property>
+    <name>yarn.sls.runner.pool.am.size</name>
+    <value>100</value>
+  </property>
+  <property>
+    <name>yarn.sls.runner.pool.nm.size</name>
+    <value>100</value>
+  </property>
+  <property>
+    <name>yarn.sls.am.parallel.init-num</name>
+    <value>100</value>
+  </property>
+  <property>
+    <name>yarn.sls.am.parallel.max-num</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>yarn.sls.am.submit.interval.ms</name>
+    <value>100</value>
+  </property>
+  <property>
+    <name>yarn.sls.am.submit.interval.count</name>
+    <value>100</value>
+  </property>
 
@@ -43,7 +43,7 @@
   <property>
     <name>yarn.sls.am.type.mapreduce</name>
-    <value>org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator</value>
+    <value>org.apache.hadoop.yarn.sls.appmaster.UnManagedMRAMSimulatorForRealRM</value>
   </property>