diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index ba43816..a997c50 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -61,7 +61,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -108,7 +107,9 @@
   // input traces, input-rumen or input-sls
   private boolean isSLS;
-
+
+  long baselineTimeMS = 0;
+
   public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile,
                    String outputDir, Set<String> trackedApps,
                    boolean printsimulation)
@@ -257,210 +258,221 @@ private void waitForNodesRunning() throws InterruptedException {
   }
 
   @SuppressWarnings("unchecked")
-  private void startAM() throws YarnException, IOException {
-    // application/container configuration
-    int heartbeatInterval = conf.getInt(
-        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
-        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
-        SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
-    int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES,
-        SLSConfiguration.CONTAINER_VCORES_DEFAULT);
-    Resource containerResource =
-        BuilderUtils.newResource(containerMemoryMB, containerVCores);
-
-    // application workload
-    if (isSLS) {
-      startAMFromSLSTraces(containerResource, heartbeatInterval);
-    } else {
-      startAMFromRumenTraces(containerResource, heartbeatInterval);
+  private void startAM() throws IOException {
+    for (String inputTrace : inputTraces) {
+      if (isSLS) {
+        startAMFromSLSTrace(inputTrace);
+      } else {
+        startAMFromRumenTrace(inputTrace);
+      }
     }
+
     numAMs = amMap.size();
     remainingApps = numAMs;
   }
 
   /**
-   * parse workload information from sls trace files
+   * Parse workload from SLS trace files.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromSLSTraces(Resource containerResource,
-      int heartbeatInterval) throws IOException {
-    // parse from sls traces
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
-    for (String inputTrace : inputTraces) {
-      Reader input =
-          new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
-      try {
-        Iterator<Map> i = mapper.readValues(jsonF.createParser(input),
-            Map.class);
-        while (i.hasNext()) {
-          Map jsonJob = i.next();
-
-          // load job information
-          long jobStartTime = Long.parseLong(
-              jsonJob.get("job.start.ms").toString());
-          long jobFinishTime = Long.parseLong(
-              jsonJob.get("job.end.ms").toString());
-
-          String user = (String) jsonJob.get("job.user");
-          if (user == null) user = "default";
-          String queue = jsonJob.get("job.queue.name").toString();
-
-          String oldAppId = jsonJob.get("job.id").toString();
-          boolean isTracked = trackedApps.contains(oldAppId);
-          int queueSize = queueAppNumMap.containsKey(queue) ?
-              queueAppNumMap.get(queue) : 0;
-          queueSize ++;
-          queueAppNumMap.put(queue, queueSize);
-          // tasks
-          List tasks = (List) jsonJob.get("job.tasks");
-          if (tasks == null || tasks.size() == 0) {
-            continue;
-          }
-          List<ContainerSimulator> containerList =
-              new ArrayList<ContainerSimulator>();
-          for (Object o : tasks) {
-            Map jsonTask = (Map) o;
-            String hostname = jsonTask.get("container.host").toString();
-            long taskStart = Long.parseLong(
-                jsonTask.get("container.start.ms").toString());
-            long taskFinish = Long.parseLong(
-                jsonTask.get("container.end.ms").toString());
-            long lifeTime = taskFinish - taskStart;
-
-            // Set memory and vcores from job trace file
-            Resource res = Resources.clone(containerResource);
-            if (jsonTask.containsKey("container.memory")) {
-              int containerMemory = Integer.parseInt(
-                  jsonTask.get("container.memory").toString());
-              res.setMemorySize(containerMemory);
-            }
-
-            if (jsonTask.containsKey("container.vcores")) {
-              int containerVCores = Integer.parseInt(
-                  jsonTask.get("container.vcores").toString());
-              res.setVirtualCores(containerVCores);
-            }
-
-            int priority = Integer.parseInt(
-                jsonTask.get("container.priority").toString());
-            String type = jsonTask.get("container.type").toString();
-            containerList.add(new ContainerSimulator(res,
-                lifeTime, hostname, priority, type));
-          }
-
-          // create a new AM
-          String amType = jsonJob.get("am.type").toString();
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-              amClassMap.get(amType), new Configuration());
-          if (amSim != null) {
-            amSim.init(AM_ID++, heartbeatInterval, containerList, rm,
-                this, jobStartTime, jobFinishTime, user, queue,
-                isTracked, oldAppId);
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTime);
-            numTasks += containerList.size();
-            amMap.put(oldAppId, amSim);
-          }
+
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), "UTF-8")) {
+      Iterator<Map> jobIter = mapper.readValues(
+          jsonF.createParser(input), Map.class);
+
+      while (jobIter.hasNext()) {
+        try {
+          createAMForJob(jobIter.next());
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: " + e.getMessage());
         }
-      } finally {
-        input.close();
       }
     }
   }
 
+  private void createAMForJob(Map jsonJob) throws Exception {
+    long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
+    long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+
+    String user = (String) jsonJob.get("job.user");
+    if (user == null) {
+      user = "default";
+    }
+
+    String queue = jsonJob.get("job.queue.name").toString();
+    updateQueueAppNum(queue);
+
+    String oldAppId = jsonJob.get("job.id").toString();
+
+    // tasks
+    List tasks = (List) jsonJob.get("job.tasks");
+    if (tasks == null || tasks.size() == 0) {
+      throw new Exception("No tasks for the job!");
+    }
+
+    List<ContainerSimulator> containerList = new ArrayList<>();
+    for (Object o : tasks) {
+      Map jsonTask = (Map) o;
+      String hostname = jsonTask.get("container.host").toString();
+      long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+          .toString());
+      long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+          .toString());
+      long lifeTime = taskFinish - taskStart;
+
+      // Set memory and vcores from job trace file
+      Resource res = getDefaultContainerResource();
+      if (jsonTask.containsKey("container.memory")) {
+        int containerMemory =
+            Integer.parseInt(jsonTask.get("container.memory").toString());
+        res.setMemorySize(containerMemory);
+      }
+
+      if (jsonTask.containsKey("container.vcores")) {
+        int containerVCores =
+            Integer.parseInt(jsonTask.get("container.vcores").toString());
+        res.setVirtualCores(containerVCores);
+      }
+
+      int priority = Integer.parseInt(jsonTask.get("container.priority")
+          .toString());
+      String type = jsonTask.get("container.type").toString();
+      containerList.add(
+          new ContainerSimulator(res, lifeTime, hostname, priority, type));
+    }
+
+    // create a new AM
+    String amType = jsonJob.get("am.type").toString();
+    newAndRunAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+        containerList);
+  }
+
   /**
-   * parse workload information from rumen trace files
+   * Parse workload from rumen trace files.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromRumenTraces(Resource containerResource,
-      int heartbeatInterval)
-      throws IOException {
+  private void startAMFromRumenTrace(String inputTrace) throws IOException {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "file:///");
-    long baselineTimeMS = 0;
-    for (String inputTrace : inputTraces) {
-      File fin = new File(inputTrace);
-      JobTraceReader reader = new JobTraceReader(
-          new Path(fin.getAbsolutePath()), conf);
-      try {
-        LoggedJob job = null;
-        while ((job = reader.getNext()) != null) {
-          // only support MapReduce currently
-          String jobType = "mapreduce";
-          String user = job.getUser() == null ?
-              "default" : job.getUser().getValue();
-          String jobQueue = job.getQueue().getValue();
-          String oldJobId = job.getJobID().toString();
-          long jobStartTimeMS = job.getSubmitTime();
-          long jobFinishTimeMS = job.getFinishTime();
-          if (baselineTimeMS == 0) {
-            baselineTimeMS = jobStartTimeMS;
-          }
-          jobStartTimeMS -= baselineTimeMS;
-          jobFinishTimeMS -= baselineTimeMS;
-          if (jobStartTimeMS < 0) {
-            LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
-            jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-            jobStartTimeMS = 0;
-          }
-
-          boolean isTracked = trackedApps.contains(oldJobId);
-          int queueSize = queueAppNumMap.containsKey(jobQueue) ?
-              queueAppNumMap.get(jobQueue) : 0;
-          queueSize ++;
-          queueAppNumMap.put(jobQueue, queueSize);
-
-          List<ContainerSimulator> containerList =
-              new ArrayList<ContainerSimulator>();
-          // map tasks
-          for(LoggedTask mapTask : job.getMapTasks()) {
-            if (mapTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
-                .get(mapTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime() -
-                taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                containerLifeTime, hostname, 10, "map"));
-          }
-
-          // reduce tasks
-          for(LoggedTask reduceTask : job.getReduceTasks()) {
-            if (reduceTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
-                .get(reduceTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime() -
-                taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                containerLifeTime, hostname, 20, "reduce"));
-          }
-
-          // create a new AM
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-              amClassMap.get(jobType), conf);
-          if (amSim != null) {
-            amSim.init(AM_ID ++, heartbeatInterval, containerList,
-                rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
-                isTracked, oldJobId);
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
-            numTasks += containerList.size();
-            amMap.put(oldJobId, amSim);
-          }
+    File fin = new File(inputTrace);
+
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
+
+      while (job != null) {
+        try {
+          createAMForJob(job);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: " + e.getMessage());
        }
-      } finally {
-        reader.close();
+
+        job = reader.getNext();
      }
    }
  }
-
+
+  private void createAMForJob(LoggedJob job) throws Exception {
+    // only support MapReduce currently
+    String jobType = "mapreduce";
+    String user = job.getUser() == null ?
"default" : + job.getUser().getValue(); + String jobQueue = job.getQueue().getValue(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmitTime(); + long jobFinishTimeMS = job.getFinishTime(); + if (baselineTimeMS == 0) { + baselineTimeMS = job.getSubmitTime(); + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job " + oldJobId + " start time to 0."); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + updateQueueAppNum(jobQueue); + + List containerList = new ArrayList<>(); + // mapper + for (LoggedTask mapTask : job.getMapTasks()) { + if (mapTask.getAttempts().size() == 0) { + throw new Exception("Invalid map task, not attempt for mapper!"); + } + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, hostname, 10, "map")); + } + + // reducer + for (LoggedTask reduceTask : job.getReduceTasks()) { + if (reduceTask.getAttempts().size() == 0) { + throw new Exception("Invalid reduce task, not attempt for reducer!"); + } + LoggedTaskAttempt taskAttempt = + reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, hostname, 20, "reduce")); + } + + newAndRunAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, + jobFinishTimeMS, containerList); + } + + private Resource getDefaultContainerResource() { + int containerMemory = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB, + SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); + int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES, + SLSConfiguration.CONTAINER_VCORES_DEFAULT); + return Resources.createResource(containerMemory, containerVCores); + } + + private void newAndRunAM(String jobType, String user, + String jobQueue, String oldJobId, long jobStartTimeMS, + long jobFinishTimeMS, List containerList) { + + AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( + amClassMap.get(jobType), new Configuration()); + + if (amSim != null) { + int heartbeatInterval = conf.getInt( + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); + boolean isTracked = trackedApps.contains(oldJobId); + amSim.init(AM_ID++, heartbeatInterval, containerList, + rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, + isTracked, oldJobId); + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); + numTasks += containerList.size(); + amMap.put(oldJobId, amSim); + } + } + + private void updateQueueAppNum(String queue) throws Exception { + SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); + String queueName = wrapper.getRealQueueName(queue); + if (!queueAppNumMap.containsKey(queueName)) { + queueAppNumMap.put(queueName, 1); + } else { + queueAppNumMap.put(queueName, queueAppNumMap.get(queueName) + 1); + } + } + private void printSimulationInfo() { if (printSimulation) { // node diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java 
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 7c37465..f326154 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -362,4 +362,12 @@ public SchedulerMetrics getSchedulerMetrics() {
   public Configuration getConf() {
     return conf;
   }
+
+  public String getRealQueueName(String queue) throws Exception {
+    if (getQueue(queue) == null) {
+      throw new Exception("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueue(queue).getQueueName();
+  }
 }
\ No newline at end of file
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index 572dacf..8d5a2cc 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -335,5 +335,13 @@ public void serviceInit(Configuration conf) throws Exception {
       initQueueMetrics(getQueueManager().getRootQueue());
     }
   }
+
+  public String getRealQueueName(String queue) throws Exception {
+    if (!getQueueManager().exists(queue)) {
+      throw new Exception("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueueManager().getQueue(queue).getQueueName();
+  }
 }
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
index 406f050..12fb527 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
@@ -24,5 +24,8 @@
 @Unstable
 public interface SchedulerWrapper {
   SchedulerMetrics getSchedulerMetrics();
+  Tracker getTracker();
+
+  String getRealQueueName(String queue) throws Exception;
 }
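Reviewer note (not part of the patch): updateQueueAppNum now resolves the trace's queue string through SchedulerWrapper.getRealQueueName before counting, so per-queue app counts key off the scheduler's canonical queue name rather than the raw trace value, and an unknown queue fails fast with a descriptive message. The containsKey/put branch could also be written with Map.merge; a small equivalent sketch, assuming Java 8+ (class name and queue string are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class QueueAppNumSketch {
      public static void main(String[] args) {
        Map<String, Integer> queueAppNumMap = new HashMap<>();
        // First call stores 1, later calls increment the stored count,
        // matching the if/else in updateQueueAppNum above.
        queueAppNumMap.merge("sls_queue_1", 1, Integer::sum);
        queueAppNumMap.merge("sls_queue_1", 1, Integer::sum);
        System.out.println(queueAppNumMap.get("sls_queue_1")); // prints 2
      }
    }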