diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
index d13974f..e78f184 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 
 /**
  * {@link TaskAttemptInfo} is a collection of statistics about a particular
@@ -75,4 +76,8 @@ public TaskInfo getTaskInfo() {
   public List<Integer> getSplitVector(LoggedTaskAttempt.SplitVectorKind kind) {
     return kind.get(allSplits);
   }
+
+  public ExecutionType getExecutionType() {
+    return taskInfo.getExecutionType();
+  }
 }
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
index 6159f85..e04c824 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+
 public class TaskInfo {
   private final long bytesIn;
   private final int recsIn;
@@ -25,29 +27,42 @@ public class TaskInfo {
   private final long maxMemory;
   private final long maxVcores;
   private final ResourceUsageMetrics metrics;
-
+  private ExecutionType executionType;
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory) {
     this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1,
-        new ResourceUsageMetrics());
+        new ResourceUsageMetrics(), ExecutionType.GUARANTEED);
   }
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory, ResourceUsageMetrics metrics) {
-    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics);
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics, ExecutionType.GUARANTEED);
   }
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory, long maxVcores) {
     this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
-        new ResourceUsageMetrics());
+        new ResourceUsageMetrics(), ExecutionType.GUARANTEED);
+  }
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory, long maxVcores, ExecutionType executionType) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
+        new ResourceUsageMetrics(), executionType);
+  }
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory, long maxVcores, ResourceUsageMetrics
+      metrics) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
+        metrics, ExecutionType.GUARANTEED);
   }
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory, long maxVcores, ResourceUsageMetrics
-      metrics) {
+      metrics, ExecutionType executionType) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;
@@ -55,6 +70,7 @@ public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
     this.maxMemory = maxMemory;
     this.maxVcores = maxVcores;
     this.metrics = metrics;
+    this.executionType = executionType;
   }
 
   /**
@@ -107,4 +123,8 @@ public long getTaskVCores() {
   public ResourceUsageMetrics getResourceUsageMetrics() {
     return metrics;
   }
+
+  public ExecutionType getExecutionType() {
+    return executionType;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 456602f..260fef6 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -289,6 +290,8 @@ private void startNM() throws YarnException, IOException {
     int heartbeatInterval = getConf().getInt(
         SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
         SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    float waterLevel = getConf().getFloat(SLSConfiguration.NM_RESOURCE_WATER_LEVEL,
+        SLSConfiguration.NM_RESOURCE_WATER_LEVEL_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
     Set<String> nodeSet = new HashSet<>();
     if (nodeFile.isEmpty()) {
@@ -326,7 +329,7 @@ private void startNM() throws YarnException, IOException {
       // we randomize the heartbeat start time from zero to 1 interval
       NMSimulator nm = new NMSimulator();
       nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
-          heartbeatInterval, rm);
+          heartbeatInterval, rm, waterLevel);
       nmMap.put(nm.getNode().getNodeID(), nm);
       runner.schedule(nm);
       rackSet.add(nm.getNode().getRackName());
@@ -494,6 +497,11 @@ private void createAMForJob(Map jsonJob) throws YarnException {
         type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
       }
 
+      ExecutionType executionType = ExecutionType.GUARANTEED;
+      if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
+        executionType = ExecutionType.valueOf(jsonTask.get(
+            SLSConfiguration.TASK_EXECUTION_TYPE).toString().toUpperCase());
+      }
       int count = 1;
       if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
         count = Integer.parseInt(
@@ -503,7 +511,7 @@ private void createAMForJob(Map jsonJob) throws YarnException {
 
       for (int i = 0; i < count; i++) {
         containers.add(
-            new ContainerSimulator(res, duration, hostname, priority, type));
+            new ContainerSimulator(res, executionType, duration, hostname, priority, type));
       }
     }
@@ -584,7 +592,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
         long containerLifeTime = taskAttempt.getFinishTime() -
             taskAttempt.getStartTime();
         containerList.add(
-            new ContainerSimulator(getDefaultContainerResource(),
+            new ContainerSimulator(getDefaultContainerResource(), ExecutionType.GUARANTEED,
                 containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
       }
@@ -600,7 +608,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
         long containerLifeTime = taskAttempt.getFinishTime() -
             taskAttempt.getStartTime();
         containerList.add(
-            new ContainerSimulator(getDefaultContainerResource(),
+            new ContainerSimulator(getDefaultContainerResource(), ExecutionType.GUARANTEED,
                 containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
       }
@@ -676,7 +684,8 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
         Resource containerResource =
             Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
                 (int) tai.getTaskInfo().getTaskVCores());
-        containerList.add(new ContainerSimulator(containerResource,
+        ExecutionType executionType = tai.getExecutionType();
+        containerList.add(new ContainerSimulator(containerResource, executionType,
             containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
       }
@@ -691,8 +700,9 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
         Resource containerResource =
             Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
                 (int) tai.getTaskInfo().getTaskVCores());
+        ExecutionType executionType = tai.getExecutionType();
         containerList.add(
-            new ContainerSimulator(containerResource, containerLifeTime,
+            new ContainerSimulator(containerResource, executionType, containerLifeTime,
                 hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
       }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5727b5f..319ef9a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -47,6 +47,8 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -260,12 +262,13 @@ public Object run() throws Exception {
   }
 
   protected ResourceRequest createResourceRequest(
-      Resource resource, String host, int priority, int numContainers) {
+      Resource resource, ExecutionType executionType, String host, int priority, int numContainers) {
     ResourceRequest request = recordFactory
         .newRecordInstance(ResourceRequest.class);
     request.setCapability(resource);
     request.setResourceName(host);
     request.setNumContainers(numContainers);
+    request.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(executionType, true));
     Priority prio = recordFactory.newRecordInstance(Priority.class);
     prio.setPriority(priority);
     request.setPriority(prio);
@@ -401,7 +404,7 @@ public void untrackApp() {
             rackLocalRequestMap.get(rackname).getNumContainers() + 1);
       } else {
         ResourceRequest request =
-            createResourceRequest(cs.getResource(), rackname, priority, 1);
+            createResourceRequest(cs.getResource(), cs.getExecutionType(), rackname, priority, 1);
         rackLocalRequestMap.put(rackname, request);
       }
       // check node local
@@ -411,17 +414,17 @@ public void untrackApp() {
             nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
       } else {
         ResourceRequest request =
-            createResourceRequest(cs.getResource(), hostname, priority, 1);
+            createResourceRequest(cs.getResource(), cs.getExecutionType(), hostname, priority, 1);
         nodeLocalRequestMap.put(hostname, request);
       }
     }
     // any
     if (anyRequest == null) {
       anyRequest = createResourceRequest(
-          cs.getResource(), ResourceRequest.ANY, priority, 1);
+          cs.getResource(), cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
     } else {
       anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
     }
   }
 
   List<ResourceRequest> ask = new ArrayList<>();
   ask.addAll(nodeLocalRequestMap.values());
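
Note for reviewers: with the SLSRunner change above, each task entry in an SLS jobs trace can carry the new per-container field, whose key is built from TASK_CONTAINER + "execution_type" (see SLSConfiguration below). A minimal sketch of a trace entry, assuming the usual sls-jobs.json key layout; the surrounding job and task fields here are illustrative, not part of this patch:

    {
      "job.start.ms": 0,
      "job.end.ms": 95375,
      "job.id": "job_1",
      "job.tasks": [ {
        "count": 2,
        "container.host": "/default-rack/node1",
        "container.priority": 20,
        "container.type": "map",
        "container.execution_type": "OPPORTUNISTIC"
      } ]
    }

Tasks that omit the field keep the ExecutionType.GUARANTEED default set in createAMForJob(), and the value is upper-cased before ExecutionType.valueOf(), so "opportunistic" parses as well.
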
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index 7fc2a3c..1beb3c1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -57,6 +57,8 @@
   public static final int NM_RESOURCE_DEFAULT = 0;
   public static final String NM_HEARTBEAT_INTERVAL_MS = NM_PREFIX
       + "heartbeat.interval.ms";
+  public static final String NM_RESOURCE_WATER_LEVEL = NM_PREFIX
+      + "resource.water.level";
   public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
   // am
   public static final String AM_PREFIX = PREFIX + "am.";
@@ -82,6 +84,8 @@
   public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
   public static final int CONTAINER_VCORES_DEFAULT = 1;
 
+  public static final float NM_RESOURCE_WATER_LEVEL_DEFAULT = -1F;
+
   public static Resource getAMContainerResource(Configuration conf) {
     return Resource.newInstance(
         conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
@@ -116,5 +120,6 @@ public static Resource getAMContainerResource(Configuration conf) {
   public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
   public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
   public static final String TASK_TYPE = TASK_CONTAINER + "type";
+  public static final String TASK_EXECUTION_TYPE = TASK_CONTAINER + "execution_type";
 
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index ba0fd56..3976d8d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -72,10 +73,11 @@
   private ResourceManager rm;
   // heart beat response id
   private int responseId = 0;
+  private float waterLevel;
   private final static Logger LOG =
       LoggerFactory.getLogger(NMSimulator.class);
 
   public void init(String nodeIdStr, Resource nodeResource,
-      int dispatchTime, int heartBeatInterval, ResourceManager rm)
+      int dispatchTime, int heartBeatInterval, ResourceManager rm, float waterLevel)
       throws IOException, YarnException {
     super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
         heartBeatInterval);
@@ -103,6 +105,7 @@ public void init(String nodeIdStr, Resource nodeResource,
     RegisterNodeManagerResponse response = rm.getResourceTrackerService()
         .registerNodeManager(req);
     masterKey = response.getNMTokenMasterKey();
+    this.waterLevel = waterLevel;
   }
 
   @Override
@@ -133,6 +136,13 @@ public void middleStep() throws Exception {
     ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
     ns.setResponseId(responseId++);
     ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
+    if (waterLevel > 0 && waterLevel <= 1) {
+      int pMemUsed = (int) (node.getTotalCapability().getMemorySize() * waterLevel);
+      float cpuUsed = node.getTotalCapability().getVirtualCores() * waterLevel;
+      ResourceUtilization resourceUtilization = ResourceUtilization.newInstance(pMemUsed, pMemUsed, cpuUsed);
+      ns.setContainersUtilization(resourceUtilization);
+      ns.setNodeUtilization(resourceUtilization);
+    }
     beatRequest.setNodeStatus(ns);
     NodeHeartbeatResponse beatResponse =
         rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index 8622976..abeda83 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 
 @Private
@@ -43,17 +44,20 @@
   private int priority;
   // type
   private String type;
+  // execution type
+  private ExecutionType executionType;
 
   /**
    * invoked when AM schedules containers to allocate
    */
-  public ContainerSimulator(Resource resource, long lifeTime,
+  public ContainerSimulator(Resource resource, ExecutionType executionType, long lifeTime,
       String hostname, int priority, String type) {
     this.resource = resource;
     this.lifeTime = lifeTime;
     this.hostname = hostname;
     this.priority = priority;
     this.type = type;
+    this.executionType = executionType;
   }
@@ -114,4 +118,8 @@ public String getType() {
   public void setPriority(int p) {
     priority = p;
   }
+
+  public ExecutionType getExecutionType() {
+    return executionType;
+  }
 }
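
Note for reviewers: the SLSConfiguration and NMSimulator pieces above work together. Setting yarn.sls.nm.resource.water.level (assuming NM_PREFIX resolves to "yarn.sls.nm.", consistent with the existing heartbeat key) to a fraction in (0, 1], e.g. 0.8, makes every simulated NM report that fraction of its total memory and vcores as both node and containers utilization on each heartbeat; the -1F default leaves heartbeats unchanged. This is a cheap way to exercise utilization-aware scheduling paths, such as opportunistic-container queuing, without generating real load. Worth noting that pMemUsed is reported for both the physical- and virtual-memory utilization fields.
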
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 3ed81e1..b9588ab 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.*;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +56,9 @@
   private final String queueName;
   private final SynthJobClass jobClass;
 
+  private ExecutionType mapExecutionType;
+  private ExecutionType reduceExecutionType;
+
   // job timing
   private final long submitTime;
   private final long duration;
@@ -77,6 +81,8 @@ public SynthJob(JDKRandomGenerator rand, Configuration conf,
     this.conf = conf;
     this.jobClass = jobClass;
 
+    this.mapExecutionType = jobClass.getMapExecutionType();
+    this.reduceExecutionType = jobClass.getReduceExecutionType();
     this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
     this.numMapTasks = jobClass.getMtasks();
     this.numRedTasks = jobClass.getRtasks();
@@ -199,9 +205,9 @@ public int getNumberReduces() {
   public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
     switch (taskType) {
     case MAP:
-      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores, mapExecutionType);
     case REDUCE:
-      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores, reduceExecutionType);
     default:
       throw new IllegalArgumentException("Not interested");
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
index 439698f..c9a6d76 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
@@ -22,6 +22,7 @@
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
 
@@ -42,6 +43,8 @@
   private final LogNormalDistribution redMem;
   private final LogNormalDistribution mapVcores;
   private final LogNormalDistribution redVcores;
+  private final ExecutionType mapExecutionType;
+  private final ExecutionType reduceExecutionType;
   private final Trace trace;
 
   @SuppressWarnings("VisibilityModifier")
@@ -77,6 +80,12 @@ public SynthJobClass(JDKRandomGenerator rand, Trace trace,
         jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
     this.redVcores = SynthUtils.getLogNormalDist(rand,
         jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
+    this.mapExecutionType = jobClass.map_execution_type == null
+        ? ExecutionType.GUARANTEED
+        : ExecutionType.valueOf(jobClass.map_execution_type);
+    this.reduceExecutionType = jobClass.reduce_execution_type == null
+        ? ExecutionType.GUARANTEED
+        : ExecutionType.valueOf(jobClass.reduce_execution_type);
   }
 
   public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
@@ -105,6 +114,14 @@ public long getDur() {
     return genLongSample(dur);
   }
 
+  public ExecutionType getMapExecutionType() {
+    return mapExecutionType;
+  }
+
+  public ExecutionType getReduceExecutionType() {
+    return reduceExecutionType;
+  }
+
   public int getMtasks() {
     return genIntSample(mtasks);
   }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index c89e4e2..7b57150 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -216,6 +216,11 @@ public int getNumNodes() {
     @JsonProperty("rtime_stddev")
     double rtime_stddev;
 
+    @JsonProperty("map_execution_type")
+    String map_execution_type;
+    @JsonProperty("reduce_execution_type")
+    String reduce_execution_type;
+
     // number of tasks
     @JsonProperty("mtasks_avg")
     double mtasks_avg;
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
index 2b1971a..37c964e 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -29,6 +30,7 @@
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -81,11 +83,13 @@ private void validateJob(SynthJob js) {
     for (int i = 0; i < js.getNumberMaps(); i++) {
       TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0);
       assertTrue(tai.getRuntime() > 0);
+      assertEquals(ExecutionType.OPPORTUNISTIC, tai.getExecutionType());
     }
 
     for (int i = 0; i < js.getNumberReduces(); i++) {
       TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
       assertTrue(tai.getRuntime() > 0);
+      assertEquals(ExecutionType.GUARANTEED, tai.getExecutionType());
     }
 
     if (js.hasDeadline()) {
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
index 5064ef2..5e3a242 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
@@ -76,7 +76,7 @@ public void testNMSimulator() throws Exception {
     // Register one node
     NMSimulator node1 = new NMSimulator();
     node1.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
-        rm);
+        rm, -1F);
     node1.middleStep();
 
     int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn.json b/hadoop-tools/hadoop-sls/src/test/resources/syn.json
index 8479d23..e285c38 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/syn.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/syn.json
@@ -14,6 +14,7 @@
         {
           "class_name": "class_1",
           "user_name": "foobar",
+          "map_execution_type": "OPPORTUNISTIC",
           "class_weight": 1.0,
           "mtasks_avg": 5,
           "mtasks_stddev": 1,
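
Note for reviewers: in the synthetic trace, execution type is a per-job-class setting. A sketch of a job class using both new fields; only map_execution_type appears in the test resource above, reduce_execution_type is shown here for illustration, and either field may be omitted to fall back to GUARANTEED, per the null checks in SynthJobClass:

    {
      "class_name": "class_1",
      "user_name": "foobar",
      "map_execution_type": "OPPORTUNISTIC",
      "reduce_execution_type": "GUARANTEED",
      "class_weight": 1.0,
      "mtasks_avg": 5,
      "mtasks_stddev": 1
    }

This matches the test above, which expects OPPORTUNISTIC map attempts and GUARANTEED reduce attempts from this resource. Unlike the jobs-trace path in SLSRunner, SynthJobClass passes the raw string to ExecutionType.valueOf() without upper-casing it, so the value must be spelled exactly OPPORTUNISTIC or GUARANTEED.
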