diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index ce9197cd21d..0144d15e89d 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,11 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
new file mode 100644
index 00000000000..34dc00f5146
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+
+import java.util.List;
+
+public abstract class AMDefinition {
+  protected int jobCount;
+  protected String amType;
+  protected String user;
+  protected String queue;
+  protected long jobStartTime;
+  protected long jobFinishTime;
+  protected List<ContainerSimulator> taskContainers;
+  protected Resource amResource;
+  protected String labelExpression;
+  protected String oldAppId;
+
+  public AMDefinition(AmDefinitionBuilder builder) {
+    this.jobStartTime = builder.jobStartTime;
+    this.jobFinishTime = builder.jobFinishTime;
+    this.amType = builder.amType;
+    this.taskContainers = builder.taskContainers;
+    this.labelExpression = builder.labelExpression;
+    this.user = builder.user;
+    this.amResource = builder.amResource;
+    this.queue = builder.queue;
+    this.jobCount = builder.jobCount;
+  }
+
+  public String getAmType() {
+    return amType;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getOldAppId() {
+    return oldAppId;
+  }
+
+  public long getJobStartTime() {
+    return jobStartTime;
+  }
+
+  public long getJobFinishTime() {
+    return jobFinishTime;
+  }
+
+  public List<ContainerSimulator> getTaskContainers() {
+    return taskContainers;
+  }
+
+  public Resource getAmResource() {
+    return amResource;
+  }
+
+  public String getLabelExpression() {
+    return labelExpression;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public int getJobCount() {
+    return jobCount;
+  }
+
+  public abstract static class AmDefinitionBuilder {
+    private static final String DEFAULT_USER = "default";
+
+    protected int jobCount = 1;
+    protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE;
+    protected String user = DEFAULT_USER;
+    protected String queue;
+    protected String jobId;
+    protected long jobStartTime;
+    protected long jobFinishTime;
+    protected List<ContainerSimulator> taskContainers;
+    protected Resource amResource;
+    protected String labelExpression = null;
+  }
+}
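For orientation, this is how the new definition objects are meant to be constructed — a minimal sketch, assuming the SLS classes above are on the classpath; all values are hypothetical, and AMDefinitionRumen.Builder is used here because its setters take plain values rather than JSON keys:

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.sls.AMDefinitionRumen;

public class AMDefinitionSketch {
  public static void main(String[] args) {
    AMDefinitionRumen amDef = AMDefinitionRumen.Builder.create()
        .withAmType("mapreduce")                       // same as DEFAULT_JOB_TYPE
        .withQueue("sls_queue_1")                      // hypothetical queue name
        .withJobStartTime(1000L)
        .withJobFinishTime(5000L)
        .withBaseLineTimeMs(1000L)                     // window rebased to [0, 4000]
        .withAmResource(Resource.newInstance(1024, 1))
        .withTaskContainers(Collections.emptyList())   // no simulated task containers
        .build();
    System.out.println(amDef.getJobStartTime());       // prints 0 after rebasing
  }
}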
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
new file mode 100644
index 00000000000..0462b02ed9a
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMDefinitionFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AMDefinitionFactory.class);
+  public final static String DEFAULT_JOB_TYPE = "mapreduce";
+
+  public static AMDefinitionSLS createFromSlsTrace(Map jsonJob,
+      SLSRunner slsRunner) throws YarnException {
+    AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
+        .withAmType(SLSConfiguration.AM_TYPE)
+        .withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner))
+        .withTaskContainers(
+            AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner))
+        .withJobStartTime(SLSConfiguration.JOB_START_MS)
+        .withJobFinishTime(SLSConfiguration.JOB_END_MS)
+        .withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR)
+        .withUser(SLSConfiguration.JOB_USER)
+        .withQueue(SLSConfiguration.JOB_QUEUE_NAME)
+        .withJobId(SLSConfiguration.JOB_ID)
+        .withJobCount(SLSConfiguration.JOB_COUNT)
+        .build();
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  public static AMDefinitionRumen createFromRumenTrace(LoggedJob job,
+      long baselineTimeMs, SLSRunner slsRunner) throws YarnException {
+    AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create()
+        .withAmType(DEFAULT_JOB_TYPE)
+        .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+        .withTaskContainers(
+            AMDefinitionRumen.getTaskContainers(job, slsRunner))
+        .withJobStartTime(job.getSubmitTime())
+        .withJobFinishTime(job.getFinishTime())
+        .withBaseLineTimeMs(baselineTimeMs)
+        .withUser(job.getUser())
+        .withQueue(job.getQueue().getValue())
+        .withJobId(job.getJobID().toString())
+        .build();
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  public static AMDefinitionSynth createFromSynth(SynthJob job,
+      SLSRunner slsRunner) throws YarnException {
+    AMDefinitionSynth amDefinition =
+        AMDefinitionSynth.Builder.create()
+            .withAmType(job.getType())
+            .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
+            .withTaskContainers(
+                AMDefinitionSynth.getTaskContainers(job, slsRunner))
+            .withUser(job.getUser())
+            .withQueue(job.getQueueName())
+            .withJobId(job.getJobID().toString())
+            .withJobStartTime(job.getSubmissionTime())
+            .withJobFinishTime(job.getSubmissionTime() + job.getDuration())
+            .withBaseLineTimeMs(0)
+            .build();
+
+    slsRunner.increaseQueueAppNum(amDefinition.getQueue());
+    return amDefinition;
+  }
+
+  private static Resource getAMContainerResourceSLS(Map jsonJob,
+      Configured configured) {
+    Resource amContainerResource =
+        SLSConfiguration.getAMContainerResource(configured.getConf());
+    if (jsonJob == null) {
+      return amContainerResource;
+    }
+
+    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+    for (ResourceInformation info : infors) {
+      String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
+      if (jsonJob.containsKey(key)) {
+        long value = Long.parseLong(jsonJob.get(key).toString());
+        amContainerResource.setResourceValue(info.getName(), value);
+      }
+    }
+
+    return amContainerResource;
+  }
+
+  private static Resource getAMContainerResourceSynthAndRumen(
+      Configured configured) {
+    return SLSConfiguration.getAMContainerResource(configured.getConf());
+  }
+
+  static void adjustTimeValuesToBaselineTime(AMDefinition amDef,
+      AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) {
+    builder.jobStartTime -= baselineTimeMs;
+    builder.jobFinishTime -= baselineTimeMs;
+    if (builder.jobStartTime < 0) {
+      LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId());
+      builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime;
+      builder.jobStartTime = 0;
+    }
+    amDef.jobStartTime = builder.jobStartTime;
+    // Propagate the adjusted finish time as well; the value copied by the
+    // constructor still reflects the un-adjusted trace time.
+    amDef.jobFinishTime = builder.jobFinishTime;
+  }
+}
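The baseline rebasing above is easiest to follow with concrete numbers. A standalone restatement of the arithmetic in adjustTimeValuesToBaselineTime, with plain longs instead of the builder (the values are made up):

public class BaselineMathSketch {
  public static void main(String[] args) {
    long baselineTimeMs = 2000;
    long jobStartTime = 1500;    // starts before the baseline
    long jobFinishTime = 6500;

    jobStartTime -= baselineTimeMs;     // -500
    jobFinishTime -= baselineTimeMs;    //  4500
    if (jobStartTime < 0) {
      // shift the window so the job starts at 0; the 5000 ms duration
      // is preserved: 4500 - (-500) = 5000
      jobFinishTime = jobFinishTime - jobStartTime;
      jobStartTime = 0;
    }
    System.out.println(jobStartTime + " " + jobFinishTime);  // 0 5000
  }
}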
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
new file mode 100644
index 00000000000..a36ca9ad796
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.datatypes.UserName;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+public class AMDefinitionRumen extends AMDefinition {
+  public final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
+  public AMDefinitionRumen(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(LoggedJob job,
+      SLSRunner slsRunner) throws YarnException {
+    List<ContainerSimulator> containerList = new ArrayList<>();
+
+    TaskContainerDefinition.Builder builder =
+        TaskContainerDefinition.Builder.create()
+            .withCount(1)
+            .withResource(slsRunner.getDefaultContainerResource())
+            .withExecutionType(ExecutionType.GUARANTEED)
+            .withAllocationId(-1)
+            .withRequestDelay(0);
+
+    // mapper
+    for (LoggedTask mapTask : job.getMapTasks()) {
+      if (mapTask.getAttempts().size() == 0) {
+        throw new YarnException("Invalid map task, no attempt for a mapper!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
+      TaskContainerDefinition containerDef = builder
+          .withHostname(taskAttempt.getHostName().getValue())
+          .withDuration(taskAttempt.getFinishTime() -
+              taskAttempt.getStartTime())
+          .withPriority(DEFAULT_MAPPER_PRIORITY)
+          .withType("map")
+          .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    // reducer
+    for (LoggedTask reduceTask : job.getReduceTasks()) {
+      if (reduceTask.getAttempts().size() == 0) {
+        throw new YarnException(
+            "Invalid reduce task, no attempt for a reducer!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
+      TaskContainerDefinition containerDef = builder
+          .withHostname(taskAttempt.getHostName().getValue())
+          .withDuration(taskAttempt.getFinishTime() -
+              taskAttempt.getStartTime())
+          .withPriority(DEFAULT_REDUCER_PRIORITY)
+          .withType("reduce")
+          .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    return containerList;
+  }
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private long baselineTimeMs;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmType(String amType) {
+      this.amType = amType;
+      return this;
+    }
+
+    public Builder withUser(UserName user) {
+      if (user != null) {
+        this.user = user.getValue();
+      }
+      return this;
+    }
+
+    public Builder withQueue(String queue) {
+      this.queue = queue;
+      return this;
+    }
+
+    public Builder withJobId(String oldJobId) {
+      this.jobId = oldJobId;
+      return this;
+    }
+
+    public Builder withJobStartTime(long time) {
+      this.jobStartTime = time;
+      return this;
+    }
+
+    public Builder withJobFinishTime(long time) {
+      this.jobFinishTime = time;
+      return this;
+    }
+
+    public Builder withBaseLineTimeMs(long baselineTimeMs) {
+      this.baselineTimeMs = baselineTimeMs;
+      return this;
+    }
+
+    public Builder withLabelExpression(String expr) {
+      this.labelExpression = expr;
+      return this;
+    }
+
+    public AMDefinitionRumen.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionRumen.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionRumen build() {
+      AMDefinitionRumen amDef = new AMDefinitionRumen(this);
+
+      if (baselineTimeMs == 0) {
+        baselineTimeMs = jobStartTime;
+      }
+      adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+      return amDef;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
new file mode 100644
index 00000000000..d18266017f0
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AMDefinitionSLS extends AMDefinition {
+  public AMDefinitionSLS(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(Map jsonJob,
+      SLSRunner slsRunner) throws YarnException {
+    List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
+    if (tasks == null || tasks.size() == 0) {
+      throw new YarnException("No task for the job!");
+    }
+
+    List<ContainerSimulator> containers = new ArrayList<>();
+    for (Object o : tasks) {
+      Map jsonTask = (Map) o;
+      TaskContainerDefinition containerDef =
+          TaskContainerDefinition.Builder.create()
+              .withCount(jsonTask, SLSConfiguration.COUNT)
+              .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
+              .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
+              .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
+              .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
+              .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
+              .withResource(getResourceForContainer(jsonTask, slsRunner))
+              .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
+              .withType(jsonTask, SLSConfiguration.TASK_TYPE)
+              .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
+              .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
+              .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
+              .build();
+
+      for (int i = 0; i < containerDef.getCount(); i++) {
+        containers.add(
+            ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+      }
+    }
+    return containers;
+  }
+
+  private static Resource getResourceForContainer(Map jsonTask,
+      SLSRunner slsRunner) {
+    Resource res = slsRunner.getDefaultContainerResource();
+    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+    for (ResourceInformation info : infors) {
+      if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
+        long value = Long.parseLong(
+            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
+                .toString());
+        res.setResourceValue(info.getName(), value);
+      }
+    }
+    return res;
+  }
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private Map jsonJob;
+
+    private Builder(Map jsonJob) {
+      this.jsonJob = jsonJob;
+    }
+
+    public static Builder create(Map jsonJob) {
+      return new Builder(jsonJob);
+    }
+
+    public Builder withAmType(String key) {
+      if (jsonJob.containsKey(key)) {
+        String amType = (String) jsonJob.get(key);
+        if (amType != null) {
+          this.amType = amType;
+        }
+      }
+      return this;
+    }
+
+    public Builder withUser(String key) {
+      if (jsonJob.containsKey(key)) {
+        String user = (String) jsonJob.get(key);
+        if (user != null) {
+          this.user = user;
+        }
+      }
+      return this;
+    }
+
+    public Builder withQueue(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.queue = jsonJob.get(key).toString();
+      }
+      return this;
+    }
+
+    public Builder withJobId(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobId = (String) jsonJob.get(key);
+      }
+      return this;
+    }
+
+    public Builder withJobCount(String key) {
+      if (jsonJob.containsKey(key)) {
+        jobCount = Integer.parseInt(jsonJob.get(key).toString());
+        jobCount = Math.max(jobCount, 1);
+      }
+      return this;
+    }
+
+    public Builder withJobStartTime(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withJobFinishTime(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
+      }
+      return this;
+    }
+
+    public Builder withLabelExpression(String key) {
+      if (jsonJob.containsKey(key)) {
+        this.labelExpression = jsonJob.get(key).toString();
+      }
+      return this;
+    }
+
+    public AMDefinitionSLS.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionSLS.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionSLS build() {
+      AMDefinitionSLS amDef = new AMDefinitionSLS(this);
+      // Job id is generated automatically if this job configuration allows
+      // multiple job instances
+      if (jobCount > 1) {
+        amDef.oldAppId = null;
+      } else {
+        amDef.oldAppId = jobId;
+      }
+      amDef.jobCount = jobCount;
+      return amDef;
+    }
+  }
+}
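Note the asymmetry with the Rumen/Synth builders: every with* method above receives a JSON key and looks the value up in jsonJob. A sketch of the corresponding input, using field names from the published SLS trace format (the concrete values are illustrative):

import java.util.HashMap;
import java.util.Map;

public class SlsTraceJobSketch {
  public static void main(String[] args) {
    // One job object of an SLS JSON trace, in the raw-Map shape that
    // Jackson hands to AMDefinitionFactory.createFromSlsTrace()
    Map<String, Object> jsonJob = new HashMap<>();
    jsonJob.put("am.type", "mapreduce");
    jsonJob.put("job.user", "default");
    jsonJob.put("job.queue.name", "sls_queue_1");
    jsonJob.put("job.start.ms", "0");
    jsonJob.put("job.end.ms", "95000");
    jsonJob.put("job.count", "2");   // withJobCount() clamps this to >= 1
    System.out.println(jsonJob);
    // AMDefinitionSLS.Builder.create(jsonJob).withUser("job.user") would
    // then resolve the key "job.user" against this map.
  }
}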
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
new file mode 100644
index 00000000000..499692a5929
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+
+public class AMDefinitionSynth extends AMDefinition {
+  public AMDefinitionSynth(AmDefinitionBuilder builder) {
+    super(builder);
+  }
+
+  public static List<ContainerSimulator> getTaskContainers(
+      SynthJob job, SLSRunner slsRunner) throws YarnException {
+    List<ContainerSimulator> containerList = new ArrayList<>();
+    ArrayList<NodeId> keyAsArray =
+        new ArrayList<>(slsRunner.getNmMap().keySet());
+    Random rand = new Random(slsRunner.getStjp().getSeed());
+
+    for (SynthJob.SynthTask task : job.getTasks()) {
+      RMNode node = getRandomNode(slsRunner, keyAsArray, rand);
+      TaskContainerDefinition containerDef =
+          TaskContainerDefinition.Builder.create()
+              .withCount(1)
+              .withHostname("/" + node.getRackName() + "/" + node.getHostName())
+              .withDuration(task.getTime())
+              .withResource(Resource
+                  .newInstance((int) task.getMemory(), (int) task.getVcores()))
+              .withPriority(task.getPriority())
+              .withType(task.getType())
+              .withExecutionType(task.getExecutionType())
+              .withAllocationId(-1)
+              .withRequestDelay(0)
+              .build();
+      containerList.add(
+          ContainerSimulator.createFromTaskContainerDefinition(containerDef));
+    }
+
+    return containerList;
+  }
+
+  private static RMNode getRandomNode(SLSRunner slsRunner,
+      ArrayList<NodeId> keyAsArray, Random rand) {
+    int randomIndex = rand.nextInt(keyAsArray.size());
+    return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode();
+  }
+
+  public static final class Builder extends AmDefinitionBuilder {
+    private long baselineTimeMs;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmType(String amType) {
+      this.amType = amType;
+      return this;
+    }
+
+    public Builder withUser(String user) {
+      if (user != null) {
+        this.user = user;
+      }
+      return this;
+    }
+
+    public Builder withQueue(String queue) {
+      this.queue = queue;
+      return this;
+    }
+
+    public Builder withJobId(String oldJobId) {
+      this.jobId = oldJobId;
+      return this;
+    }
+
+    public Builder withJobStartTime(long time) {
+      this.jobStartTime = time;
+      return this;
+    }
+
+    public Builder withJobFinishTime(long time) {
+      this.jobFinishTime = time;
+      return this;
+    }
+
+    public Builder withBaseLineTimeMs(long baselineTimeMs) {
+      this.baselineTimeMs = baselineTimeMs;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withLabelExpression(String expr) {
+      this.labelExpression = expr;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withTaskContainers(
+        List<ContainerSimulator> taskContainers) {
+      this.taskContainers = taskContainers;
+      return this;
+    }
+
+    public AMDefinitionSynth.Builder withAmResource(Resource amResource) {
+      this.amResource = amResource;
+      return this;
+    }
+
+    public AMDefinitionSynth build() {
+      AMDefinitionSynth amDef = new AMDefinitionSynth(this);
+
+      if (baselineTimeMs == 0) {
+        baselineTimeMs = jobStartTime;
+      }
+      adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
+      return amDef;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
new file mode 100644
index 00000000000..e300e43c74e
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.RUMEN;
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.SLS;
+import static org.apache.hadoop.yarn.sls.SLSRunner.TraceType.SYNTH;
+
+public class AMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
+  static int REMAINING_APPS = 0;
+
+  private final Configuration conf;
+  private int AM_ID;
+  private Map<String, AMSimulator> amMap;
+  private Map<ApplicationId, AMSimulator> appIdAMSim;
+  private Set<String> trackedApps;
+  private Map<String, Class> amClassMap;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+  private TaskRunner runner;
+  private SLSRunner slsRunner;
+  private int numAMs, numTasks;
+  private long maxRuntime;
+  private ResourceManager rm;
+
+  public AMRunner(TaskRunner runner, SLSRunner slsRunner,
+      SynthTraceJobProducer stjp) {
+    this.runner = runner;
+    this.slsRunner = slsRunner;
+    this.conf = slsRunner.getConf();
+    this.stjp = stjp;
+  }
+
+  public void init(Configuration conf) throws ClassNotFoundException {
+    amMap = new ConcurrentHashMap<>();
+    amClassMap = new HashMap<>();
+    appIdAMSim = new ConcurrentHashMap<>();
+    // <AMType, Class> map
+    for (Map.Entry e : conf) {
+      String key = e.getKey().toString();
+      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
+        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
+        amClassMap.put(amType, Class.forName(conf.get(key)));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void startAM() throws YarnException, IOException {
+    switch (inputType) {
+    case SLS:
+      for (String inputTrace : inputTraces) {
+        startAMFromSLSTrace(inputTrace);
+      }
+      break;
+    case RUMEN:
+      long baselineTimeMS = 0;
+      for (String inputTrace : inputTraces) {
+        startAMFromRumenTrace(inputTrace, baselineTimeMS);
+      }
+      break;
+    case SYNTH:
+      startAMFromSynthGenerator();
+      break;
+    default:
+      throw new YarnException("Input configuration not recognized, "
+          + "trace type should be SLS, RUMEN, or SYNTH");
+    }
+
+    numAMs = amMap.size();
+    REMAINING_APPS = numAMs;
+  }
+
+  /**
+   * Parse workload from an SLS trace file.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
+    JsonFactory jsonF = new JsonFactory();
+    ObjectMapper mapper = new ObjectMapper();
+
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), "UTF-8")) {
+      Iterator<Map> jobIter = mapper.readValues(
+          jsonF.createParser(input), Map.class);
+
+      while (jobIter.hasNext()) {
+        try {
+          Map jsonJob = jobIter.next();
+          AMDefinitionSLS amDef =
+              AMDefinitionFactory.createFromSlsTrace(jsonJob, slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: {}", e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Parse workload information from synth-generator trace files.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromSynthGenerator() throws YarnException, IOException {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.defaultFS", "file:///");
+
+    SynthJob job;
+    // we use stjp, a reference to the job producer instantiated during node
+    // creation
+    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+      ReservationId reservationId = null;
+      if (job.hasDeadline()) {
+        reservationId = ReservationId
+            .newInstance(rm.getStartTime(), AM_ID);
+      }
+      AMDefinitionSynth amDef =
+          AMDefinitionFactory.createFromSynth(job, slsRunner);
+      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
+    }
+  }
+
+  /**
+   * Parse workload from a Rumen trace file.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+      throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
+    File fin = new File(inputTrace);
+
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
+
+      while (job != null) {
+        try {
+          AMDefinitionRumen amDef =
+              AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
+                  slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM", e);
+        }
+        job = reader.getNext();
+      }
+    }
+  }
+
+  private void startAMs(AMDefinition amDef) throws YarnException {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withDeadline(-1)
+          .withReservationId(null)
+          .withParams(null)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void startAMs(AMDefinition amDef, ReservationId reservationId,
+      Map<String, String> params, long deadline) throws YarnException {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withReservationId(reservationId)
+          .withParams(params)
+          .withDeadline(deadline)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void runNewAM(JobDefinition jobDef) {
+    AMDefinition amDef = jobDef.getAmDefinition();
+    String oldJobId = amDef.getOldAppId();
+    AMSimulator amSim = createAmSimulator(amDef.getAmType());
+
+    if (amSim != null) {
+      int heartbeatInterval = conf.getInt(
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      boolean isTracked = trackedApps.contains(oldJobId);
+
+      if (oldJobId == null) {
+        oldJobId = Integer.toString(AM_ID);
+      }
+      AM_ID++;
+      amSim.init(amDef, rm, slsRunner, isTracked, runner.getStartTimeMS(),
+          heartbeatInterval, appIdAMSim);
+      if (jobDef.getReservationId() != null) {
+        // if we have a ReservationId, delegate reservation creation to
+        // AMSim (reservation shape is impl specific)
+        UTCClock clock = new UTCClock();
+        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
+            clock.getTime());
+      }
+      runner.schedule(amSim);
+      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+      numTasks += amDef.getTaskContainers().size();
+      amMap.put(oldJobId, amSim);
+    }
+  }
+
+  private AMSimulator createAmSimulator(String jobType) {
+    return (AMSimulator) ReflectionUtils.newInstance(
+        amClassMap.get(jobType), new Configuration());
+  }
+
+  public AMSimulator getAMSimulator(ApplicationId appId) {
+    return appIdAMSim.get(appId);
+  }
+
+  public void setInputType(TraceType inputType) {
+    this.inputType = inputType;
+  }
+
+  public void setInputTraces(String[] inputTraces) {
+    this.inputTraces = inputTraces;
+  }
+
+  public void setResourceManager(ResourceManager rm) {
+    this.rm = rm;
+  }
+
+  public Set<String> getTrackedApps() {
+    return trackedApps;
+  }
+
+  public void setTrackedApps(Set<String> trackApps) {
+    this.trackedApps = trackApps;
+  }
+
+  public int getNumAMs() {
+    return numAMs;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public long getMaxRuntime() {
+    return maxRuntime;
+  }
+
+  public Map<String, AMSimulator> getAmMap() {
+    return amMap;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
new file mode 100644
index 00000000000..4a39d3710c9
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import java.util.Map;
+
+public class JobDefinition {
+  private AMDefinition amDefinition;
+  private ReservationId reservationId;
+  private long deadline;
+  private Map<String, String> params;
+
+  public AMDefinition getAmDefinition() {
+    return amDefinition;
+  }
+
+  public ReservationId getReservationId() {
+    return reservationId;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  // Currently unused
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  public static final class Builder {
+    private AMDefinition amDefinition;
+    private ReservationId reservationId;
+    private long deadline;
+    private Map<String, String> params;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withAmDefinition(AMDefinition amDefinition) {
+      this.amDefinition = amDefinition;
+      return this;
+    }
+
+    public Builder withReservationId(ReservationId reservationId) {
+      this.reservationId = reservationId;
+      return this;
+    }
+
+    public Builder withDeadline(long deadline) {
+      this.deadline = deadline;
+      return this;
+    }
+
+    public Builder withParams(Map<String, String> params) {
+      this.params = params;
+      return this;
+    }
+
+    public JobDefinition build() {
+      JobDefinition jobDef = new JobDefinition();
+      jobDef.params = this.params;
+      jobDef.amDefinition = this.amDefinition;
+      jobDef.reservationId = this.reservationId;
+      jobDef.deadline = this.deadline;
+      return jobDef;
+    }
+  }
+}
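A minimal sketch of how AMRunner packages a definition per job instance — this mirrors the non-reserved path in startAMs(AMDefinition) above, with null standing in for a real AMDefinition:

import org.apache.hadoop.yarn.sls.JobDefinition;

public class JobDefinitionSketch {
  public static void main(String[] args) {
    JobDefinition jobDef = JobDefinition.Builder.create()
        .withAmDefinition(null)   // a real AMDefinition in practice
        .withDeadline(-1)         // -1 means no deadline/reservation
        .withReservationId(null)
        .withParams(null)
        .build();
    System.out.println(jobDef.getDeadline());  // -1
  }
}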
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
new file mode 100644
index 00000000000..571b24f80c6
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class NMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class);
+
+  // other simulation information
+  private int numNMs, numRacks;
+
+  // NM simulator
+  private Map<NodeId, NMSimulator> nmMap;
+  private Resource nodeManagerResource;
+  private String nodeFile;
+  private TaskRunner taskRunner;
+  private Configuration conf;
+  private ResourceManager rm;
+  private String tableMapping;
+  private int threadPoolSize;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+
+  public NMRunner(TaskRunner taskRunner, Configuration conf,
+      ResourceManager rm, String tableMapping, int threadPoolSize) {
+    this.taskRunner = taskRunner;
+    this.conf = conf;
+    this.rm = rm;
+    this.tableMapping = tableMapping;
+    this.threadPoolSize = threadPoolSize;
+    this.nmMap = new ConcurrentHashMap<>();
+    this.nodeManagerResource = getNodeManagerResourceFromConf();
+  }
+
+  public void startNM() throws YarnException, IOException,
+      InterruptedException {
+    // nm configuration
+    int heartbeatInterval = conf.getInt(
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    float resourceUtilizationRatio = conf.getFloat(
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
+    // nm information (fetch from topology file, or from sls/rumen json file)
+    Set<SLSRunner.NodeDetails> nodeSet = null;
+    if (nodeFile.isEmpty()) {
+      for (String inputTrace : inputTraces) {
+        switch (inputType) {
+        case SLS:
+          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
+          break;
+        case RUMEN:
+          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
+          break;
+        case SYNTH:
+          stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes() / stjp.getNodesPerRack());
+          break;
+        default:
+          throw new YarnException("Input configuration not recognized, "
+              + "trace type should be SLS, RUMEN, or SYNTH");
+        }
+      }
+    } else {
+      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
+          nodeManagerResource);
+    }
+
YarnException("No node! Please configure nodes."); + } + + SLSUtils.generateNodeTableMapping(nodeSet, tableMapping); + + // create NM simulators + Random random = new Random(); + Set rackSet = ConcurrentHashMap.newKeySet(); + int threadPoolSize = Math.max(thredPoolSize, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + ExecutorService executorService = Executors. + newFixedThreadPool(threadPoolSize); + for (SLSRunner.NodeDetails nodeDetails : nodeSet) { + executorService.submit(new Runnable() { + @Override public void run() { + try { + // we randomize the heartbeat start time from zero to 1 interval + NMSimulator nm = new NMSimulator(); + Resource nmResource = nodeManagerResource; + String hostName = nodeDetails.getHostname(); + if (nodeDetails.getNodeResource() != null) { + nmResource = nodeDetails.getNodeResource(); + } + Set nodeLabels = nodeDetails.getLabels(); + nm.init(hostName, nmResource, + random.nextInt(heartbeatInterval), + heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); + nmMap.put(nm.getNode().getNodeID(), nm); + taskRunner.schedule(nm); + rackSet.add(nm.getNode().getRackName()); + } catch (IOException | YarnException e) { + LOG.error("Got an error while adding node", e); + } + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + numRacks = rackSet.size(); + numNMs = nmMap.size(); + } + + void waitForNodesRunning() throws InterruptedException { + long startTimeMS = System.currentTimeMillis(); + while (true) { + int numRunningNodes = 0; + for (RMNode node : rm.getRMContext().getRMNodes().values()) { + if (node.getState() == NodeState.RUNNING) { + numRunningNodes++; + } + } + if (numRunningNodes == numNMs) { + break; + } + LOG.info("SLSRunner is waiting for all nodes RUNNING." + + " {} of {} NMs initialized.", numRunningNodes, numNMs); + Thread.sleep(1000); + } + LOG.info("SLSRunner takes {} ms to launch all nodes.", + System.currentTimeMillis() - startTimeMS); + } + + private Resource getNodeManagerResourceFromConf() { + Resource resource = Resources.createResource(0); + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + long value; + if (info.getName().equals(ResourceInformation.MEMORY_URI)) { + value = conf.getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { + value = conf.getInt(SLSConfiguration.NM_VCORES, + SLSConfiguration.NM_VCORES_DEFAULT); + } else { + value = conf.getLong(SLSConfiguration.NM_PREFIX + + info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); + } + + resource.setResourceValue(info.getName(), value); + } + + return resource; + } + + public void setNodeFile(String nodeFile) { + this.nodeFile = nodeFile; + } + + + public void setInputType(TraceType inputType) { + this.inputType = inputType; + } + + public void setInputTraces(String[] inputTraces) { + this.inputTraces = inputTraces; + } + + public int getNumNMs() { + return numNMs; + } + + public int getNumRacks() { + return numRacks; + } + + public Resource getNodeManagerResource() { + return nodeManagerResource; + } + + public Map getNmMap() { + return nmMap; + } + + public SynthTraceJobProducer getStjp() { + return stjp; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java new file mode 100644 index 00000000000..dbded4b306e --- /dev/null +++ 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
new file mode 100644
index 00000000000..dbded4b306e
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RMRunner {
+  private ResourceManager rm;
+  private String metricsOutputDir;
+  private Configuration conf;
+  private SLSRunner slsRunner;
+  private String tableMapping;
+  private Map<String, Integer> queueAppNumMap;
+
+  public RMRunner(Configuration conf, SLSRunner slsRunner) {
+    this.conf = conf;
+    this.slsRunner = slsRunner;
+    this.queueAppNumMap = new HashMap<>();
+  }
+
+  public void startRM() throws ClassNotFoundException, YarnException {
+    Configuration rmConf = new YarnConfiguration(conf);
+    String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
+
+    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSCapacityScheduler.class.getName());
+      rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+      rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+          ProportionalCapacityPreemptionPolicy.class.getName());
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSFairScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
+      // TODO add support for FifoScheduler
+      throw new YarnException("Fifo Scheduler is not supported yet.");
+    }
+    rmConf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        TableMapping.class, DNSToSwitchMapping.class);
+    rmConf.set(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
+        tableMapping);
+    rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
+
+    rm = new ResourceManager() {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new MockAMLauncher(slsRunner, this.rmContext);
+      }
+    };
+
+    // Across runs of parametrized tests, the JvmMetrics object is retained,
+    // but is not registered correctly
+    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
+    jvmMetrics.registerIfNeeded();
+
+    // Init and start the actual ResourceManager
+    rm.init(rmConf);
+    rm.start();
+  }
+
+  public void increaseQueueAppNum(String queue) throws YarnException {
+    SchedulerWrapper wrapper = (SchedulerWrapper) rm.getResourceScheduler();
+    String queueName = wrapper.getRealQueueName(queue);
+    Integer appNum = queueAppNumMap.get(queueName);
+    if (appNum == null) {
+      appNum = 1;
+    } else {
+      appNum = appNum + 1;
+    }
+
+    queueAppNumMap.put(queueName, appNum);
+    SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
+    if (metrics != null) {
+      metrics.trackQueue(queueName);
+    }
+  }
+
+  public void setMetricsOutputDir(String metricsOutputDir) {
+    this.metricsOutputDir = metricsOutputDir;
+  }
+
+  public String getTableMapping() {
+    return tableMapping;
+  }
+
+  public void setTableMapping(String tableMapping) {
+    this.tableMapping = tableMapping;
+  }
+
+  public void stop() {
+    rm.stop();
+  }
+
+  public ResourceManager getRm() {
+    return rm;
+  }
+
+  public Map<String, Integer> getQueueAppNumMap() {
+    return queueAppNumMap;
+  }
+}
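A design note on increaseQueueAppNum(): the get/null-check/put sequence is the pre-Java-8 counting idiom; with Map.merge the counter update collapses to one call. A standalone equivalent of just that step:

import java.util.HashMap;
import java.util.Map;

public class QueueCounterSketch {
  public static void main(String[] args) {
    Map<String, Integer> queueAppNumMap = new HashMap<>();
    queueAppNumMap.merge("root.default", 1, Integer::sum);  // -> 1
    queueAppNumMap.merge("root.default", 1, Integer::sum);  // -> 2
    System.out.println(queueAppNumMap);  // {root.default=2}
  }
}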
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index b4d4a809bb6..b14e24ed510 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -17,30 +17,6 @@
  */
 package org.apache.hadoop.yarn.sls;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.security.Security;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -52,98 +28,57 @@
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
-import org.apache.hadoop.tools.rumen.JobTraceReader;
-import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.tools.rumen.LoggedTask;
-import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
-import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
-import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
-import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.Tracker;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
-import org.apache.hadoop.yarn.sls.utils.SLSUtils;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 @Private
 @Unstable
 public class SLSRunner extends Configured implements Tool {
-  // RM, Runner
-  private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Map<String, Integer> queueAppNumMap;
-  private int poolSize;
-
-  // NM simulator
-  private Map<NodeId, NMSimulator> nmMap;
-  private Resource nodeManagerResource;
-  private String nodeFile;
-
-  // AM simulator
-  private int AM_ID;
-  private Map<String, AMSimulator> amMap;
-  private Map<ApplicationId, AMSimulator> appIdAMSim;
-  private Set<String> trackedApps;
-  private Map<String, Class> amClassMap;
-  private static int remainingApps = 0;
 
   // metrics
-  private String metricsOutputDir;
   private boolean printSimulation;
 
-  // other simulation information
-  private int numNMs, numRacks, numAMs, numTasks;
-  private long maxRuntime;
-  private String tableMapping;
-
   private final static Map<String, Object> simulateInfoMap =
-      new HashMap<String, Object>();
+      new HashMap<>();
 
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
 
-  private final static int DEFAULT_MAPPER_PRIORITY = 20;
-  private final static int DEFAULT_REDUCER_PRIORITY = 10;
-
   private static boolean exitAtTheFinish = false;
+  private AMRunner amRunner;
+  private RMRunner rmRunner;
+  private NMRunner nmRunner;
 
-  private static final String DEFAULT_USER = "default";
+  private SynthTraceJobProducer stjp;
 
   /**
    * The type of trace in input.
@@ -156,15 +91,12 @@
   public static final String NETWORK_NEGATIVE_CACHE_TTL =
       "networkaddress.cache.negative.ttl";
 
-  private TraceType inputType;
-  private SynthTraceJobProducer stjp;
-
-  public SLSRunner() throws ClassNotFoundException {
+  public SLSRunner() throws ClassNotFoundException, YarnException {
     Configuration tempConf = new Configuration(false);
     init(tempConf);
   }
 
-  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
+  public SLSRunner(Configuration tempConf) throws ClassNotFoundException,
+      YarnException {
     init(tempConf);
   }
@@ -178,51 +110,33 @@ public void setConf(Configuration conf) {
     super.setConf(conf);
   }
 
-  private void init(Configuration tempConf) throws ClassNotFoundException {
-    nmMap = new ConcurrentHashMap<>();
-    queueAppNumMap = new HashMap<>();
-    amMap = new ConcurrentHashMap<>();
-    amClassMap = new HashMap<>();
-    appIdAMSim = new ConcurrentHashMap<>();
+  private void init(Configuration tempConf) throws ClassNotFoundException,
+      YarnException {
     // runner configuration
     setConf(tempConf);
-
-    // runner
-    poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+
+    // task runner
+    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
-    // <AMType, Class> map
-    for (Map.Entry e : tempConf) {
-      String key = e.getKey().toString();
-      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
-        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
-        amClassMap.put(amType, Class.forName(tempConf.get(key)));
-      }
-    }
-    nodeManagerResource = getNodeManagerResource();
+    rmRunner = new RMRunner(getConf(), this);
+    nmRunner = new NMRunner(runner, getConf(), rmRunner.getRm(),
+        rmRunner.getTableMapping(), poolSize);
+    stjp = getSynthJobTraceProducer();
+    amRunner = new AMRunner(runner, this, stjp);
+    amRunner.init(tempConf);
   }
 
-  private Resource getNodeManagerResource() {
-    Resource resource = Resources.createResource(0);
-    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
-    for (ResourceInformation info : infors) {
-      long value;
-      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
-            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
-      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_VCORES,
-            SLSConfiguration.NM_VCORES_DEFAULT);
-      } else {
-        value = getConf().getLong(SLSConfiguration.NM_PREFIX +
-            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
+  private SynthTraceJobProducer getSynthJobTraceProducer() throws YarnException {
+    // If the nodeFile is used, the producer may not have been initialized yet.
- private Resource getNodeManagerResource() { - Resource resource = Resources.createResource(0); - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - long value; - if (info.getName().equals(ResourceInformation.MEMORY_URI)) { - value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); - } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { - value = getConf().getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - } else { - value = getConf().getLong(SLSConfiguration.NM_PREFIX + - info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); + private SynthTraceJobProducer getSynthJobTraceProducer() throws YarnException { + // If a node file is used, the producer may not have been initialized yet. + if (nmRunner.getStjp() != null) { + return nmRunner.getStjp(); + } else { + try { + return new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } catch (IOException e) { + throw new YarnException("Failed to initialize SynthTraceJobProducer", e); } - - resource.setResourceValue(info.getName(), value); } - - return resource; } /** @@ -232,17 +146,31 @@ private Resource getNodeManagerResource() { return Collections.unmodifiableMap(simulateInfoMap); } + /** + * Sets the simulation parameters. This must be invoked before start(). + * @param inType the type of the input trace: SLS, RUMEN or SYNTH + * @param inTraces the input trace files + * @param nodes the node file, or an empty string to derive the nodes from the traces + * @param metricsOutputDir the directory for metrics output and the table mapping file + * @param trackApps the set of application ids to track + * @param printsimulation whether to print a summary of the simulation + * @throws IOException if a trace or node file cannot be read + * @throws ClassNotFoundException if an AM simulator class cannot be loaded + */ public void setSimulationParams(TraceType inType, String[] inTraces, - String nodes, String outDir, Set trackApps, + String nodes, String metricsOutputDir, Set trackApps, boolean printsimulation) throws IOException, ClassNotFoundException { - this.inputType = inType; this.inputTraces = inTraces.clone(); - this.nodeFile = nodes; - this.trackedApps = trackApps; + this.amRunner.setInputType(inType); + this.amRunner.setInputTraces(this.inputTraces); + this.amRunner.setTrackedApps(trackApps); + this.nmRunner.setNodeFile(nodes); + this.nmRunner.setInputType(inType); + this.nmRunner.setInputTraces(this.inputTraces); this.printSimulation = printsimulation; - metricsOutputDir = outDir; - tableMapping = outDir + "/tableMapping.csv"; + this.rmRunner.setMetricsOutputDir(metricsOutputDir); + this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv"); } public void start() throws IOException, ClassNotFoundException, YarnException, @@ -251,20 +179,24 @@ public void start() throws IOException, ClassNotFoundException, YarnException, enableDNSCaching(getConf()); // start resource manager - startRM(); + rmRunner.startRM(); + amRunner.setResourceManager(rmRunner.getRm()); + // start node managers - startNM(); + nmRunner.startNM(); // start application masters - startAM(); + amRunner.startAM(); + // set queue & tracked apps information - ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setQueueSet(this.queueAppNumMap.keySet()); - ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setTrackedAppSet(this.trackedApps); + SchedulerWrapper resourceScheduler = + (SchedulerWrapper) rmRunner.getRm().getResourceScheduler(); + Tracker tracker = resourceScheduler.getTracker(); + tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet()); + tracker.setTrackedAppSet(amRunner.getTrackedApps()); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING - waitForNodesRunning(); + nmRunner.waitForNodesRunning(); // starting the runner once everything is ready to go, runner.start(); } @@ -286,428 +218,7 @@ static void enableDNSCaching(Configuration conf) { } } - private void startRM() throws ClassNotFoundException, YarnException { - Configuration rmConf = new YarnConfiguration(getConf()); - String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); - - if (Class.forName(schedulerClass) == CapacityScheduler.class) { - rmConf.set(YarnConfiguration.RM_SCHEDULER, - SLSCapacityScheduler.class.getName()); - rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); - rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - ProportionalCapacityPreemptionPolicy.class.getName()); - } else if (Class.forName(schedulerClass) == FairScheduler.class) { - rmConf.set(YarnConfiguration.RM_SCHEDULER, - SLSFairScheduler.class.getName()); - } else if 
(Class.forName(schedulerClass) == FifoScheduler.class) { - // TODO add support for FifoScheduler - throw new YarnException("Fifo Scheduler is not supported yet."); - } - rmConf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - TableMapping.class, DNSToSwitchMapping.class); - rmConf.set( - CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, - tableMapping); - rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); - - final SLSRunner se = this; - rm = new ResourceManager() { - @Override - protected ApplicationMasterLauncher createAMLauncher() { - return new MockAMLauncher(se, this.rmContext, appIdAMSim); - } - }; - - // Across runs of parametrized tests, the JvmMetrics objects is retained, - // but is not registered correctly - JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null); - jvmMetrics.registerIfNeeded(); - - // Init and start the actual ResourceManager - rm.init(rmConf); - rm.start(); - } - - private void startNM() throws YarnException, IOException, - InterruptedException { - // nm configuration - int heartbeatInterval = getConf().getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); - float resourceUtilizationRatio = getConf().getFloat( - SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, - SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); - // nm information (fetch from topology file, or from sls/rumen json file) - Set nodeSet = null; - if (nodeFile.isEmpty()) { - for (String inputTrace : inputTraces) { - switch (inputType) { - case SLS: - nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); - break; - case RUMEN: - nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); - break; - case SYNTH: - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), - stjp.getNumNodes()/stjp.getNodesPerRack()); - break; - default: - throw new YarnException("Input configuration not recognized, " - + "trace type should be SLS, RUMEN, or SYNTH"); - } - } - } else { - nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile, - nodeManagerResource); - } - - if (nodeSet == null || nodeSet.isEmpty()) { - throw new YarnException("No node! Please configure nodes."); - } - - SLSUtils.generateNodeTableMapping(nodeSet, tableMapping); - - // create NM simulators - Random random = new Random(); - Set rackSet = ConcurrentHashMap.newKeySet(); - int threadPoolSize = Math.max(poolSize, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); - ExecutorService executorService = Executors. 
- newFixedThreadPool(threadPoolSize); - for (NodeDetails nodeDetails : nodeSet) { - executorService.submit(new Runnable() { - @Override public void run() { - try { - // we randomize the heartbeat start time from zero to 1 interval - NMSimulator nm = new NMSimulator(); - Resource nmResource = nodeManagerResource; - String hostName = nodeDetails.getHostname(); - if (nodeDetails.getNodeResource() != null) { - nmResource = nodeDetails.getNodeResource(); - } - Set nodeLabels = nodeDetails.getLabels(); - nm.init(hostName, nmResource, - random.nextInt(heartbeatInterval), - heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); - nmMap.put(nm.getNode().getNodeID(), nm); - runner.schedule(nm); - rackSet.add(nm.getNode().getRackName()); - } catch (IOException | YarnException e) { - LOG.error("Got an error while adding node", e); - } - } - }); - } - executorService.shutdown(); - executorService.awaitTermination(10, TimeUnit.MINUTES); - numRacks = rackSet.size(); - numNMs = nmMap.size(); - } - - private void waitForNodesRunning() throws InterruptedException { - long startTimeMS = System.currentTimeMillis(); - while (true) { - int numRunningNodes = 0; - for (RMNode node : rm.getRMContext().getRMNodes().values()) { - if (node.getState() == NodeState.RUNNING) { - numRunningNodes++; - } - } - if (numRunningNodes == numNMs) { - break; - } - LOG.info("SLSRunner is waiting for all nodes RUNNING." - + " {} of {} NMs initialized.", numRunningNodes, numNMs); - Thread.sleep(1000); - } - LOG.info("SLSRunner takes {} ms to launch all nodes.", - System.currentTimeMillis() - startTimeMS); - } - - @SuppressWarnings("unchecked") - private void startAM() throws YarnException, IOException { - switch (inputType) { - case SLS: - for (String inputTrace : inputTraces) { - startAMFromSLSTrace(inputTrace); - } - break; - case RUMEN: - long baselineTimeMS = 0; - for (String inputTrace : inputTraces) { - startAMFromRumenTrace(inputTrace, baselineTimeMS); - } - break; - case SYNTH: - startAMFromSynthGenerator(); - break; - default: - throw new YarnException("Input configuration not recognized, " - + "trace type should be SLS, RUMEN, or SYNTH"); - } - - numAMs = amMap.size(); - remainingApps = numAMs; - } - - /** - * Parse workload from a SLS trace file. 
- */ - @SuppressWarnings("unchecked") - private void startAMFromSLSTrace(String inputTrace) throws IOException { - JsonFactory jsonF = new JsonFactory(); - ObjectMapper mapper = new ObjectMapper(); - - try (Reader input = new InputStreamReader( - new FileInputStream(inputTrace), "UTF-8")) { - Iterator jobIter = mapper.readValues( - jsonF.createParser(input), Map.class); - - while (jobIter.hasNext()) { - try { - createAMForJob(jobIter.next()); - } catch (Exception e) { - LOG.error("Failed to create an AM: {}", e.getMessage()); - } - } - } - } - - private void createAMForJob(Map jsonJob) throws YarnException { - long jobStartTime = Long.parseLong( - jsonJob.get(SLSConfiguration.JOB_START_MS).toString()); - - long jobFinishTime = 0; - if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) { - jobFinishTime = Long.parseLong( - jsonJob.get(SLSConfiguration.JOB_END_MS).toString()); - } - - String jobLabelExpr = null; - if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) { - jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString(); - } - - String user = (String) jsonJob.get(SLSConfiguration.JOB_USER); - if (user == null) { - user = "default"; - } - - String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString(); - increaseQueueAppNum(queue); - - String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE); - if (amType == null) { - amType = SLSUtils.DEFAULT_JOB_TYPE; - } - - int jobCount = 1; - if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) { - jobCount = Integer.parseInt( - jsonJob.get(SLSConfiguration.JOB_COUNT).toString()); - } - jobCount = Math.max(jobCount, 1); - - String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID); - // Job id is generated automatically if this job configuration allows - // multiple job instances - if(jobCount > 1) { - oldAppId = null; - } - - for (int i = 0; i < jobCount; i++) { - runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - getTaskContainers(jsonJob), getAMContainerResource(jsonJob), - jobLabelExpr); - } - } - - private List getTaskContainers(Map jsonJob) - throws YarnException { - List containers = new ArrayList<>(); - List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS); - if (tasks == null || tasks.size() == 0) { - throw new YarnException("No task for the job!"); - } - - for (Object o : tasks) { - Map jsonTask = (Map) o; - - String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); - - long duration = 0; - if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) { - duration = Integer.parseInt( - jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString()); - } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) { - // Also support "duration.ms" for backward compatibility - duration = Integer.parseInt( - jsonTask.get(SLSConfiguration.DURATION_MS).toString()); - } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) && - jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) { - long taskStart = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_START_MS).toString()); - long taskFinish = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_END_MS).toString()); - duration = taskFinish - taskStart; - } - if (duration <= 0) { - throw new YarnException("Duration of a task shouldn't be less or equal" - + " to 0!"); - } - - Resource res = getResourceForContainer(jsonTask); - - int priority = DEFAULT_MAPPER_PRIORITY; - if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) { - priority = Integer.parseInt( - jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString()); 
- } - - String type = "map"; - if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) { - type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString(); - } - - int count = 1; - if (jsonTask.containsKey(SLSConfiguration.COUNT)) { - count = Integer.parseInt( - jsonTask.get(SLSConfiguration.COUNT).toString()); - } - count = Math.max(count, 1); - - ExecutionType executionType = ExecutionType.GUARANTEED; - if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) { - executionType = ExecutionType.valueOf( - jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString()); - } - long allocationId = -1; - if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) { - allocationId = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString()); - } - - long requestDelay = 0; - if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) { - requestDelay = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString()); - } - requestDelay = Math.max(requestDelay, 0); - - for (int i = 0; i < count; i++) { - containers.add( - new ContainerSimulator(res, duration, hostname, priority, type, - executionType, allocationId, requestDelay)); - } - } - - return containers; - } - - private Resource getResourceForContainer(Map jsonTask) { - Resource res = getDefaultContainerResource(); - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) { - long value = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()) - .toString()); - res.setResourceValue(info.getName(), value); - } - } - - return res; - } - - /** - * Parse workload from a rumen trace file. - */ - @SuppressWarnings("unchecked") - private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) - throws IOException { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", "file:///"); - File fin = new File(inputTrace); - - try (JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf)) { - LoggedJob job = reader.getNext(); - - while (job != null) { - try { - createAMForJob(job, baselineTimeMS); - } catch (Exception e) { - LOG.error("Failed to create an AM", e); - } - - job = reader.getNext(); - } - } - } - - private void createAMForJob(LoggedJob job, long baselineTimeMs) - throws YarnException { - String user = job.getUser() == null ? 
"default" : - job.getUser().getValue(); - String jobQueue = job.getQueue().getValue(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmitTime(); - long jobFinishTimeMS = job.getFinishTime(); - if (baselineTimeMs == 0) { - baselineTimeMs = job.getSubmitTime(); - } - jobStartTimeMS -= baselineTimeMs; - jobFinishTimeMS -= baselineTimeMs; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = new ArrayList<>(); - // mapper - for (LoggedTask mapTask : job.getMapTasks()) { - if (mapTask.getAttempts().size() == 0) { - throw new YarnException("Invalid map task, no attempt for a mapper!"); - } - LoggedTaskAttempt taskAttempt = - mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add( - new ContainerSimulator(getDefaultContainerResource(), - containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); - } - - // reducer - for (LoggedTask reduceTask : job.getReduceTasks()) { - if (reduceTask.getAttempts().size() == 0) { - throw new YarnException( - "Invalid reduce task, no attempt for a reducer!"); - } - LoggedTaskAttempt taskAttempt = - reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add( - new ContainerSimulator(getDefaultContainerResource(), - containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); - } - - // Only supports the default job type currently - runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, - getAMContainerResource(null)); - } - - private Resource getDefaultContainerResource() { + Resource getDefaultContainerResource() { int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, @@ -715,178 +226,23 @@ private Resource getDefaultContainerResource() { return Resources.createResource(containerMemory, containerVCores); } - /** - * parse workload information from synth-generator trace files. - */ - @SuppressWarnings("unchecked") - private void startAMFromSynthGenerator() throws YarnException, IOException { - Configuration localConf = new Configuration(); - localConf.set("fs.defaultFS", "file:///"); - long baselineTimeMS = 0; - - // if we use the nodeFile this could have been not initialized yet. - if (stjp == null) { - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - } - - SynthJob job = null; - // we use stjp, a reference to the job producer instantiated during node - // creation - while ((job = (SynthJob) stjp.getNextJob()) != null) { - // only support MapReduce currently - String user = job.getUser() == null ? 
DEFAULT_USER : - job.getUser(); - String jobQueue = job.getQueueName(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmissionTime(); - - // CARLO: Finish time is only used for logging, omit for now - long jobFinishTimeMS = jobStartTimeMS + job.getDuration(); - - if (baselineTimeMS == 0) { - baselineTimeMS = jobStartTimeMS; - } - jobStartTimeMS -= baselineTimeMS; - jobFinishTimeMS -= baselineTimeMS; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = - new ArrayList(); - ArrayList keyAsArray = new ArrayList(nmMap.keySet()); - Random rand = new Random(stjp.getSeed()); - - for (SynthJob.SynthTask task : job.getTasks()) { - RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) - .getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = task.getTime(); - Resource containerResource = Resource - .newInstance((int) task.getMemory(), (int) task.getVcores()); - containerList.add( - new ContainerSimulator(containerResource, containerLifeTime, - hostname, task.getPriority(), task.getType(), - task.getExecutionType())); - } - - - ReservationId reservationId = null; - - if(job.hasDeadline()){ - reservationId = ReservationId - .newInstance(this.rm.getStartTime(), AM_ID); - } - - runNewAM(job.getType(), user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, - job.getDeadline(), getAMContainerResource(null), null, - job.getParams()); - } - } - - private Resource getAMContainerResource(Map jsonJob) { - Resource amContainerResource = - SLSConfiguration.getAMContainerResource(getConf()); - - if (jsonJob == null) { - return amContainerResource; - } - - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - String key = SLSConfiguration.JOB_AM_PREFIX + info.getName(); - if (jsonJob.containsKey(key)) { - long value = Long.parseLong(jsonJob.get(key).toString()); - amContainerResource.setResourceValue(info.getName(), value); - } - } - - return amContainerResource; - } - - private void increaseQueueAppNum(String queue) throws YarnException { - SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); - String queueName = wrapper.getRealQueueName(queue); - Integer appNum = queueAppNumMap.get(queueName); - if (appNum == null) { - appNum = 1; - } else { - appNum = appNum + 1; - } - - queueAppNumMap.put(queueName, appNum); - SchedulerMetrics metrics = wrapper.getSchedulerMetrics(); - if (metrics != null) { - metrics.trackQueue(queueName); - } - } - - private void runNewAM(String jobType, String user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - Resource amContainerResource) { - runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, - jobFinishTimeMS, containerList, null, -1, - amContainerResource, null, null); - } - - private void runNewAM(String jobType, String user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - Resource amContainerResource, String labelExpr) { - runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, - jobFinishTimeMS, containerList, null, -1, - amContainerResource, labelExpr, null); - } - - @SuppressWarnings("checkstyle:parameternumber") - private void runNewAM(String jobType, String 
user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - ReservationId reservationId, long deadline, Resource amContainerResource, - String labelExpr, Map params) { - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), new Configuration()); - - if (amSim != null) { - int heartbeatInterval = getConf().getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - boolean isTracked = trackedApps.contains(oldJobId); - - if (oldJobId == null) { - oldJobId = Integer.toString(AM_ID); - } - AM_ID++; - amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, - jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, labelExpr, params, - appIdAMSim); - if(reservationId != null) { - // if we have a ReservationId, delegate reservation creation to - // AMSim (reservation shape is impl specific) - UTCClock clock = new UTCClock(); - amSim.initReservation(reservationId, deadline, clock.getTime()); - } - runner.schedule(amSim); - maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); - numTasks += containerList.size(); - amMap.put(oldJobId, amSim); - } + public void increaseQueueAppNum(String queue) throws YarnException { + rmRunner.increaseQueueAppNum(queue); } private void printSimulationInfo() { + final int numAMs = amRunner.getNumAMs(); + final int numTasks = amRunner.getNumTasks(); + final long maxRuntime = amRunner.getMaxRuntime(); + Map amMap = amRunner.getAmMap(); + Map queueAppNumMap = rmRunner.getQueueAppNumMap(); + if (printSimulation) { // node LOG.info("------------------------------------"); LOG.info("# nodes = {}, # racks = {}, capacity " + "of each node {}.", - numNMs, numRacks, nodeManagerResource); + nmRunner.getNumNMs(), nmRunner.getNumRacks(), nmRunner.getNodeManagerResource()); LOG.info("------------------------------------"); // job LOG.info("# applications = {}, # total " + @@ -910,12 +266,12 @@ private void printSimulationInfo() { LOG.info("------------------------------------"); } // package these information in the simulateInfoMap used by other places - simulateInfoMap.put("Number of racks", numRacks); - simulateInfoMap.put("Number of nodes", numNMs); + simulateInfoMap.put("Number of racks", nmRunner.getNumRacks()); + simulateInfoMap.put("Number of nodes", nmRunner.getNumNMs()); simulateInfoMap.put("Node memory (MB)", - nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI)); + nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.MEMORY_URI)); simulateInfoMap.put("Node VCores", - nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI)); + nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.VCORES_URI)); simulateInfoMap.put("Number of applications", numAMs); simulateInfoMap.put("Number of tasks", numTasks); simulateInfoMap.put("Average tasks per applicaion", @@ -928,13 +284,13 @@ private void printSimulationInfo() { } public Map getNmMap() { - return nmMap; + return nmRunner.getNmMap(); } public static void decreaseRemainingApps() { - remainingApps--; + AMRunner.REMAINING_APPS--; - if (remainingApps == 0) { + if (AMRunner.REMAINING_APPS == 0) { LOG.info("SLSRunner tears down."); if (exitAtTheFinish) { System.exit(0); @@ -943,7 +299,7 @@ public static void decreaseRemainingApps() { } public void stop() throws InterruptedException { - rm.stop(); + rmRunner.stop(); runner.stop(); } @@ -1117,4 +473,12 @@ public int hashCode() { 
return result; } } + + public SynthTraceJobProducer getStjp() { + return stjp; + } + + public AMSimulator getAMSimulatorByAppId(ApplicationId appId) { + return amRunner.getAMSimulator(appId); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java new file mode 100644 index 00000000000..3520dae8c44 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import java.util.Map; + +import static org.apache.hadoop.yarn.sls.AMDefinitionRumen.DEFAULT_MAPPER_PRIORITY; + +public class TaskContainerDefinition { + private long duration; + private Resource resource; + private int priority; + private String type; + private int count; + private ExecutionType executionType; + private long allocationId = -1; + private long requestDelay = 0; + private String hostname; + + public long getDuration() { + return duration; + } + + public Resource getResource() { + return resource; + } + + public int getPriority() { + return priority; + } + + public String getType() { + return type; + } + + public int getCount() { + return count; + } + + public ExecutionType getExecutionType() { + return executionType; + } + + public long getAllocationId() { + return allocationId; + } + + public long getRequestDelay() { + return requestDelay; + } + + public String getHostname() { + return hostname; + } + + public static final class Builder { + private long duration = -1; + private long durationLegacy = -1; + private long taskStart = -1; + private long taskFinish = -1; + private Resource resource; + private int priority = DEFAULT_MAPPER_PRIORITY; + private String type = "map"; + private int count = 1; + private ExecutionType executionType = ExecutionType.GUARANTEED; + private long allocationId = -1; + private long requestDelay = 0; + private String hostname; + + public static Builder create() { + return new Builder(); + } + + public Builder withDuration(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.duration = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withDuration(long duration) { + this.duration = duration; + return this; + } + + /** + * Also support "duration.ms" for backward compatibility. 
+ * @param jsonTask the JSON task definition parsed from the trace + * @param key the legacy duration key, "duration.ms" + * @return this builder + */ + public Builder withDurationLegacy(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withTaskStart(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.taskStart = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withTaskFinish(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.taskFinish = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withResource(Resource resource) { + this.resource = resource; + return this; + } + + public Builder withPriority(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.priority = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withPriority(int priority) { + this.priority = priority; + return this; + } + + public Builder withType(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.type = jsonTask.get(key).toString(); + } + return this; + } + + public Builder withType(String type) { + this.type = type; + return this; + } + + public Builder withCount(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + count = Integer.parseInt(jsonTask.get(key).toString()); + count = Math.max(count, 1); + } + return this; + } + + public Builder withCount(int count) { + this.count = count; + return this; + } + + public Builder withExecutionType(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.executionType = ExecutionType.valueOf(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withExecutionType(ExecutionType executionType) { + this.executionType = executionType; + return this; + } + + public Builder withAllocationId(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.allocationId = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public Builder withRequestDelay(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + requestDelay = Long.parseLong(jsonTask.get(key).toString()); + requestDelay = Math.max(requestDelay, 0); + } + return this; + } + + public Builder withRequestDelay(long requestDelay) { + this.requestDelay = requestDelay; + return this; + } + + public Builder withHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public TaskContainerDefinition build() throws YarnException { + TaskContainerDefinition taskContainerDef = + new TaskContainerDefinition(); + taskContainerDef.duration = validateAndGetDuration(this); + taskContainerDef.resource = this.resource; + taskContainerDef.type = this.type; + taskContainerDef.requestDelay = this.requestDelay; + taskContainerDef.priority = this.priority; + taskContainerDef.count = this.count; + taskContainerDef.allocationId = this.allocationId; + taskContainerDef.executionType = this.executionType; + taskContainerDef.hostname = this.hostname; + return taskContainerDef; + } + + private long validateAndGetDuration(Builder builder) throws YarnException { + long duration = 0; + + if (builder.duration != -1) { + duration = builder.duration; + } else if (builder.durationLegacy != -1) { + duration = builder.durationLegacy; + } else if (builder.taskStart != -1 && builder.taskFinish != -1) { + duration = builder.taskFinish - builder.taskStart; + } + + if (duration <= 0) { + throw new YarnException("Duration of a task shouldn't be less or equal" + + " to 0!"); + } + return duration; + } + } +}
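The paired Map-based and scalar with* methods above keep trace parsing declarative: a key is read only when present, and build() resolves the duration precedence (explicit duration first, then the legacy "duration.ms" value, then taskFinish - taskStart). A short sketch of how a parsed SLS task map could flow through the builder; the fromJsonTask helper is hypothetical, while the SLSConfiguration keys are the ones the removed inline parser used:

static ContainerSimulator fromJsonTask(Map jsonTask, Resource defaultResource)
    throws YarnException {
  TaskContainerDefinition def = TaskContainerDefinition.Builder.create()
      .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
      .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
      .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
      .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
      .withResource(defaultResource)            // e.g. getDefaultContainerResource()
      .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
      .withType(jsonTask, SLSConfiguration.TASK_TYPE)
      .withCount(jsonTask, SLSConfiguration.COUNT)
      .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
      .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
      .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
      .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
      .build(); // throws YarnException when no positive duration can be derived
  return ContainerSimulator.createFromTaskContainerDefinition(def);
}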
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 1330e4d2f2b..2d54734f265 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; @@ -121,28 +122,26 @@ public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } - - @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager resourceManager, - SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, String nodeLabelExpr, Map params, - Map appIdAMSim) { - super.init(startTime, startTime + 1000000L * heartbeatInterval, - heartbeatInterval); - this.user = simUser; - this.rm = resourceManager; - this.se = slsRunnner; - this.queue = simQueue; - this.oldAppId = oldApp; + + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + long startTime = amDef.getJobStartTime(); + long endTime = startTime + 1000000L * heartbeatInterval; + super.init(startTime, endTime, heartbeatInterval); + + this.user = amDef.getUser(); + this.queue = amDef.getQueue(); + this.oldAppId = amDef.getOldAppId(); + this.amContainerResource = amDef.getAmResource(); + this.nodeLabelExpression = amDef.getLabelExpression(); + this.traceStartTimeMS = amDef.getJobStartTime(); + this.traceFinishTimeMS = amDef.getJobFinishTime(); + this.rm = rm; + this.se = slsRunner; this.isTracked = tracked; - this.baselineTimeMS = baseTimeMS; - this.traceStartTimeMS = startTime; - this.traceFinishTimeMS = finishTime; - this.amContainerResource = amResource; - this.nodeLabelExpression = nodeLabelExpr; - this.appIdToAMSim = appIdAMSim; + this.baselineTimeMS = baselineTimeMS; + this.appIdToAMSim = appIdToAMSim; } /** diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java index f886a69e024..ba8c7b3558f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.slf4j.Logger; @@
-93,19 +94,15 @@ LoggerFactory.getLogger(DAGAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager resourceManager, - SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, String nodeLabelExpr, Map params, - Map appIdAMSim) { - super.init(heartbeatInterval, containerList, resourceManager, slsRunnner, - startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS, - amResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval, + appIdToAMSim); super.amtype = "dag"; - allContainers.addAll(containerList); - pendingContainers.addAll(containerList); + allContainers.addAll(amDef.getTaskContainers()); + pendingContainers.addAll(amDef.getTaskContainers()); totalContainers = allContainers.size(); LOG.info("Added new job with {} containers", allContainers.size()); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 586c671afed..d9eb41e6602 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.ReservationClientUtil; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -123,19 +124,15 @@ scheduled when all maps have finished (not support slow-start currently). 
LoggerFactory.getLogger(MRAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, String nodeLabelExpr, - Map params, Map appIdAMSim) { - super.init(heartbeatInterval, containerList, rm, se, traceStartTime, - traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval, + appIdToAMSim); amtype = "mapreduce"; // get map/reduce tasks - for (ContainerSimulator cs : containerList) { + for (ContainerSimulator cs : amDef.getTaskContainers()) { if (cs.getType().equals("map")) { cs.setPriority(PRIORITY_MAP); allMaps.add(cs); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index 46bc90a337c..751b1797fd6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.slf4j.Logger; @@ -93,21 +94,13 @@ LoggerFactory.getLogger(StreamAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, String nodeLabelExpr, - Map params, Map appIdAMSim) { - super.init(heartbeatInterval, containerList, rm, se, traceStartTime, - traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval, appIdToAMSim); amtype = "stream"; - - allStreams.addAll(containerList); - - duration = traceFinishTime - traceStartTime; - + allStreams.addAll(amDef.getTaskContainers()); + duration = amDef.getJobFinishTime() - amDef.getJobStartTime(); LOG.info("Added new job with {} streams, running for {}", allStreams.size(), duration); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 37bf96afa05..c62b0ac3b4e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -44,15 +44,11 @@ private static final Logger LOG = LoggerFactory.getLogger( MockAMLauncher.class); - private Map appIdAMSim; + private SLSRunner slsRunner; - SLSRunner se; - - public MockAMLauncher(SLSRunner se, RMContext rmContext, - Map appIdAMSim) { + public MockAMLauncher(SLSRunner slsRunner, RMContext rmContext) { super(rmContext); - this.appIdAMSim = appIdAMSim; - this.se = se; + this.slsRunner = slsRunner; } @Override @@ -84,7 +80,7 @@ public void handle(AMLauncherEvent event) { ApplicationId appId = event.getAppAttempt().getAppAttemptId().getApplicationId(); // find AMSimulator - AMSimulator ams = appIdAMSim.get(appId); + AMSimulator ams = slsRunner.getAMSimulatorByAppId(appId); if (ams == null) { throw new YarnRuntimeException( "Didn't find any AMSimulator for applicationId=" + appId); @@ -103,14 +99,14 @@ public void handle(AMLauncherEvent event) { event.getAppAttempt().getMasterContainer()); LOG.info("Notify AM launcher launched:" + amContainer.getId()); - se.getNmMap().get(amContainer.getNodeId()) + slsRunner.getNmMap().get(amContainer.getNodeId()) .addNewContainer(amContainer, -1); return; } catch (Exception e) { throw new YarnRuntimeException(e); } case CLEANUP: - se.getNmMap().get(amContainer.getNodeId()) + slsRunner.getNmMap().get(amContainer.getNodeId()) .cleanupContainer(amContainer.getId()); break; default: diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java index e83ee91d8e1..4f9d254a81e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java @@ -26,54 +26,39 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.sls.TaskContainerDefinition; @Private @Unstable public class ContainerSimulator implements Delayed { - // id private ContainerId id; - // resource allocated private Resource resource; - // end time private long endTime; // life time (ms) private long lifeTime; // time(ms) after which container would be requested by AM private long requestDelay; - // host name private String hostname; - // priority private int priority; - // type private String type; - // execution type private ExecutionType executionType = ExecutionType.GUARANTEED; - // allocation id private long allocationId; /** - * invoked when AM schedules containers to allocate. + * Invoked when AM schedules containers to allocate. */ - public ContainerSimulator(Resource resource, long lifeTime, - String hostname, int priority, String type) { - this(resource, lifeTime, hostname, priority, type, - ExecutionType.GUARANTEED); + public static ContainerSimulator createFromTaskContainerDefinition( + TaskContainerDefinition def) { + return new ContainerSimulator(def.getResource(), def.getDuration(), + def.getHostname(), def.getPriority(), def.getType(), + def.getExecutionType(), def.getAllocationId(), def.getRequestDelay()); } - - /** - * invoked when AM schedules containers to allocate. 
- */ - public ContainerSimulator(Resource resource, long lifeTime, - String hostname, int priority, String type, ExecutionType executionType) { - this(resource, lifeTime, hostname, priority, type, - executionType, -1, 0); - } - + /** - * invoked when AM schedules containers to allocate. + * Invoked when AM schedules containers to allocate. */ @SuppressWarnings("checkstyle:parameternumber") - public ContainerSimulator(Resource resource, long lifeTime, + private ContainerSimulator(Resource resource, long lifeTime, String hostname, int priority, String type, ExecutionType executionType, long allocationId, long requestDelay) { this.resource = resource; @@ -87,7 +72,7 @@ public ContainerSimulator(Resource resource, long lifeTime, } /** - * invoke when NM schedules containers to run. + * Invoked when NM schedules containers to run. */ public ContainerSimulator(ContainerId id, Resource resource, long endTime, long lifeTime, long allocationId) { diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index 256dcf46291..e529d1841a6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -57,8 +57,6 @@ @Private @Unstable public class SLSUtils { - public final static String DEFAULT_JOB_TYPE = "mapreduce"; - private static final String LABEL_FORMAT_ERR_MSG = "Input format for adding node-labels is not correct, it should be " + "labelName1[(exclusive=true/false)],labelName2[] .."; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java index 8ac7fff75cb..bb6c0c36829 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java @@ -26,6 +26,8 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for DagAMSimulator. 
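The hunk below replaces the removed ContainerSimulator telescoping constructor with a Mockito mock of TaskContainerDefinition fed into the new factory method. Where the duration validation in build() is wanted rather than bypassed, the real builder from this patch works just as well; a sketch with the same illustrative values, reusing the allocationId and requestDelay parameters of the helper below:

TaskContainerDefinition def = TaskContainerDefinition.Builder.create()
    .withResource(Resources.createResource(1024))
    .withDuration(1000)
    .withHostname("*")
    .withPriority(1)
    .withType("map")
    .withExecutionType(ExecutionType.GUARANTEED)
    .withAllocationId(allocationId)
    .withRequestDelay(requestDelay)
    .build(); // YarnException if no positive duration is set
ContainerSimulator sim =
    ContainerSimulator.createFromTaskContainerDefinition(def);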
@@ -74,7 +76,15 @@ public void testGetToBeScheduledContainers() throws Exception { private ContainerSimulator createContainerSim(long allocationId, long requestDelay) { - return new ContainerSimulator(null, 1000, "*", 1, "Map", - null, allocationId, requestDelay); + TaskContainerDefinition taskContainerDef = mock(TaskContainerDefinition.class); + when(taskContainerDef.getResource()).thenReturn(null); + when(taskContainerDef.getDuration()).thenReturn(1000L); + when(taskContainerDef.getHostname()).thenReturn("*"); + when(taskContainerDef.getPriority()).thenReturn(1); + when(taskContainerDef.getType()).thenReturn("Map"); + when(taskContainerDef.getExecutionType()).thenReturn(null); + when(taskContainerDef.getAllocationId()).thenReturn(allocationId); + when(taskContainerDef.getRequestDelay()).thenReturn(requestDelay); + return ContainerSimulator.createFromTaskContainerDefinition(taskContainerDef); } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index ec7c81d63bd..1f26f0b2830 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -20,6 +20,7 @@ import com.codahale.metrics.MetricRegistry; import java.util.HashMap; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.tools.rumen.datatypes.UserName; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.AMDefinitionRumen; +import org.apache.hadoop.yarn.sls.TaskContainerDefinition; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.scheduler.*; import org.apache.hadoop.yarn.util.resource.Resources; @@ -52,6 +55,9 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + @RunWith(Parameterized.class) public class TestAMSimulator { private ResourceManager rm; @@ -150,9 +156,20 @@ public void testAMSimulator() throws Exception { String queue = "default"; List containers = new ArrayList<>(); HashMap map = new HashMap<>(); - app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null, - map); + + UserName mockUser = mock(UserName.class); + when(mockUser.getValue()).thenReturn("user1"); + AMDefinitionRumen amDef = + ((AMDefinitionRumen.Builder) AMDefinitionRumen.Builder.create() + .withUser(mockUser) + .withQueue(queue) + .withJobId(appId) + .withJobStartTime(0) + .withJobFinishTime(1000000L) + .withAmResource(SLSConfiguration.getAMContainerResource(conf)) + .withTaskContainers(containers)) + .build(); + app.init(amDef, rm, null, true, 0, 1000, map); app.firstStep(); verifySchedulerMetrics(appId); @@ -177,9 +194,21 @@ public void testAMSimulatorWithNodeLabels() throws Exception { String queue = "default"; List containers = new ArrayList<>(); HashMap map = new HashMap<>(); - app.init(1000, containers, rm, null, 
0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1", - null, map); + + UserName mockUser = mock(UserName.class); + when(mockUser.getValue()).thenReturn("user1"); + AMDefinitionRumen amDef = + AMDefinitionRumen.Builder.create() + .withUser(mockUser) + .withQueue(queue) + .withJobId(appId) + .withJobStartTime(0) + .withJobFinishTime(1000000L) + .withAmResource(SLSConfiguration.getAMContainerResource(conf)) + .withTaskContainers(containers) + .withLabelExpression("label1") + .build(); + app.init(amDef, rm, null, true, 0, 1000, map); app.firstStep(); verifySchedulerMetrics(appId); @@ -194,7 +223,7 @@ public void testAMSimulatorWithNodeLabels() throws Exception { } @Test - public void testPackageRequests() { + public void testPackageRequests() throws YarnException { MockAMSimulator app = new MockAMSimulator(); List containerSimulators = new ArrayList<>(); Resource resource = Resources.createResource(1024); @@ -202,13 +231,26 @@ public void testPackageRequests() { ExecutionType execType = ExecutionType.GUARANTEED; String type = "map"; - ContainerSimulator s1 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType); - ContainerSimulator s2 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType); - ContainerSimulator s3 = new ContainerSimulator(resource, 100, - "/default-rack/h2", priority, type, execType); - + TaskContainerDefinition.Builder builder = + TaskContainerDefinition.Builder.create() + .withResource(resource) + .withDuration(100) + .withPriority(1) + .withType(type) + .withExecutionType(execType) + .withAllocationId(-1) + .withRequestDelay(0); + + ContainerSimulator s1 = ContainerSimulator.createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h1") + .build()); + ContainerSimulator s2 = ContainerSimulator.createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h1") + .build()); + ContainerSimulator s3 = ContainerSimulator.createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h2") + .build()); + containerSimulators.add(s1); containerSimulators.add(s2); containerSimulators.add(s3); @@ -243,12 +285,12 @@ public void testPackageRequests() { Assert.assertEquals(2, nodeRequestCount); containerSimulators.clear(); - s1 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType, 1, 0); - s2 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType, 2, 0); - s3 = new ContainerSimulator(resource, 100, - "/default-rack/h2", priority, type, execType, 1, 0); + s1 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, "/default-rack/h1", 1)); + s2 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, "/default-rack/h1", 2)); + s3 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, "/default-rack/h2", 1)); containerSimulators.add(s1); containerSimulators.add(s2); @@ -288,6 +330,17 @@ public void testPackageRequests() { Assert.assertEquals(3, nodeRequestCount); } + private TaskContainerDefinition createDefaultTaskContainerDefMock(Resource resource, int priority, ExecutionType execType, String type, String hostname, long allocationId) { + TaskContainerDefinition taskContainerDef = mock(TaskContainerDefinition.class); + 
when(taskContainerDef.getResource()).thenReturn(resource); + when(taskContainerDef.getDuration()).thenReturn(100L); + when(taskContainerDef.getPriority()).thenReturn(priority); + when(taskContainerDef.getType()).thenReturn(type); + when(taskContainerDef.getExecutionType()).thenReturn(execType); + when(taskContainerDef.getHostname()).thenReturn(hostname); + when(taskContainerDef.getAllocationId()).thenReturn(allocationId); + return taskContainerDef; + } @After public void tearDown() {