diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index ce9197cd21d..0144d15e89d 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,11 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
new file mode 100644
index 00000000000..34dc00f5146
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+
+import java.util.List;
+
+public abstract class AMDefinition {
+ protected int jobCount;
+ protected String amType;
+ protected String user;
+ protected String queue;
+ protected long jobStartTime;
+ protected long jobFinishTime;
+ protected List<ContainerSimulator> taskContainers;
+ protected Resource amResource;
+ protected String labelExpression;
+ protected String oldAppId;
+
+ public AMDefinition(AmDefinitionBuilder builder) {
+ this.jobStartTime = builder.jobStartTime;
+ this.jobFinishTime = builder.jobFinishTime;
+ this.amType = builder.amType;
+ this.taskContainers = builder.taskContainers;
+ this.labelExpression = builder.labelExpression;
+ this.user = builder.user;
+ this.amResource = builder.amResource;
+ this.queue = builder.queue;
+ this.jobCount = builder.jobCount;
+ }
+
+ public String getAmType() {
+ return amType;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getOldAppId() {
+ return oldAppId;
+ }
+
+ public long getJobStartTime() {
+ return jobStartTime;
+ }
+
+ public long getJobFinishTime() {
+ return jobFinishTime;
+ }
+
+ public List<ContainerSimulator> getTaskContainers() {
+ return taskContainers;
+ }
+
+ public Resource getAmResource() {
+ return amResource;
+ }
+
+ public String getLabelExpression() {
+ return labelExpression;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public int getJobCount() {
+ return jobCount;
+ }
+
+
+ public abstract static class AmDefinitionBuilder {
+ private static final String DEFAULT_USER = "default";
+
+ protected int jobCount = 1;
+ protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE;
+ protected String user = DEFAULT_USER;
+ protected String queue;
+ protected String jobId;
+ protected long jobStartTime;
+ protected long jobFinishTime;
+ protected List<ContainerSimulator> taskContainers;
+ protected Resource amResource;
+ protected String labelExpression = null;
+
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
new file mode 100644
index 00000000000..0462b02ed9a
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AMDefinitionFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(AMDefinitionFactory.class);
+ public final static String DEFAULT_JOB_TYPE = "mapreduce";
+
+ public static AMDefinitionSLS createFromSlsTrace(Map jsonJob, SLSRunner slsRunner) throws YarnException {
+ AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
+ .withAmType(SLSConfiguration.AM_TYPE)
+ .withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner))
+ .withTaskContainers(
+ AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner))
+ .withJobStartTime(SLSConfiguration.JOB_START_MS)
+ .withJobFinishTime(SLSConfiguration.JOB_END_MS)
+ .withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR)
+ .withUser(SLSConfiguration.JOB_USER)
+ .withQueue(SLSConfiguration.JOB_QUEUE_NAME)
+ .withJobId(SLSConfiguration.JOB_ID)
+ .withJobCount(SLSConfiguration.JOB_COUNT)
+ .build();
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ public static AMDefinitionRumen createFromRumenTrace(LoggedJob job,
+ long baselineTimeMs, SLSRunner slsRunner) throws YarnException {
+ AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create()
+ .withAmType(DEFAULT_JOB_TYPE)
+ .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+ .withTaskContainers(
+ AMDefinitionRumen.getTaskContainers(job, slsRunner))
+ .withJobStartTime(job.getSubmitTime())
+ .withJobFinishTime(job.getFinishTime())
+ .withBaseLineTimeMs(baselineTimeMs)
+ .withUser(job.getUser())
+ .withQueue(job.getQueue().getValue())
+ .withJobId(job.getJobID().toString())
+ .build();
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ public static AMDefinitionSynth createFromSynth(SynthJob job, SLSRunner slsRunner) throws YarnException {
+ AMDefinitionSynth amDefinition =
+ AMDefinitionSynth.Builder.create()
+ .withAmType(job.getType())
+ .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+ .withTaskContainers(
+ AMDefinitionSynth.getTaskContainers(job, slsRunner))
+ .withUser(job.getUser())
+ .withQueue(job.getQueueName())
+ .withJobId(job.getJobID().toString())
+ .withJobStartTime(job.getSubmissionTime())
+ .withJobFinishTime(job.getSubmissionTime() + job.getDuration())
+ .withBaseLineTimeMs(0)
+ .build();
+
+ slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+ return amDefinition;
+ }
+
+ private static Resource getAMContainerResourceSLS(Map jsonJob, Configured configured) {
+ Resource amContainerResource =
+ SLSConfiguration.getAMContainerResource(configured.getConf());
+ if (jsonJob == null) {
+ return amContainerResource;
+ }
+
+ ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+ for (ResourceInformation info : infors) {
+ String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
+ if (jsonJob.containsKey(key)) {
+ long value = Long.parseLong(jsonJob.get(key).toString());
+ amContainerResource.setResourceValue(info.getName(), value);
+ }
+ }
+
+ return amContainerResource;
+ }
+
+ private static Resource getAMContainerResourceSynthAndRumen(Configured configured) {
+ return SLSConfiguration.getAMContainerResource(configured.getConf());
+ }
+
+ static void adjustTimeValuesToBaselineTime(AMDefinition amDef,
+ AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) {
+ builder.jobStartTime -= baselineTimeMs;
+ builder.jobFinishTime -= baselineTimeMs;
+ if (builder.jobStartTime < 0) {
+ LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId());
+ builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime;
+ builder.jobStartTime = 0;
+ }
+ amDef.jobStartTime = builder.jobStartTime;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
new file mode 100644
index 00000000000..a36ca9ad796
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+public class AMDefinitionRumen extends AMDefinition {
+ public final static int DEFAULT_MAPPER_PRIORITY = 20;
+ private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
+ public AMDefinitionRumen(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(LoggedJob job, SLSRunner slsRunner) throws YarnException {
+ List<ContainerSimulator> containerList = new ArrayList<>();
+
+ TaskContainerDefinition.Builder builder =
+ TaskContainerDefinition.Builder.create()
+ .withCount(1)
+ .withResource(slsRunner.getDefaultContainerResource())
+ .withExecutionType(ExecutionType.GUARANTEED)
+ .withAllocationId(-1)
+ .withRequestDelay(0);
+
+ // mapper
+ for (LoggedTask mapTask : job.getMapTasks()) {
+ if (mapTask.getAttempts().size() == 0) {
+ throw new YarnException("Invalid map task, no attempt for a mapper!");
+ }
+ LoggedTaskAttempt taskAttempt =
+ mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
+ TaskContainerDefinition containerDef = builder
+ .withHostname(taskAttempt.getHostName().getValue())
+ .withDuration(taskAttempt.getFinishTime() -
+ taskAttempt.getStartTime())
+ .withPriority(DEFAULT_MAPPER_PRIORITY)
+ .withType("map")
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ // reducer
+ for (LoggedTask reduceTask : job.getReduceTasks()) {
+ if (reduceTask.getAttempts().size() == 0) {
+ throw new YarnException(
+ "Invalid reduce task, no attempt for a reducer!");
+ }
+ LoggedTaskAttempt taskAttempt =
+ reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
+ TaskContainerDefinition containerDef = builder
+ .withHostname(taskAttempt.getHostName().getValue())
+ .withDuration(taskAttempt.getFinishTime() -
+ taskAttempt.getStartTime())
+ .withPriority(DEFAULT_REDUCER_PRIORITY)
+ .withType("reduce")
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ return containerList;
+ }
+
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private long baselineTimeMs;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withAmType(String amType) {
+ this.amType = amType;
+ return this;
+ }
+
+ public Builder withUser(UserName user) {
+ if (user != null) {
+ this.user = user.getValue();
+ }
+ return this;
+ }
+
+ public Builder withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ public Builder withJobId(String oldJobId) {
+ this.jobId = oldJobId;
+ return this;
+ }
+
+ public Builder withJobStartTime(long time) {
+ this.jobStartTime = time;
+ return this;
+ }
+
+ public Builder withJobFinishTime(long time) {
+ this.jobFinishTime = time;
+ return this;
+ }
+
+ public Builder withBaseLineTimeMs(long baselineTimeMs) {
+ this.baselineTimeMs = baselineTimeMs;
+ return this;
+ }
+
+ public Builder withLabelExpression(String expr) {
+ this.labelExpression = expr;
+ return this;
+ }
+
+ public AMDefinitionRumen.Builder withTaskContainers(List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionRumen.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionRumen build() {
+ AMDefinitionRumen amDef = new AMDefinitionRumen(this);
+
+ if (baselineTimeMs == 0) {
+ baselineTimeMs = jobStartTime;
+ }
+ adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+ return amDef;
+ }
+
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
new file mode 100644
index 00000000000..d18266017f0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AMDefinitionSLS extends AMDefinition {
+ public AMDefinitionSLS(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(Map jsonJob,
+ SLSRunner slsRunner) throws YarnException {
+ List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+ if (tasks == null || tasks.size() == 0) {
+ throw new YarnException("No task for the job!");
+ }
+
+ List<ContainerSimulator> containers = new ArrayList<>();
+ for (Object o : tasks) {
+ Map jsonTask = (Map) o;
+ TaskContainerDefinition containerDef =
+ TaskContainerDefinition.Builder.create()
+ .withCount(jsonTask, SLSConfiguration.COUNT)
+ .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+ .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
+ .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
+ .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
+ .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
+ .withResource(getResourceForContainer(jsonTask, slsRunner))
+ .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
+ .withType(jsonTask, SLSConfiguration.TASK_TYPE)
+ .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
+ .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
+ .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
+ .build();
+
+ for (int i = 0; i < containerDef.getCount(); i++) {
+ containers.add(ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+ }
+ return containers;
+ }
+
+ private static Resource getResourceForContainer(Map jsonTask,
+ SLSRunner slsRunner) {
+ Resource res = slsRunner.getDefaultContainerResource();
+ ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+ for (ResourceInformation info : infors) {
+ if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
+ long value = Long.parseLong(
+ jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
+ .toString());
+ res.setResourceValue(info.getName(), value);
+ }
+ }
+ return res;
+ }
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private Map jsonJob;
+
+ private Builder(Map jsonJob) {
+ this.jsonJob = jsonJob;
+ }
+
+ public static Builder create(Map jsonJob) {
+ return new Builder(jsonJob);
+ }
+
+ public Builder withAmType(String key) {
+ if (jsonJob.containsKey(key)) {
+ String amType = (String) jsonJob.get(key);
+ if (amType != null) {
+ this.amType = amType;
+ }
+ }
+ return this;
+ }
+
+ public Builder withUser(String key) {
+ if (jsonJob.containsKey(key)) {
+ String user = (String) jsonJob.get(key);
+ if (user != null) {
+ this.user = user;
+ }
+ }
+ return this;
+ }
+
+ public Builder withQueue(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.queue = jsonJob.get(key).toString();
+ }
+ return this;
+ }
+
+ public Builder withJobId(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobId = (String) jsonJob.get(key);
+ }
+ return this;
+ }
+
+ public Builder withJobCount(String key) {
+ if (jsonJob.containsKey(key)) {
+ jobCount = Integer.parseInt(jsonJob.get(key).toString());
+ jobCount = Math.max(jobCount, 1);
+ }
+ return this;
+ }
+
+ public Builder withJobStartTime(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withJobFinishTime(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+ }
+ return this;
+ }
+
+ public Builder withLabelExpression(String key) {
+ if (jsonJob.containsKey(key)) {
+ this.labelExpression = jsonJob.get(key).toString();
+ }
+ return this;
+ }
+
+ public AMDefinitionSLS.Builder withTaskContainers(List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionSLS.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionSLS build() {
+ AMDefinitionSLS amDef = new AMDefinitionSLS(this);
+ // Job id is generated automatically if this job configuration allows
+ // multiple job instances
+ if (jobCount > 1) {
+ amDef.oldAppId = null;
+ } else {
+ amDef.oldAppId = jobId;
+ }
+ amDef.jobCount = jobCount;
+ return amDef;
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
new file mode 100644
index 00000000000..499692a5929
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+
+public class AMDefinitionSynth extends AMDefinition {
+ public AMDefinitionSynth(AmDefinitionBuilder builder) {
+ super(builder);
+ }
+
+ public static List<ContainerSimulator> getTaskContainers(
+ SynthJob job, SLSRunner slsRunner) throws YarnException {
+ List<ContainerSimulator> containerList = new ArrayList<>();
+ ArrayList<NodeId> keyAsArray = new ArrayList<>(slsRunner.getNmMap().keySet());
+ Random rand = new Random(slsRunner.getStjp().getSeed());
+
+ for (SynthJob.SynthTask task : job.getTasks()) {
+ RMNode node = getRandomNode(slsRunner, keyAsArray, rand);
+ TaskContainerDefinition containerDef =
+ TaskContainerDefinition.Builder.create()
+ .withCount(1)
+ .withHostname("/" + node.getRackName() + "/" + node.getHostName())
+ .withDuration(task.getTime())
+ .withResource(Resource
+ .newInstance((int) task.getMemory(), (int) task.getVcores()))
+ .withPriority(task.getPriority())
+ .withType(task.getType())
+ .withExecutionType(task.getExecutionType())
+ .withAllocationId(-1)
+ .withRequestDelay(0)
+ .build();
+ containerList.add(
+ ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+ }
+
+ return containerList;
+ }
+
+ private static RMNode getRandomNode(SLSRunner slsRunner,
+ ArrayList<NodeId> keyAsArray, Random rand) {
+ int randomIndex = rand.nextInt(keyAsArray.size());
+ return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode();
+ }
+
+ public static final class Builder extends AmDefinitionBuilder {
+ private long baselineTimeMs;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withAmType(String amType) {
+ this.amType = amType;
+ return this;
+ }
+
+ public Builder withUser(String user) {
+ if (user != null) {
+ this.user = user;
+ }
+ return this;
+ }
+
+ public Builder withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ public Builder withJobId(String oldJobId) {
+ this.jobId = oldJobId;
+ return this;
+ }
+
+ public Builder withJobStartTime(long time) {
+ this.jobStartTime = time;
+ return this;
+ }
+
+ public Builder withJobFinishTime(long time) {
+ this.jobFinishTime = time;
+ return this;
+ }
+
+ public Builder withBaseLineTimeMs(long baselineTimeMs) {
+ this.baselineTimeMs = baselineTimeMs;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withLabelExpression(String expr) {
+ this.labelExpression = expr;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withTaskContainers(List<ContainerSimulator> taskContainers) {
+ this.taskContainers = taskContainers;
+ return this;
+ }
+
+ public AMDefinitionSynth.Builder withAmResource(Resource amResource) {
+ this.amResource = amResource;
+ return this;
+ }
+
+ public AMDefinitionSynth build() {
+ AMDefinitionSynth amDef = new AMDefinitionSynth(this);
+
+ if (baselineTimeMs == 0) {
+ baselineTimeMs = jobStartTime;
+ }
+ adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+ return amDef;
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
new file mode 100644
index 00000000000..e300e43c74e
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.RUMEN;
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.SLS;
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.SYNTH;
+
+public class AMRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
+ static int REMAINING_APPS = 0;
+
+ private final Configuration conf;
+ private int AM_ID;
+ private Map<String, AMSimulator> amMap;
+ private Map<ApplicationId, AMSimulator> appIdAMSim;
+ private Set<String> trackedApps;
+ private Map<String, Class> amClassMap;
+ private TraceType inputType;
+ private String[] inputTraces;
+ private SynthTraceJobProducer stjp;
+ private TaskRunner runner;
+ private SLSRunner slsRunner;
+ private int numAMs, numTasks;
+ private long maxRuntime;
+ private ResourceManager rm;
+
+ public AMRunner(TaskRunner runner, SLSRunner slsRunner,
+ SynthTraceJobProducer stjp) {
+ this.runner = runner;
+ this.slsRunner = slsRunner;
+ this.conf = slsRunner.getConf();
+ this.stjp = stjp;
+ }
+
+
+ public void init(Configuration conf) throws ClassNotFoundException {
+ amMap = new ConcurrentHashMap<>();
+ amClassMap = new HashMap<>();
+ appIdAMSim = new ConcurrentHashMap<>();
+ // map
+ for (Map.Entry<String, String> e : conf) {
+ String key = e.getKey().toString();
+ if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
+ String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
+ amClassMap.put(amType, Class.forName(conf.get(key)));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void startAM() throws YarnException, IOException {
+ switch (inputType) {
+ case SLS:
+ for (String inputTrace : inputTraces) {
+ startAMFromSLSTrace(inputTrace);
+ }
+ break;
+ case RUMEN:
+ long baselineTimeMS = 0;
+ for (String inputTrace : inputTraces) {
+ startAMFromRumenTrace(inputTrace, baselineTimeMS);
+ }
+ break;
+ case SYNTH:
+ startAMFromSynthGenerator();
+ break;
+ default:
+ throw new YarnException("Input configuration not recognized, "
+ + "trace type should be SLS, RUMEN, or SYNTH");
+ }
+
+ numAMs = amMap.size();
+ REMAINING_APPS = numAMs;
+ }
+
+ /**
+ * Parse workload from a SLS trace file.
+ */
+ @SuppressWarnings("unchecked")
+ private void startAMFromSLSTrace(String inputTrace) throws IOException {
+ JsonFactory jsonF = new JsonFactory();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try (Reader input = new InputStreamReader(
+ new FileInputStream(inputTrace), "UTF-8")) {
+ Iterator