Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java (revision ) @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; + +import java.util.Collection; +import java.util.Random; + +/** + * Utils for the Synthetic generator. + */ +public class SynthUtils { + + public static int getWeighted(Collection weights, Random rr) { + + double totalWeight = 0; + for (Double i : weights) { + totalWeight += i; + } + + double rand = rr.nextDouble() * totalWeight; + + double cur = 0; + int ind = 0; + for (Double i : weights) { + cur += i; + if (cur > rand) + break; + ind++; + } + + return ind; + } + + public static NormalDistribution getNormalDist(JDKRandomGenerator rand, + double average, double stdDev) { + + if (average <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = average / 6; + } + + NormalDistribution ret = new NormalDistribution(average, stdDev, + NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } + + public static LogNormalDistribution getLogNormalDist(JDKRandomGenerator rand, + double mean, double stdDev) { + + if (mean <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = mean / 6; + } + + // derive lognormal parameters for X = LogNormal(mu, sigma) + // sigma^2 = ln (1+Var[X]/(E[X])^2) + // mu = ln(E[X]) - 1/2 * sigma^2 + double Var = stdDev * stdDev; + double sigmasq = Math.log1p(Var / (mean * mean)); + double sigma = Math.sqrt(sigmasq); + double mu = Math.log(mean) - 0.5 * sigmasq; + + LogNormalDistribution ret = new LogNormalDistribution(mu, sigma, + LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } +} Index: hadoop-tools/hadoop-sls/src/test/resources/syn.json IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/resources/syn.json (revision ) +++ hadoop-tools/hadoop-sls/src/test/resources/syn.json (revision ) @@ -0,0 +1,52 @@ +{ 
+ "description" : "tiny jobs workload", + "num_nodes" : 10, + "nodes_per_rack" : 4, + "num_jobs" : 10, + "rand_seed" : 2, + "workloads" : [ + { + "workload_name" : "tiny-test", + "workload_weight": 0.5, + "description" : "Sort jobs", + "queue_name" : "sls_queue_1", + "job_classes" : [ + { + "class_name" : "class_1", + "class_weight" : 1.0, + + "mtasks_avg" : 5, + "mtasks_stddev" : 1, + "rtasks_avg" : 5, + "rtasks_stddev" : 1, + + "dur_avg" : 60, + "dur_stddev" : 5, + + "mtime_avg" : 10, + "mtime_stddev" : 2, + "rtime_avg" : 20, + "rtime_stddev" : 4, + + "map_max_memory_avg" : 1024, + "map_max_memory_stddev" : 0.001, + "reduce_max_memory_avg" : 2048, + "reduce_max_memory_stddev" : 0.001, + "map_max_vcores_avg" : 1, + "map_max_vcores_stddev" : 0.001, + "reduce_max_vcores_avg" : 2, + "reduce_max_vcores_stddev" : 0.001, + + "chance_of_reservation" : 0.5, + "deadline_factor_avg" : 10.0, + "deadline_factor_stddev" : 0.001, + "gang_factor" : 2 + } + ], + "time_distribution" : [ + { "time" : 1, "weight" : 100 }, + { "time" : 60, "jobs" : 0 } + ] + } + ] +} Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java (revision ) @@ -70,4 +70,7 @@ public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores"; public static final int CONTAINER_VCORES_DEFAULT = 1; + public static final String ENABLE_RESERVATION_SYSTEM = PREFIX + "enable" + + ".reservation-system"; + } Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java (revision ) @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.AbstractRealDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +/** + * This class represents a class of Jobs. It is used to generate an + * individual job, by picking random durations, task counts, container size, + * etc. + */ +public class SynthJobClass { + + final LogNormalDistribution dur; + final LogNormalDistribution mapRuntime; + final LogNormalDistribution redRuntime; + final LogNormalDistribution mtasks; + final LogNormalDistribution rtasks; + final LogNormalDistribution mapMem; + final LogNormalDistribution redMem; + final LogNormalDistribution mapVcores; + final LogNormalDistribution redVcores; + + final JDKRandomGenerator rand; + final SynthWorkload workload; + final Trace trace; + final JobClass jobClass; + + public SynthJobClass(JDKRandomGenerator rand, Trace trace, + SynthWorkload workload, int classId) { + + this.trace = trace; + this.workload = workload; + this.rand = new JDKRandomGenerator(); + this.rand.setSeed(rand.nextLong()); + jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); + + this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, + jobClass.dur_stddev); + this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, + jobClass.mtime_stddev); + this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, + jobClass.rtime_stddev); + this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, + jobClass.mtasks_stddev); + this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, + jobClass.rtasks_stddev); + + this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, + jobClass.map_max_memory_stddev); + this.redMem = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); + this.mapVcores = SynthUtils.getLogNormalDist(rand, + jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); + this.redVcores = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); + } + + public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { + return new SynthJob(rand, conf, this, actualSubmissionTime); + } + + @Override + public String toString() { + return "SynthJobClass [workload=" + workload.getName() + ", class=" + + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur=" + + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" + + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) + + ", redRuntime=" + + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) + + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) + + ", rtasks=" + ((rtasks != null) ?
rtasks.getNumericalMean() : 0) + + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n"; + + } + + public double getClassWeight() { + return jobClass.class_weight; + } + + public long getDur() { + return genLongSample(dur); + } + + public int getMtasks() { + return (int) genIntSample(mtasks); + } + + public int getRtasks() { + return (int) genIntSample(rtasks); + } + + public long getMapMaxMemory() { + return (long) genLongSample(mapMem); + } + + public long getReduceMaxMemory() { + return (long) genLongSample(redMem); + } + + public long getMapMaxVcores() { + return (long) genLongSample(mapVcores); + } + + public long getReduceMaxVcores() { + return (long) genLongSample(redVcores); + } + + public SynthWorkload getWorkload() { + return workload; + } + + public int genIntSample(AbstractRealDistribution dist) { + if (dist == null) { + return 0; + } + double baseSample = dist.sample(); + if (baseSample < 0) { + baseSample = 0; + } + return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample)); + } + + public long genLongSample(AbstractRealDistribution dist) { + return dist != null ? (long) Math.ceil(dist.sample()) : 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJobClass)) { + return false; + } + SynthJobClass o = (SynthJobClass) other; + return workload.equals(o.workload); + } + + @Override + public int hashCode() { + return workload.hashCode() * workload.getId(); + } + + public String getClassName() { + return jobClass.class_name; + } + + public long getMapTimeSample() { + return genLongSample(mapRuntime); + } + + public long getReduceTimeSample() { + return genLongSample(redRuntime); + } +} Index: hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml (revision ) @@ -38,6 +38,16 @@ 100 + + yarn.scheduler.capacity.root.sls_queue_1.reservable + true + + + + yarn.scheduler.capacity.root.sls_queue_1.show-reservations-as-queues + true + + yarn.scheduler.capacity.root.sls_queue_2.capacity 25 Index: hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java (revision ) @@ -23,21 +23,37 @@ private final long bytesOut; private final int recsOut; private final long maxMemory; + private final long maxVcores; private final ResourceUsageMetrics metrics; + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, - long maxMemory) { - this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, - new ResourceUsageMetrics()); + long maxMemory) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, + new ResourceUsageMetrics()); } - + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, - long maxMemory, ResourceUsageMetrics metrics) { + long maxMemory, ResourceUsageMetrics + metrics) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics); + } + + 
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, + long maxMemory, long maxVcores) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores, + new ResourceUsageMetrics()); + } + + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, + long maxMemory, long maxVcores, ResourceUsageMetrics + metrics) { this.bytesIn = bytesIn; this.recsIn = recsIn; this.bytesOut = bytesOut; this.recsOut = recsOut; this.maxMemory = maxMemory; + this.maxVcores = maxVcores; this.metrics = metrics; } @@ -78,6 +94,11 @@ return maxMemory; } + /** + * @return Vcores used by the task. + */ + public long getTaskVCores() { return maxVcores; } + /** * @return Resource usage metrics */ Index: hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml (revision ) @@ -17,8 +17,9 @@ yarn.resourcemanager.scheduler.class - org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler - + + org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler @@ -79,4 +80,12 @@ yarn.scheduler.fair.assignmultiple true + + + + Enable reservation system. + yarn.resourcemanager.reservation-system.enable + true + + Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java (revision ) @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +import java.util.*; + +/** + * This class represent a workload (made up of multiple SynthJobClass(es)). It + * also stores the temporal distributions of jobs in this workload. 
+ */ +public class SynthWorkload { + + final int id; + final List list; + final Trace trace; + final SortedMap timeWeights; + + public SynthWorkload(int id, Trace trace) { + list = new ArrayList(); + this.id = id; + this.trace = trace; + timeWeights = new TreeMap(); + for (SynthTraceJobProducer.TimeSample ts : trace.workloads + .get(id).time_distribution) { + timeWeights.put(ts.time, ts.jobs); + } + } + + public boolean add(SynthJobClass s) { + return list.add(s); + } + + public List getWeightList() { + ArrayList ret = new ArrayList(); + for (SynthJobClass s : list) { + ret.add(s.getClassWeight()); + } + return ret; + } + + public int getId() { + return id; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthWorkload)) { + return false; + } + // assume ID determines job classes by construction + return getId() == ((SynthWorkload) other).getId(); + } + + @Override + public int hashCode() { + return getId(); + } + + @Override + public String toString() { + return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n" + + list + "]\n"; + } + + public String getName() { + return trace.workloads.get(id).workload_name; + } + + public double getWorkloadWeight() { + return trace.workloads.get(id).workload_weight; + } + + public String getQueueName() { + return trace.workloads.get(id).queue_name; + } + + public long getBaseSubmissionTime(Random rand) { + + // pick based on weights the "bucket" for this start time + int position = SynthUtils.getWeighted(timeWeights.values(), rand); + + int[] time = new int[timeWeights.keySet().size()]; + int index = 0; + for (Integer i : timeWeights.keySet()) { + time[index++] = i; + } + + // uniformly pick a time between start and end time of this bucket + int startRange = time[position]; + int endRange = startRange; + // if there is no subsequent bucket pick startRange + if (position < timeWeights.keySet().size() - 1) { + endRange = time[position + 1]; + return startRange + rand.nextInt((endRange - startRange)); + } else { + return startRange; + } + } + +} Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java (revision ) @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES; +import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES; + +/** + * This is a JobStoryProducer that operates from a distribution of different + * workloads. The .json input file is used to determine how many jobs there + * are, their size, the number of maps/reducers and their duration, as well as + * the temporal distribution of submissions. For each parameter we control avg + * and stdev, and generate values via normal or log-normal distributions. + */ +public class SynthTraceJobProducer implements JobStoryProducer { + + public static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class); + + final Configuration conf; + final AtomicInteger numJobs; + final Trace trace; + final long seed; + + int totalWeight; + final List<Double> weightList; + final Map<Integer, SynthWorkload> workloads; + + final JDKRandomGenerator rand; + + public static final String SLS_SYNTHETIC_TRACE_FILE = + "sls.synthetic" + ".trace_file"; + + public SynthTraceJobProducer(Configuration conf) throws IOException { + this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE))); + } + + public SynthTraceJobProducer(Configuration conf, Path path) + throws IOException { + + LOG.info("SynthTraceJobProducer"); + + this.conf = conf; + this.rand = new JDKRandomGenerator(); + workloads = new HashMap<Integer, SynthWorkload>(); + weightList = new ArrayList<Double>(); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + FileSystem ifs = path.getFileSystem(conf); + FSDataInputStream fileIn = ifs.open(path); + + this.trace = mapper.readValue(fileIn, Trace.class); + seed = trace.rand_seed; + rand.setSeed(seed); + + this.numJobs = new AtomicInteger(trace.num_jobs); + + for (int workloadId = 0; workloadId < trace.workloads + .size(); workloadId++) { + SynthWorkload workload = new SynthWorkload(workloadId, trace); + for (int classId = + 0; classId < trace.workloads.get(workloadId).job_classes + .size(); classId++) { + SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId); + workload.add(cls); + } + workloads.put(workloadId, workload); + } + + for (int i = 0; i < workloads.size(); i++) { + double w = workloads.get(i).getWorkloadWeight(); + totalWeight += w; + weightList.add(w); + } + + createStoryParams(); + LOG.info("Generated " + listStoryParams.size() + " deadlines for " + + this.numJobs.get() + " jobs "); + } + + public long getSeed() { + return
seed; + } + + public int getNodesPerRack() { + return trace.nodes_per_rack; + } + + public int getNumNodes() { + return trace.num_nodes; + } + + @XmlRootElement + public static class Trace { + @JsonProperty("description") + public String description; + @JsonProperty("num_nodes") + public int num_nodes; + @JsonProperty("nodes_per_rack") + public int nodes_per_rack; + @JsonProperty("num_jobs") + public int num_jobs; + + // in sec (selects a portion of time_distribution) + @JsonProperty("trace_duration") + public long trace_duration; + @JsonProperty("rand_seed") + public long rand_seed; + @JsonProperty("workloads") + public List<Workload> workloads; + + // the options are NLE, BASELINE_CONSERVATIVE, BASELINE_AGGRESSIVE + @JsonProperty("expType") + public String expType; + } + + public static class Workload { + @JsonProperty("workload_name") + public String workload_name; + // used to change probability this workload is picked for each job + @JsonProperty("workload_weight") + public double workload_weight; + @JsonProperty("description") + public String description; + @JsonProperty("queue_name") + public String queue_name; + @JsonProperty("job_classes") + public List<JobClass> job_classes; + @JsonProperty("time_distribution") + public List<TimeSample> time_distribution; + } + + public static class JobClass { + + @JsonProperty("class_name") + public String class_name; + + // used to change probability this class is chosen + @JsonProperty("class_weight") + public double class_weight; + + // reservation related params + @JsonProperty("chance_of_reservation") + public double chance_of_reservation; + @JsonProperty("deadline_factor_avg") + public double deadline_factor_avg; + @JsonProperty("deadline_factor_stddev") + public double deadline_factor_stddev; + @JsonProperty("gang_factor") + public double gang_factor; + + // durations in sec + @JsonProperty("dur_avg") + public double dur_avg; + @JsonProperty("dur_stddev") + public double dur_stddev; + @JsonProperty("mtime_avg") + public double mtime_avg; + @JsonProperty("mtime_stddev") + public double mtime_stddev; + @JsonProperty("rtime_avg") + public double rtime_avg; + @JsonProperty("rtime_stddev") + public double rtime_stddev; + + // number of tasks + @JsonProperty("mtasks_avg") + public double mtasks_avg; + @JsonProperty("mtasks_stddev") + public double mtasks_stddev; + @JsonProperty("rtasks_avg") + public double rtasks_avg; + @JsonProperty("rtasks_stddev") + public double rtasks_stddev; + + // memory in MB + @JsonProperty("map_max_memory_avg") + public long map_max_memory_avg; + @JsonProperty("map_max_memory_stddev") + public double map_max_memory_stddev; + @JsonProperty("reduce_max_memory_avg") + public long reduce_max_memory_avg; + @JsonProperty("reduce_max_memory_stddev") + public double reduce_max_memory_stddev; + + // vcores + @JsonProperty("map_max_vcores_avg") + public long map_max_vcores_avg; + @JsonProperty("map_max_vcores_stddev") + public double map_max_vcores_stddev; + @JsonProperty("reduce_max_vcores_avg") + public long reduce_max_vcores_avg; + @JsonProperty("reduce_max_vcores_stddev") + public double reduce_max_vcores_stddev; + + // used to represent the locality slow-down + @JsonProperty("slowDownFactor") + public double slowDownFactor; + + // duration slack in sec + @JsonProperty("dur_slack") + public long durationSlack; + + @JsonProperty("failOnNoReservation") + public boolean failOnNoReservation; + + @JsonProperty("reservationType") + public String reservationType; + + } + + /** + * This is used to define the time-varying probability of a job start-time + * (e.g., to
simulate daily patterns) + */ + public static class TimeSample { + // in sec + @JsonProperty("time") + public int time; + @JsonProperty("weight") + public double jobs; + } + + class StoryParams { + SynthJobClass pickedJobClass; + long actualSubmissionTime; + + StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) { + this.pickedJobClass = pickedJobClass; + this.actualSubmissionTime = actualSubmissionTime; + } + } + + static final Comparator storyComparator = + new Comparator() { + @Override + public int compare(StoryParams o1, StoryParams o2) { + return Long.valueOf(o1.actualSubmissionTime) + .compareTo(o2.actualSubmissionTime); + } + }; + + Queue listStoryParams = + new PriorityQueue(10, storyComparator); + + void createStoryParams() { + for (int i = 0; i < numJobs.get(); i++) { + int workload = SynthUtils.getWeighted(weightList, rand); + SynthWorkload pickedWorkload = workloads.get(workload); + long jobClass = + SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand); + SynthJobClass pickedJobClass = pickedWorkload.list.get((int) jobClass); + long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand); + // long actualSubmissionTime = (i + 1) * 10; + listStoryParams + .add(new StoryParams(pickedJobClass, actualSubmissionTime)); + } + } + + @Override + public JobStory getNextJob() throws IOException { + if (numJobs.decrementAndGet() < 0) { + return null; + } + StoryParams storyParams = listStoryParams.poll(); + return storyParams.pickedJobClass.getJobStory(conf, + storyParams.actualSubmissionTime); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs + + ", weightList=" + weightList + ", r=" + rand + ", totalWeight=" + + totalWeight + ", workloads=" + workloads + "]"; + } + + public double getNumJobs() { + return trace.num_jobs; + } + +} Index: hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java (revision ) @@ -426,7 +426,7 @@ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); if (loggedTask == null) { // TODO insert parameters - TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); + TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0); return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, taskNumber, locality); } @@ -473,7 +473,7 @@ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); if (loggedTask == null) { // TODO insert parameters - TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); + TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0); return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, taskNumber, locality); } @@ -639,7 +639,7 @@ private TaskInfo getTaskInfo(LoggedTask loggedTask) { if (loggedTask == null) { - return new TaskInfo(0, 0, 0, 0, 0); + return new TaskInfo(0, 0, 0, 0, 0, 0); } List attempts = loggedTask.getAttempts(); @@ -688,9 +688,10 @@ break; } + //note: hardcoding vCores, as they are not collected TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords, outputBytes, - (int) outputRecords, (int) heapMegabytes, + (int) outputRecords, (int) heapMegabytes, 1, metrics); 
return taskInfo; } Index: hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java (revision ) +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java (revision ) @@ -0,0 +1,122 @@ +package org.apache.hadoop.yarn.sls; + +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; +import org.apache.hadoop.yarn.sls.synthetic.SynthUtils; +import org.apache.hadoop.yarn.sls.synthetic.SynthWorkload; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.tools.rumen.JobStory; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; +import java.util.TreeMap; + +public class TestSynthJobGeneration { + + @Test + public void test() throws IllegalArgumentException, IOException { + + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + TreeMap acTotRuntime = new TreeMap(); + TreeMap acJobCount = new TreeMap(); + + long nodeadline = 0; + long withdeadline = 0; + + SynthJob js = (SynthJob) stjp.getNextJob(); + int i = 0; + while (js != null) { + System.out.println((i++) + " " + js.getQueueName() + " -- " + + js.getJobClass().getClassName() + " (conf: " + + js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + + " submission: " + js.getSubmissionTime() + ", " + " duration: " + + js.getDuration() + " numMaps: " + js.getNumberMaps() + + " numReduces: " + js.getNumberReduces()); + + String queueName = js.getQueueName(); + long work = js.getTotalSlotTime(); + + if (!acTotRuntime.containsKey(queueName)) + acTotRuntime.put(queueName, work); + else + acTotRuntime.put(queueName, acTotRuntime.get(queueName) + work); + + if (!acJobCount.containsKey(queueName)) + acJobCount.put(queueName, 1L); + else + acJobCount.put(queueName, acJobCount.get(queueName) + 1); + + if (js.getDeadline() > 0) { + withdeadline++; + } else { + nodeadline++; + } + + js = (SynthJob) stjp.getNextJob(); + } + + System.out.println("-------------------"); + + double tot = 0; + for (java.util.Map.Entry e : acTotRuntime.entrySet()) { + tot += e.getValue(); + } + + for (java.util.Map.Entry e : acTotRuntime.entrySet()) { + System.out.println(e.getKey() + ", " + acJobCount.get(e.getKey()) + / (double) stjp.getNumJobs() * 100 + ", " + e.getValue() / tot * 100); + } + + System.out.println("-------------------"); + + System.out.println(" Total Jobs with No deadline:" + nodeadline + " (" + + nodeadline / (double) (nodeadline + withdeadline) * 100 + "%)"); + System.out.println(" Total Jobs With deadline:" + withdeadline + " (" + + withdeadline / (double) (nodeadline + withdeadline) * 100 + "%)"); + } + + @Test + public void testTimeGeneration() throws IOException { + + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + for(int i = 0; iUTF-8 
=================================================================== --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java (revision ) @@ -70,7 +70,7 @@ MockAMSimulator app = new MockAMSimulator(); List containers = new ArrayList(); app.init(1, 1000, containers, rm, null, 0, 1000000l, "user1", "default", - false, "app1"); + false, "app1", null); app.firstStep(); Assert.assertEquals(1, rm.getRMContext().getRMApps().size()); Assert.assertNotNull(rm.getRMContext().getRMApps().get(app.appId)); Index: hadoop-tools/hadoop-sls/src/test/resources/log4j.properties IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>windows-1252 =================================================================== --- hadoop-tools/hadoop-sls/src/test/resources/log4j.properties (revision ) +++ hadoop-tools/hadoop-sls/src/test/resources/log4j.properties (revision ) @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n Index: hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/JobFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (revision ) @@ -115,7 +115,8 @@ public MinTaskInfo(TaskInfo info) { super(info.getInputBytes(), info.getInputRecords(), info.getOutputBytes(), info.getOutputRecords(), - info.getTaskMemory(), info.getResourceUsageMetrics()); + info.getTaskMemory(), info.getTaskVCores(), + info.getResourceUsageMetrics()); } public long getInputBytes() { return Math.max(0, super.getInputBytes()); @@ -219,7 +220,7 @@ if (info != null) { info = new MinTaskInfo(info); } else { - info = new MinTaskInfo(new TaskInfo(0, 0, 0, 0, 0)); + info = new MinTaskInfo(new TaskInfo(0, 0, 0, 0, 0, 0)); } return info; } Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java (revision ) @@ -45,18 +45,24 @@ 
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacitySchedulerPlanFollower; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -69,7 +75,10 @@ import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -82,12 +91,12 @@ private String[] inputTraces; private Configuration conf; private Map queueAppNumMap; - + // NM simulator private HashMap nmMap; private int nmMemoryMB, nmVCores; private String nodeFile; - + // AM simulator private int AM_ID; private Map amMap; @@ -103,36 +112,41 @@ private int numNMs, numRacks, numAMs, numTasks; private long maxRuntime; public final static Map simulateInfoMap = - new HashMap(); + new HashMap(); // logger public final static Logger LOG = Logger.getLogger(SLSRunner.class); // input traces, input-rumen or input-sls - private boolean isSLS; - - public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile, - String outputDir, Set trackedApps, - boolean printsimulation) - throws IOException, ClassNotFoundException { - this.isSLS = isSLS; + public enum TraceType { + SLS, RUMEN, SYNTH + }; + + private TraceType inputType; + private SynthTraceJobProducer stjp; + + public SLSRunner(TraceType inputType, String inputTraces[], String nodeFile, + String outputDir, Set trackedApps, boolean printsimulation) + throws IOException, ClassNotFoundException { + this.inputType = inputType; this.inputTraces = inputTraces.clone(); this.nodeFile = nodeFile; this.trackedApps = trackedApps; this.printSimulation = printsimulation; metricsOutputDir = outputDir; - + nmMap = new HashMap<>(); queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); - + // runner configuration conf = new Configuration(false); conf.addResource("sls-runner.xml"); + // runner - int poolSize = 
conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); // map for (Map.Entry e : conf) { @@ -143,7 +157,7 @@ } } } - + public void start() throws Exception { // start resource manager startRM(); @@ -155,7 +169,7 @@ ((SchedulerWrapper) rm.getResourceScheduler()) .setQueueSet(this.queueAppNumMap.keySet()); ((SchedulerWrapper) rm.getResourceScheduler()) - .setTrackedAppSet(this.trackedApps); + .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -183,6 +197,20 @@ rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); + // if SLS is configured to run reservation system configure it. + if (conf.getBoolean(SLSConfiguration.ENABLE_RESERVATION_SYSTEM, false)) { + if (schedulerClass.equals(CapacityScheduler.class.getName())) { + rmConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + rmConf.set(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS, + CapacityReservationSystem.class.getName()); + rmConf.set(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER, + CapacitySchedulerPlanFollower.class.getName()); + } else { + throw new RuntimeException("Reservation System is not supported for " + + schedulerClass + " in SLS yet."); + } + } + final SLSRunner se = this; rm = new ResourceManager() { @Override @@ -197,36 +225,48 @@ private void startNM() throws YarnException, IOException { // nm configuration nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); + SLSConfiguration.NM_MEMORY_MB_DEFAULT); nmVCores = conf.getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - int heartbeatInterval = conf.getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, + SLSConfiguration.NM_VCORES_DEFAULT); + int heartbeatInterval = + conf.getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); // nm information (fetch from topology file, or from sls/rumen json file) Set nodeSet = new HashSet(); + if (nodeFile.isEmpty()) { - if (isSLS) { - for (String inputTrace : inputTraces) { + for (String inputTrace : inputTraces) { + switch (inputType) { + case SLS: { nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace)); + break; } - } else { - for (String inputTrace : inputTraces) { + case RUMEN: { nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace)); + break; } + case SYNTH: { + stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0])); + nodeSet.addAll(SLSUtils.generateNodesFromSynth( + (int) stjp.getNumNodes(), stjp.getNodesPerRack())); + break; + } + default: { + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); + } + } } - - } else { - nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); } + // create NM simulators Random random = new Random(); Set rackSet = new HashSet(); for (String hostName : nodeSet) { // we randomize the heartbeat start time from zero to 1 interval NMSimulator nm = new NMSimulator(); - nm.init(hostName, nmMemoryMB, nmVCores, - random.nextInt(heartbeatInterval), heartbeatInterval, rm); + nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval), + heartbeatInterval, rm); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); rackSet.add(nm.getNode().getRackName()); @@ -241,39 +281,53 @@ int 
numRunningNodes = 0; for (RMNode node : rm.getRMContext().getRMNodes().values()) { if (node.getState() == NodeState.RUNNING) { - numRunningNodes ++; + numRunningNodes++; } } if (numRunningNodes == numNMs) { break; } - LOG.info(MessageFormat.format("SLSRunner is waiting for all " + - "nodes RUNNING. {0} of {1} NMs initialized.", - numRunningNodes, numNMs)); + LOG.info(MessageFormat.format( + "SLSRunner is waiting for all " + + "nodes RUNNING. {0} of {1} NMs initialized.", + numRunningNodes, numNMs)); Thread.sleep(1000); } LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.", - (System.currentTimeMillis() - startTimeMS))); + (System.currentTimeMillis() - startTimeMS))); } @SuppressWarnings("unchecked") private void startAM() throws YarnException, IOException { // application/container configuration - int heartbeatInterval = conf.getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + int heartbeatInterval = + conf.getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB, - SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); + SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES, - SLSConfiguration.CONTAINER_VCORES_DEFAULT); + SLSConfiguration.CONTAINER_VCORES_DEFAULT); Resource containerResource = - BuilderUtils.newResource(containerMemoryMB, containerVCores); + BuilderUtils.newResource(containerMemoryMB, containerVCores); // application workload - if (isSLS) { + switch (inputType) { + case SLS: { startAMFromSLSTraces(containerResource, heartbeatInterval); - } else { + break; + } + case RUMEN: { startAMFromRumenTraces(containerResource, heartbeatInterval); + break; + } + case SYNTH: { + startAMFromSynthGenerator(heartbeatInterval); + break; + } + default: { + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); + } } numAMs = amMap.size(); remainingApps = numAMs; @@ -284,7 +338,7 @@ */ @SuppressWarnings("unchecked") private void startAMFromSLSTraces(Resource containerResource, - int heartbeatInterval) throws IOException { + int heartbeatInterval) throws IOException { // parse from sls traces JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); @@ -298,20 +352,21 @@ Map jsonJob = i.next(); // load job information - long jobStartTime = Long.parseLong( - jsonJob.get("job.start.ms").toString()); - long jobFinishTime = Long.parseLong( - jsonJob.get("job.end.ms").toString()); + long jobStartTime = + Long.parseLong(jsonJob.get("job.start.ms").toString()); + long jobFinishTime = + Long.parseLong(jsonJob.get("job.end.ms").toString()); String user = (String) jsonJob.get("job.user"); - if (user == null) user = "default"; + if (user == null) + user = "default"; String queue = jsonJob.get("job.queue.name").toString(); String oldAppId = jsonJob.get("job.id").toString(); boolean isTracked = trackedApps.contains(oldAppId); - int queueSize = queueAppNumMap.containsKey(queue) ? - queueAppNumMap.get(queue) : 0; - queueSize ++; + int queueSize = + queueAppNumMap.containsKey(queue) ? 
queueAppNumMap.get(queue) : 0; + queueSize++; queueAppNumMap.put(queue, queueSize); // tasks List tasks = (List) jsonJob.get("job.tasks"); @@ -352,12 +407,12 @@ // create a new AM String amType = jsonJob.get("am.type").toString(); - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(amType), new Configuration()); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(amType), new Configuration()); if (amSim != null) { amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, jobStartTime, jobFinishTime, user, queue, - isTracked, oldAppId); + isTracked, oldAppId, null); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTime); numTasks += containerList.size(); @@ -375,22 +430,21 @@ */ @SuppressWarnings("unchecked") private void startAMFromRumenTraces(Resource containerResource, - int heartbeatInterval) - throws IOException { + int heartbeatInterval) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; for (String inputTrace : inputTraces) { File fin = new File(inputTrace); - JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf); + JobTraceReader reader = + new JobTraceReader(new Path(fin.getAbsolutePath()), conf); try { LoggedJob job = null; while ((job = reader.getNext()) != null) { // only support MapReduce currently String jobType = "mapreduce"; - String user = job.getUser() == null ? - "default" : job.getUser().getValue(); + String user = + job.getUser() == null ? "default" : job.getUser().getValue(); String jobQueue = job.getQueue().getValue(); String oldJobId = job.getJobID().toString(); long jobStartTimeMS = job.getSubmitTime(); @@ -407,13 +461,13 @@ } boolean isTracked = trackedApps.contains(oldJobId); - int queueSize = queueAppNumMap.containsKey(jobQueue) ? - queueAppNumMap.get(jobQueue) : 0; - queueSize ++; + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? 
queueAppNumMap.get(jobQueue) : 0; + queueSize++; queueAppNumMap.put(jobQueue, queueSize); List containerList = - new ArrayList(); + new ArrayList(); // map tasks for(LoggedTask mapTask : job.getMapTasks()) { if (mapTask.getAttempts().size() == 0) { @@ -423,32 +477,32 @@ .get(mapTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 10, "map")); + containerLifeTime, hostname, 10, "map")); } // reduce tasks - for(LoggedTask reduceTask : job.getReduceTasks()) { + for (LoggedTask reduceTask : job.getReduceTasks()) { if (reduceTask.getAttempts().size() == 0) { continue; } LoggedTaskAttempt taskAttempt = reduceTask.getAttempts() - .get(reduceTask.getAttempts().size() - 1); + .get(reduceTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 20, "reduce")); + containerLifeTime, hostname, 20, "reduce")); } // create a new AM - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), conf); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), conf); if (amSim != null) { amSim.init(AM_ID ++, heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId); + isTracked, oldJobId, null); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); @@ -460,34 +514,158 @@ } } } - + + /** + * parse workload information from synth-generator trace files + */ + @SuppressWarnings("unchecked") + private void startAMFromSynthGenerator(int heartbeatInterval) + throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); + long baselineTimeMS = 0; + + // reservations use wall clock time, so need to have a reference for that + UTCClock clock = new UTCClock(); + long now = clock.getTime(); + + try { + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String jobType = "mapreduce"; + String user = job.getUser() == null ? "default" : job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); + + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = -1L; + + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job " + oldJobId + " start time to 0."); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + boolean isTracked = trackedApps.contains(oldJobId); + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? 
queueAppNumMap.get(jobQueue) : 0; + queueSize++; + queueAppNumMap.put(jobQueue, queueSize); + + List containerList = + new ArrayList(); + ArrayList keyAsArray = new ArrayList(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + + Resource maxMapRes = Resource.newInstance(0, 0); + long maxMapDur = 0; + // map tasks + for (int i = 0; i < job.getNumberMaps(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 10, "map")); + maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); + maxMapDur = + containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; + + } + + Resource maxRedRes = Resource.newInstance(0, 0); + long maxRedDur = 0; + // reduce tasks + for (int i = 0; i < job.getNumberReduces(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 20, "reduce")); + maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); + maxRedDur = + containerLifeTime > maxRedDur ? 
containerLifeTime : maxRedDur; + + } + + // generating reservations for the jobs that require them + + ReservationSubmissionRequest rr = null; + if (job.hasDeadline()) { + ReservationId reservationId = + ReservationId.newInstance(this.rm.getStartTime(), AM_ID); + + rr = ReservationClientUtil.createMRReservation(reservationId, + "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, + maxRedRes, job.getNumberReduces(), maxRedDur, + now + jobStartTimeMS, now + job.getDeadline(), + job.getQueueName()); + + } + // create a new AM + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), conf); + if (amSim != null) { + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, + oldJobId, rr); + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); + numTasks += containerList.size(); + amMap.put(oldJobId, amSim); + } + } + } finally { + stjp.close(); + } + + } + private void printSimulationInfo() { if (printSimulation) { // node LOG.info("------------------------------------"); LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity " + "of each node {2} MB memory and {3} vcores.", - numNMs, numRacks, nmMemoryMB, nmVCores)); + numNMs, numRacks, nmMemoryMB, nmVCores)); LOG.info("------------------------------------"); // job LOG.info(MessageFormat.format("# applications = {0}, # total " + "tasks = {1}, average # tasks per application = {2}", - numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)))); + numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)))); LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks"); for (Map.Entry entry : amMap.entrySet()) { AMSimulator am = entry.getValue(); - LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + "\t" + am.getDuration() + "\t" + am.getNumTasks()); } LOG.info("------------------------------------"); // queue LOG.info(MessageFormat.format("number of queues = {0} average " + "number of apps = {1}", queueAppNumMap.size(), - (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); + (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); LOG.info("------------------------------------"); // runtime LOG.info(MessageFormat.format("estimated simulation time is {0}" + - " seconds", (long)(Math.ceil(maxRuntime / 1000.0)))); + " seconds", (long)(Math.ceil(maxRuntime / 1000.0)))); LOG.info("------------------------------------"); } // package these information in the simulateInfoMap used by other places @@ -498,12 +676,12 @@ simulateInfoMap.put("Number of applications", numAMs); simulateInfoMap.put("Number of tasks", numTasks); simulateInfoMap.put("Average tasks per applicaion", - (int)(Math.ceil((numTasks + 0.0) / numAMs))); + (int)(Math.ceil((numTasks + 0.0) / numAMs))); simulateInfoMap.put("Number of queues", queueAppNumMap.size()); simulateInfoMap.put("Average applications per queue", - (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))); + (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))); simulateInfoMap.put("Estimated simulate time (s)", - (long)(Math.ceil(maxRuntime / 1000.0))); + (long)(Math.ceil(maxRuntime / 1000.0))); } public HashMap getNmMap() { @@ -515,7 +693,7 @@ } public static void decreaseRemainingApps() { - remainingApps --; + remainingApps--; if (remainingApps == 0) { LOG.info("SLSRunner tears down."); @@ -525,54 +703,74 @@ public static void main(String args[]) throws 
Exception { Options options = new Options(); - options.addOption("inputrumen", true, "input rumen files"); - options.addOption("inputsls", true, "input sls files"); + options.addOption("tracetype", true, "the type of trace"); + options.addOption("tracelocation", true, "input trace files"); options.addOption("nodes", true, "input topology"); options.addOption("output", true, "output directory"); options.addOption("trackjobs", true, - "jobs to be tracked during simulating"); + "jobs to be tracked during simulating"); options.addOption("printsimulation", false, - "print out simulation information"); - + "print out simulation information"); + CommandLineParser parser = new GnuParser(); CommandLine cmd = parser.parse(options, args); - String inputRumen = cmd.getOptionValue("inputrumen"); - String inputSLS = cmd.getOptionValue("inputsls"); + String traceType = cmd.getOptionValue("tracetype"); + String traceLocation = cmd.getOptionValue("tracelocation"); String output = cmd.getOptionValue("output"); - - if ((inputRumen == null && inputSLS == null) || output == null) { - System.err.println(); - System.err.println("ERROR: Missing input or output file"); - System.err.println(); - System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " + - "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + - "[-printsimulation]"); - System.err.println(); - System.exit(1); - } - + File outputFile = new File(output); - if (! outputFile.exists() - && ! outputFile.mkdirs()) { + if (!outputFile.exists() && !outputFile.mkdirs()) { System.err.println("ERROR: Cannot create output directory " - + outputFile.getAbsolutePath()); + + outputFile.getAbsolutePath()); System.exit(1); } - + Set trackedJobSet = new HashSet(); if (cmd.hasOption("trackjobs")) { String trackjobs = cmd.getOptionValue("trackjobs"); String jobIds[] = trackjobs.split(","); trackedJobSet.addAll(Arrays.asList(jobIds)); } - + String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; - boolean isSLS = inputSLS != null; - String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(","); - SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output, + TraceType tempTraceType = TraceType.SLS; + switch (traceType) { + case "SLS": { + tempTraceType = TraceType.SLS; + break; + } + case "RUMEN": { + tempTraceType = TraceType.RUMEN; + break; + } + case "SYNTH": { + tempTraceType = TraceType.SYNTH; + break; + } + default: { + printUsage(); + System.exit(1); + } + + } + + String inputFiles[] = traceLocation.split(","); + SLSRunner sls = new SLSRunner(tempTraceType, inputFiles, nodeFile, output, trackedJobSet, cmd.hasOption("printsimulation")); sls.start(); } + + static void printUsage() { + System.err.println(); + System.err.println("ERROR: Wrong tracetype"); + System.err.println(); + System.err.println( + "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " + + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] 
" + + "[-printsimulation]"); + System.err.println(); + } + } Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java (revision ) @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskStatus.State; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.*; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME; + +/** + * Generates random task data for a synthetic job. 
+ */ +public class SynthJob implements JobStory { + + static Log LOG = LogFactory.getLog(SynthJob.class); + + private final Configuration conf; + private final int id; + private static final AtomicInteger seq = new AtomicInteger(0); + private final String name; + private final String queueName; + private final SynthJobClass jobClass; + + // job timing + private final long submitTime; + private final long duration; + private final long deadline; + + private final int numMapTasks; + private final int numRedTasks; + private final long mapMaxMemory; + private final long reduceMaxMemory; + private final long mapMaxVcores; + private final long reduceMaxVcores; + private final long[] mapRuntime; + private final long[] reduceRuntime; + private long totMapRuntime; + private long totRedRuntime; + + public SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthJobClass jobClass, long actualSubmissionTime) { + + this.conf = conf; + this.jobClass = jobClass; + + this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); + this.numMapTasks = jobClass.getMtasks(); + this.numRedTasks = jobClass.getRtasks(); + + // sample memory distributions, correct for sub-minAlloc sizes + long mapMaxMemory = jobClass.getMapMaxMemory(); + this.mapMaxMemory = mapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB + ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : mapMaxMemory; + long reduceMaxMemory = jobClass.getReduceMaxMemory(); + this.reduceMaxMemory = + reduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB + ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : reduceMaxMemory; + + // sample vcores distributions, correct for sub-minAlloc sizes + long mapMaxVcores = jobClass.getMapMaxVcores(); + this.mapMaxVcores = mapMaxVcores < MRJobConfig.DEFAULT_MAP_CPU_VCORES + ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : mapMaxVcores; + long reduceMaxVcores = jobClass.getReduceMaxVcores(); + this.reduceMaxVcores = + reduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES + ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : reduceMaxVcores; + + if (numMapTasks > 0) { + conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); + conf.set(MRJobConfig.MAP_JAVA_OPTS, + "-Xmx" + (this.mapMaxMemory - 100) + "m"); + } + + if (numRedTasks > 0) { + conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); + conf.set(MRJobConfig.REDUCE_JAVA_OPTS, + "-Xmx" + (this.reduceMaxMemory - 100) + "m"); + } + + boolean hasDeadline = + (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + + LogNormalDistribution deadlineFactor = + SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, + jobClass.jobClass.deadline_factor_stddev); + + double deadlineFactorSample = + (deadlineFactor != null) ? deadlineFactor.sample() : -1; + + this.queueName = jobClass.workload.getQueueName(); + + this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); + + this.deadline = + hasDeadline ? 
MILLISECONDS.convert(actualSubmissionTime, SECONDS) + + (long) Math.ceil(deadlineFactorSample * duration) : -1; + + conf.set(QUEUE_NAME, queueName); + + // name and initialize job randomness + final long seed = rand.nextLong(); + rand.setSeed(seed); + id = seq.getAndIncrement(); + + name = String.format(jobClass.getClassName() + "_%06d", id); + LOG.debug(name + " (" + seed + ")"); + + LOG.info("JOB TIMING: job: " + name + " submission:" + submitTime + + " deadline:" + deadline + " duration:" + duration + + " deadline-submission: " + (deadline - submitTime)); + + // generate map and reduce runtimes + mapRuntime = new long[numMapTasks]; + for (int i = 0; i < numMapTasks; i++) { + mapRuntime[i] = jobClass.getMapTimeSample(); + totMapRuntime += mapRuntime[i]; + } + reduceRuntime = new long[numRedTasks]; + for (int i = 0; i < numRedTasks; i++) { + reduceRuntime[i] = jobClass.getReduceTimeSample(); + totRedRuntime += reduceRuntime[i]; + } + } + + public boolean hasDeadline() { + return deadline > 0; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getUser() { + // Obtain user name from job configuration, if available. + // Otherwise use dummy user names. + String user = conf.get(MRJobConfig.USER_NAME); + if (user == null) { + user = String.format("foobar%d", id); + } + + return user; + } + + @Override + public JobID getJobID() { + return new JobID("job_mock_" + name, id); + } + + @Override + public Values getOutcome() { + return Values.SUCCESS; + } + + @Override + public long getSubmissionTime() { + return submitTime; + } + + @Override + public int getNumberMaps() { + return numMapTasks; + } + + @Override + public int getNumberReduces() { + return numRedTasks; + } + + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + switch (taskType) { + case MAP: + return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); + case REDUCE: + return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); + default: + throw new IllegalArgumentException("Not interested"); + } + } + + @Override + public InputSplit[] getInputSplits() { + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, + int taskAttemptNumber) { + switch (taskType) { + case MAP: + return new MapTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); + + case REDUCE: + // We assume uniform split between pull/sort/reduce + // aligned with naive progress reporting assumptions + return new ReduceTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), + (long) Math.round(reduceRuntime[taskNumber] / 3), + (long) Math.round(reduceRuntime[taskNumber] / 3), + (long) Math.round(reduceRuntime[taskNumber] / 3), null); + + default: + break; + } + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, + int taskAttemptNumber, int locality) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.hadoop.mapred.JobConf getJobConf() { + return new JobConf(conf); + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public String toString() { + return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() + + "\n" + " jobClass=" + jobClass.getWorkload().list.indexOf(jobClass) + + "\n" + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + + name + ",\n" + " mapRuntime=" + 
Arrays.toString(mapRuntime) + ",\n" + + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" + + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks + + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" + + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" + + " queueName=" + queueName + "\n" + "]"; + } + + public SynthJobClass getJobClass() { + return jobClass; + } + + public long getTotalSlotTime() { + return totMapRuntime + totRedRuntime; + } + + public long getDuration() { + return duration; + } + + public long getDeadline() { + return deadline; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJob)) { + return false; + } + SynthJob o = (SynthJob) other; + return Arrays.equals(mapRuntime, o.mapRuntime) + && Arrays.equals(reduceRuntime, o.reduceRuntime) + && submitTime == o.submitTime && numMapTasks == o.numMapTasks + && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory + && reduceMaxMemory == o.reduceMaxMemory + && mapMaxVcores == o.mapMaxVcores + && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) + && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime + && totRedRuntime == o.totRedRuntime; + } + + @Override + public int hashCode() { + // could have a bad distr; investigate if a relevant use case exists + return jobClass.hashCode() * (int) submitTime; + } +} Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java (revision ) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java (revision ) @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Simple support class, used to submit reservations + */ +public class ReservationClientUtil { + + /** + * Creates a request that envelopes a MR jobs, picking max number of maps and + * reducers, max durations, and max resources per container. 
+ * + * @param reservationId the id of the reservation + * @param maxMapRes maximum resources used by any mapper + * @param numberMaps number of mappers + * @param maxMapDur maximum duration of any mapper + * @param maxRedRes maximum resources used by any reducer + * @param numberReduces number of reducers + * @param maxRedDur maximum duration of any reducer + * @param arrival start time of valid range for reservation + * @param deadline deadline for this reservation + * @param queueName queue to submit to + * @return a submission request + */ + public static ReservationSubmissionRequest createMRReservation( + ReservationId reservationId, String name, Resource maxMapRes, + int numberMaps, long maxMapDur, Resource maxRedRes, int numberReduces, + long maxRedDur, long arrival, long deadline, String queueName) { + + ReservationRequest mapRR = ReservationRequest.newInstance(maxMapRes, + numberMaps, numberMaps, maxMapDur); + ReservationRequest redRR = ReservationRequest.newInstance(maxRedRes, + numberReduces, numberReduces, maxRedDur); + + List listResReq = new ArrayList(); + listResReq.add(mapRR); + listResReq.add(redRR); + + ReservationRequests reservationRequests = ReservationRequests + .newInstance(listResReq, ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationDefinition resDef = ReservationDefinition.newInstance(arrival, + deadline, reservationRequests, name); + + // outermost request + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(resDef, queueName, reservationId); + + return request; + } +} Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java (revision ) @@ -678,6 +678,10 @@ reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); } + public Set getPlanQueues() throws YarnException { + return scheduler.getPlanQueues(); + } + class HistogramsRunnable implements Runnable { @Override public void run() { Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java (revision ) @@ -220,6 +220,23 @@ queue = new PlanQueue(csContext, queueName, parent, oldQueues.get(queueName)); + + //initializing the "internal" default queue, for SLS compatibility + String defReservationId = + queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + + List childQueues = new ArrayList<>(); + ReservationQueue resQueue = new 
ReservationQueue(csContext, + defReservationId, (PlanQueue) queue); + try { + resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f)); + } catch (SchedulerDynamicEditException e) { + throw new IllegalStateException(e); + } + childQueues.add(resQueue); + ((PlanQueue) queue).setChildQueues(childQueues); + queues.put(defReservationId, resQueue); + } else { queue = new LeafQueue(csContext, queueName, parent, Index: hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java (revision ) @@ -20,18 +20,36 @@ import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.*; + import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; +@RunWith(value = Parameterized.class) public class TestSLSRunner { + @Parameter(value = 0) + public String traceType; + + @Parameter(value = 1) + public String traceLocation; + + @Parameters(name = "run with {index} trace") + public static Collection data() { + return Arrays.asList(new Object[][]{ + //{"RUMEN", "src/main/data/2jobs2min-rumen-jh.json"}, + {"SYNTH", "src/test/resources/syn.json"} + }); + } + + @Test @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { + File tempDir = new File("target", UUID.randomUUID().toString()); final List exceptionList = Collections.synchronizedList(new ArrayList()); @@ -46,7 +64,7 @@ // start the simulator File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/"); String args[] = new String[]{ - "-inputrumen", "src/main/data/2jobs2min-rumen-jh.json", + "-tracetype", traceType, "-tracelocation", traceLocation, "-output", slsOutputDir.getAbsolutePath()}; SLSRunner.main(args); Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java (revision ) @@ -148,4 +148,13 @@ } return nodeSet; } + + public static Set generateNodesFromSynth( + int numNodes, int nodesPerRack) { + Set nodeSet = new HashSet(); + for (int i = 0; i < numNodes; i++) { + nodeSet.add("/rack" + i % nodesPerRack + "/node-" + i); + } + return nodeSet; + } } Index: hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml (revision ) @@ -25,11 +25,11 @@ yarn.sls.nm.memory.mb - 10240 + 100240 yarn.sls.nm.vcores - 10 + 100 yarn.sls.nm.heartbeat.interval.ms @@ -77,5 +77,11 @@ 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler org.apache.hadoop.yarn.sls.scheduler.CapacitySchedulerMetrics - + + + + yarn.sls.enable.reservation-system + true + + Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java (revision ) @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.text.MessageFormat; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; @@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -62,10 +65,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -120,16 +119,18 @@ private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + private ReservationSubmissionRequest reservationRequest; + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } - public void init(int id, int heartbeatInterval, + public void init(int id, int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { + long traceStartTime, long traceFinishTime, String user, String queue, + boolean isTracked, String oldAppId, ReservationSubmissionRequest rr) { super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval, - heartbeatInterval); + heartbeatInterval); this.user = user; this.rm = rm; this.se = se; @@ -139,6 +140,7 @@ this.isTracked = isTracked; this.traceStartTimeMS = traceStartTime; this.traceFinishTimeMS = traceFinishTime; + this.reservationRequest = rr; } 
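The init(...) change above threads a ReservationSubmissionRequest from the runner into the AM simulator. As a minimal sketch of how such a request can be assembled with the ReservationClientUtil helper introduced earlier in this patch (all sizes, counts, times, the class name, and the queue name below are invented for illustration):

    import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
    import org.apache.hadoop.yarn.api.records.ReservationId;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.sls.ReservationClientUtil;

    public class ReservationRequestSketch {
      public static void main(String[] args) {
        // Envelope of the job: largest container and longest duration per task type.
        Resource maxMapRes = Resource.newInstance(1024, 1);  // MB, vcores
        Resource maxRedRes = Resource.newInstance(2048, 1);
        long maxMapDurMs = 60000L;
        long maxRedDurMs = 120000L;

        long now = System.currentTimeMillis();
        ReservationId reservationId = ReservationId.newInstance(now, 1);

        // Same helper the runner calls when a job carries a deadline.
        ReservationSubmissionRequest rr =
            ReservationClientUtil.createMRReservation(reservationId,
                "reservation_example", maxMapRes, 10, maxMapDurMs, maxRedRes, 2,
                maxRedDurMs, now, now + 30 * 60 * 1000L, "default");

        // The request is then handed to AMSimulator#init(..., rr), which submits
        // it through the ClientRMService before submitting the application itself.
        System.out.println(rr.getReservationId());
      }
    }
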
/** @@ -149,8 +151,19 @@ simulateStartTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); + ReservationId reservationId = null; + + // submit a reservation if one is required, exceptions naturally happen + // when the reservation does not fit, catch, log, and move on running job + // without reservation. + try { + reservationId = submitReservation(); + } catch (UndeclaredThrowableException y) { + LOG.warn("Unable to place reservation: " + y.getMessage()); + } + // submit application, waiting until ACCEPTED - submitApp(); + submitApp(reservationId); // track app metrics trackApp(); @@ -164,6 +177,26 @@ isAMContainerRunning = true; } + private ReservationId submitReservation() + throws IOException, InterruptedException { + if (reservationRequest != null) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws YarnException, IOException { + rm.getClientRMService().submitReservation(reservationRequest); + LOG.info("RESERVATION SUCCESSFULLY SUBMITTED " + + reservationRequest.getReservationId()); + return null; + + } + }); + return reservationRequest.getReservationId(); + } else { + return null; + } + } + @Override public void middleStep() throws Exception { if (isAMContainerRunning) { @@ -224,8 +257,8 @@ SLSRunner.getRunner().getStartTimeMS(); // record job running information ((SchedulerWrapper)rm.getResourceScheduler()) - .addAMRuntime(appId, - traceStartTimeMS, traceFinishTimeMS, + .addAMRuntime(appId, + traceStartTimeMS, traceFinishTimeMS, simulateStartTimeMS, simulateFinishTimeMS); } @@ -262,7 +295,7 @@ protected abstract void checkStop(); - private void submitApp() + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = @@ -292,6 +325,11 @@ appSubContext.setResource(Resources .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, MR_AM_CONTAINER_RESOURCE_VCORES)); + + if(reservationId != null) { + appSubContext.setReservationID(reservationId); + } + subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction() { Index: hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java (revision 4a8e3045027036afebbcb80f23b7a2886e56c255) +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java (revision ) @@ -27,13 +27,13 @@ import java.util.List; import java.util.Map; -import org.apache.avro.Protocol; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import 
org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -117,10 +116,10 @@ public void init(int id, int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { + boolean isTracked, String oldAppId, ReservationSubmissionRequest rr) { super.init(id, heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId); + isTracked, oldAppId, rr); amtype = "mapreduce"; // get map/reduce tasks
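For context on when a job actually carries a reservation: SynthJob (above) only builds one when the sampled deadline is positive, deriving that deadline as the submission time plus a log-normally distributed multiple of the job duration. A standalone sketch of that computation, assuming only commons-math3 and the SynthUtils helper from this patch (the factor average/stddev and the times are invented; the guard differs slightly from SynthJob, which also draws against chance_of_reservation):

    import org.apache.commons.math3.distribution.LogNormalDistribution;
    import org.apache.commons.math3.random.JDKRandomGenerator;
    import org.apache.hadoop.yarn.sls.synthetic.SynthUtils;

    public class DeadlineSketch {
      public static void main(String[] args) {
        JDKRandomGenerator rand = new JDKRandomGenerator();
        rand.setSeed(42);

        long submitTimeMs = 10000L;   // assumed submission time
        long durationMs = 600000L;    // assumed job duration (10 minutes)

        // In the generator these two parameters come from the workload spec
        // (deadline_factor_avg / deadline_factor_stddev).
        LogNormalDistribution deadlineFactor =
            SynthUtils.getLogNormalDist(rand, 1.5, 0.2);
        double factor = (deadlineFactor != null) ? deadlineFactor.sample() : -1;

        // Mirrors the shape of SynthJob's computation:
        // deadline = submission + ceil(factor * duration), or -1 if none.
        long deadlineMs = (factor > 0)
            ? submitTimeMs + (long) Math.ceil(factor * durationMs) : -1;
        System.out.println("deadline(ms) = " + deadlineMs);
      }
    }
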
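Finally, the reworked command line (exercised by TestSLSRunner above) can also be driven programmatically; the trace and output paths below are placeholders:

    /** Launches the simulator on the synthetic trace shipped with the tests. */
    public class RunSynthTrace {
      public static void main(String[] args) throws Exception {
        String[] slsArgs = new String[] {
            "-tracetype", "SYNTH",
            "-tracelocation", "src/test/resources/syn.json",
            "-output", "/tmp/sls-output",
            "-printsimulation" };
        org.apache.hadoop.yarn.sls.SLSRunner.main(slsArgs);
      }
    }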