diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 951c09d..1e873aa 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -287,6 +288,8 @@ private void startNM() throws YarnException, IOException {
     int heartbeatInterval = getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
         SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    float waterLevel = getConf().getFloat(SLSConfiguration.NM_RESOURCE_WATER_LEVEL,
+        SLSConfiguration.NM_RESOURCE_WATER_LEVEL_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
     Set<String> nodeSet = new HashSet<String>();
     if (nodeFile.isEmpty()) {
@@ -324,7 +327,7 @@ private void startNM() throws YarnException, IOException {
       // we randomize the heartbeat start time from zero to 1 interval
       NMSimulator nm = new NMSimulator();
       nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
-          heartbeatInterval, rm);
+          heartbeatInterval, rm, waterLevel);
       nmMap.put(nm.getNode().getNodeID(), nm);
       runner.schedule(nm);
       rackSet.add(nm.getNode().getRackName());
@@ -499,9 +502,13 @@ private void createAMForJob(Map jsonJob) throws YarnException {
       }
       count = Math.max(count, 1);

+      ExecutionType executionType = ExecutionType.GUARANTEED;
+      if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
+        executionType = ExecutionType.valueOf(jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
+      }
       for (int i = 0; i < count; i++) {
         containers.add(
-            new ContainerSimulator(res, duration, hostname, priority, type));
+            new ContainerSimulator(res, duration, hostname, priority, type, executionType));
       }
     }
@@ -583,7 +590,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
           taskAttempt.getStartTime();
       containerList.add(
           new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
+              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map", ExecutionType.GUARANTEED));
     }

     // reducer
@@ -599,7 +606,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
           taskAttempt.getStartTime();
       containerList.add(
           new ContainerSimulator(getDefaultContainerResource(),
-              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
+              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce", ExecutionType.GUARANTEED));
     }

     // Only supports the default job type currently
@@ -670,7 +677,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
           .newInstance((int) task.getMemory(), (int) task.getVcores());
       containerList.add(
           new ContainerSimulator(containerResource, containerLifeTime,
-              hostname, task.getPriority(), task.getType()));
+              hostname, task.getPriority(), task.getType(), task.getExecutionType()));
     }
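Note on the SLSRunner change above: ExecutionType.valueOf() throws IllegalArgumentException when a trace carries an unrecognized container.execution.type value, which would abort the whole simulation mid-run. A minimal defensive sketch, not part of the patch — the helper name parseExecutionType is hypothetical, and it reuses the ExecutionType import added above:

    // Hypothetical helper: fall back to the GUARANTEED default instead of
    // letting one malformed trace entry abort the simulation.
    private static ExecutionType parseExecutionType(Object raw) {
      if (raw == null) {
        return ExecutionType.GUARANTEED;
      }
      try {
        return ExecutionType.valueOf(raw.toString().trim().toUpperCase());
      } catch (IllegalArgumentException e) {
        return ExecutionType.GUARANTEED;
      }
    }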
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index bf85fff..e1e7f51 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -47,6 +47,8 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -260,12 +262,14 @@ public Object run() throws Exception {
   }

   protected ResourceRequest createResourceRequest(
-      Resource resource, String host, int priority, int numContainers) {
+      Resource resource, ExecutionType executionType, String host, int priority,
+      int numContainers) {
     ResourceRequest request = recordFactory
         .newRecordInstance(ResourceRequest.class);
     request.setCapability(resource);
     request.setResourceName(host);
     request.setNumContainers(numContainers);
+    request.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(executionType));
     Priority prio = recordFactory.newRecordInstance(Priority.class);
     prio.setPriority(priority);
     request.setPriority(prio);
@@ -401,7 +405,7 @@ public void untrackApp() {
             rackLocalRequestMap.get(rackname).getNumContainers() + 1);
       } else {
         ResourceRequest request =
-            createResourceRequest(cs.getResource(), rackname, priority, 1);
+            createResourceRequest(cs.getResource(), cs.getExecutionType(), rackname, priority, 1);
         rackLocalRequestMap.put(rackname, request);
       }
       // check node local
@@ -411,14 +415,14 @@ public void untrackApp() {
             nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
       } else {
         ResourceRequest request =
-            createResourceRequest(cs.getResource(), hostname, priority, 1);
+            createResourceRequest(cs.getResource(), cs.getExecutionType(), hostname, priority, 1);
         nodeLocalRequestMap.put(hostname, request);
       }
     }
     // any
     if (anyRequest == null) {
       anyRequest = createResourceRequest(
-          cs.getResource(), ResourceRequest.ANY, priority, 1);
+          cs.getResource(), cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
     } else {
       anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index 7fc2a3c..c14ad12 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -62,6 +62,8 @@
   public static final String AM_PREFIX = PREFIX + "am.";
   public static final String AM_HEARTBEAT_INTERVAL_MS = AM_PREFIX
       + "heartbeat.interval.ms";
+  public static final String NM_RESOURCE_WATER_LEVEL = NM_PREFIX
+      + "resource.water.level";
   public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
   public static final String AM_TYPE = AM_PREFIX + "type";
   public static final String AM_TYPE_PREFIX = AM_TYPE + ".";
@@ -74,6 +76,8 @@
       "container.vcores";
   public static final int AM_CONTAINER_VCORES_DEFAULT = 1;

+  public static final float NM_RESOURCE_WATER_LEVEL_DEFAULT = -1F;
+
   // container
   public static final String CONTAINER_PREFIX = PREFIX + "container.";
   public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX
@@ -116,5 +120,6 @@ public static Resource getAMContainerResource(Configuration conf) {
   public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
   public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
   public static final String TASK_TYPE = TASK_CONTAINER + "type";
+  public static final String TASK_EXECUTION_TYPE = TASK_CONTAINER + "execution.type";
 }
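Usage note for the new key: NM_PREFIX is "yarn.sls.nm.", so the full property name is yarn.sls.nm.resource.water.level. It is read once at NM start-up and applies uniformly to every simulated node; the -1F default leaves utilization reporting off. A minimal sketch of enabling it programmatically (the property can equally be set in sls-runner.xml):

    // Simulate every NM reporting 80% of its capacity as used.
    // Values outside (0, 1] -- including the -1F default -- disable reporting.
    Configuration conf = new Configuration();
    conf.setFloat(SLSConfiguration.NM_RESOURCE_WATER_LEVEL, 0.8f);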
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index ba0fd56..4898183 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -72,10 +73,11 @@
   private ResourceManager rm;
   // heart beat response id
   private int responseId = 0;
+  private float waterLevel;
   private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);

   public void init(String nodeIdStr, Resource nodeResource,
-      int dispatchTime, int heartBeatInterval, ResourceManager rm)
+      int dispatchTime, int heartBeatInterval, ResourceManager rm, float waterLevel)
       throws IOException, YarnException {
     super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
         heartBeatInterval);
@@ -103,6 +105,7 @@ public void init(String nodeIdStr, Resource nodeResource,
     RegisterNodeManagerResponse response = rm.getResourceTrackerService()
         .registerNodeManager(req);
     masterKey = response.getNMTokenMasterKey();
+    this.waterLevel = waterLevel;
   }

   @Override
@@ -133,6 +136,15 @@ public void middleStep() throws Exception {
     ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
     ns.setResponseId(responseId++);
     ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
+    if (waterLevel > 0 && waterLevel <= 1) {
+      int pMemUsed = (int) (node.getTotalCapability().getMemorySize()
+          * waterLevel);
+      float cpuUsed = node.getTotalCapability().getVirtualCores() * waterLevel;
+      ResourceUtilization resourceUtilization = ResourceUtilization.newInstance(
+          pMemUsed, pMemUsed, cpuUsed);
+      ns.setContainersUtilization(resourceUtilization);
+      ns.setNodeUtilization(resourceUtilization);
+    }
     beatRequest.setNodeStatus(ns);
     NodeHeartbeatResponse beatResponse =
         rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
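Worked example of the water-level arithmetic introduced in middleStep(), for a node registered with 10240 MB / 10 vcores and waterLevel = 0.7 (numbers are illustrative; imports as in the patched file):

    int pMemUsed = (int) (10240 * 0.7f);  // 7168 MB reported as used
    float cpuUsed = 10 * 0.7f;            // 7.0 vcores reported as used
    ResourceUtilization util =
        ResourceUtilization.newInstance(pMemUsed, pMemUsed, cpuUsed);

The same value is reported as both containersUtilization and nodeUtilization on every heartbeat, so RM-side over-allocation logic sees a uniformly loaded node; physical and virtual memory are deliberately set to the same figure.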
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index 8622976..0e1a425 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;

 @Private
@@ -43,17 +44,20 @@
   private int priority;
   // type
   private String type;
+  // execution type
+  private ExecutionType executionType = ExecutionType.GUARANTEED;

   /**
    * invoked when AM schedules containers to allocate
    */
   public ContainerSimulator(Resource resource, long lifeTime,
-      String hostname, int priority, String type) {
+      String hostname, int priority, String type, ExecutionType executionType) {
     this.resource = resource;
     this.lifeTime = lifeTime;
     this.hostname = hostname;
     this.priority = priority;
     this.type = type;
+    this.executionType = executionType;
   }

   /**
@@ -114,4 +118,8 @@ public String getType() {
   public void setPriority(int p) {
     priority = p;
   }
+
+  public ExecutionType getExecutionType() {
+    return executionType;
+  }
 }
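For reference, constructing one of the new six-argument ContainerSimulator instances directly (values are illustrative; Resources is org.apache.hadoop.yarn.util.resource.Resources, as used by TestNMSimulator below):

    // A 1 GB / 1-vcore map container, 10 s lifetime, priority 20,
    // eligible for opportunistic execution.
    ContainerSimulator cs = new ContainerSimulator(
        Resources.createResource(1024, 1), 10000L, "/default-rack/node1", 20,
        "map", ExecutionType.OPPORTUNISTIC);
    assert cs.getExecutionType() == ExecutionType.OPPORTUNISTIC;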
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 27156c7..36fe9d7 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;

 import java.util.ArrayList;
@@ -92,14 +93,16 @@
     private long maxMemory;
     private long maxVcores;
     private int priority;
+    private ExecutionType executionType;

     private SynthTask(String type, long time, long maxMemory, long maxVcores,
-        int priority){
+        int priority, ExecutionType executionType){
       this.type = type;
       this.time = time;
       this.maxMemory = maxMemory;
       this.maxVcores = maxVcores;
       this.priority = priority;
+      this.executionType = executionType;
     }

     public String getType(){
@@ -122,11 +125,15 @@
     public int getPriority(){
       return priority;
     }

+    public ExecutionType getExecutionType() {
+      return executionType;
+    }
+
     @Override
     public String toString(){
-      return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
-          + "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(),
-          getVcores());
+      return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
+          + "%3$4s\tvcores: %4$2s\texecution_type: %5$-10s%n", getType(),
+          getTime(), getMemory(), getVcores(), getExecutionType().toString());
     }
   }
@@ -181,6 +188,7 @@ protected SynthJob(JDKRandomGenerator rand, Configuration conf,
       long vcores = task.max_vcores.getLong();
       vcores = vcores < MIN_VCORES ? MIN_VCORES : vcores;
       int priority = task.priority;
+      ExecutionType executionType = ExecutionType.valueOf(task.executionType);

       // Save task information by type
       taskByType.put(taskType, new ArrayList<>());
@@ -192,7 +200,7 @@ protected SynthJob(JDKRandomGenerator rand, Configuration conf,
         long time = task.time.getLong();
         totalSlotTime += time;
         SynthTask t = new SynthTask(taskType, time, memory, vcores,
-            priority);
+            priority, executionType);
         tasks.add(t);
         taskByType.get(taskType).add(t);
       }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index 09bc9b9..16e9a49 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -199,6 +199,7 @@ private static void validateJobDef(JobDefinition jobDef){
       map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
           jobDef.map_max_vcores_stddev);
       map.priority = DEFAULT_MAPPER_PRIORITY;
+      map.executionType = jobDef.map_execution_type;
       jobDef.tasks.add(map);

       TaskDefinition reduce = new TaskDefinition();
@@ -210,6 +211,7 @@ private static void validateJobDef(JobDefinition jobDef){
       reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
           jobDef.reduce_max_vcores_stddev);
       reduce.priority = DEFAULT_REDUCER_PRIORITY;
+      reduce.executionType = jobDef.reduce_execution_type;
       jobDef.tasks.add(reduce);

     } catch (JsonMappingException e) {
@@ -425,6 +427,12 @@ public String toString(){
     @JsonProperty("reduce_max_vcores_stddev")
     double reduce_max_vcores_stddev;

+    // container execution type
+    @JsonProperty("map_execution_type")
+    String map_execution_type;
+    @JsonProperty("reduce_execution_type")
+    String reduce_execution_type;
+
     public void init(JDKRandomGenerator rand){
       deadline_factor.init(rand);
       duration.init(rand);
@@ -464,12 +472,15 @@ public String toString(){
     Sample max_vcores;
     @JsonProperty("priority")
     int priority;
+    @JsonProperty("execution_type")
+    String executionType;

     @Override
     public String toString(){
       return "\nTaskDefinition " + type
           + " Count[" + count + "] Time[" + time
           + "] Memory[" + max_memory
-          + "] Vcores[" + max_vcores + "] Priority[" + priority + "]";
+          + "] Vcores[" + max_vcores + "] Priority[" + priority
+          + "] ExecutionType[" + executionType + "]";
     }
   }
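One caveat in the SynthJob change: SynthTraceJobProducer only fills TaskDefinition.executionType when the trace supplies execution_type (or the map/reduce shorthand keys), so task.executionType can be null, and ExecutionType.valueOf(null) throws NullPointerException instead of falling back to a default. A hedged guard, not in the patch, that preserves GUARANTEED as the default for older traces:

    ExecutionType executionType = task.executionType == null
        ? ExecutionType.GUARANTEED
        : ExecutionType.valueOf(task.executionType);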
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
index 794cd47..0316362 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.sls;

 import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@
 import java.io.IOException;
 import java.util.Arrays;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
@@ -254,6 +256,7 @@ private void validateJob(SynthJob js) {
       assertTrue(t.getTime() > 0);
       assertTrue(t.getMemory() > 0);
       assertTrue(t.getVcores() > 0);
+      assertEquals(ExecutionType.OPPORTUNISTIC, t.getExecutionType());
     }
   }
 }
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
index 5064ef2..8a6e61b 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -76,7 +77,7 @@ public void testNMSimulator() throws Exception {
     // Register one node
     NMSimulator node1 = new NMSimulator();
     node1.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
-        rm);
+        rm, -1f);
     node1.middleStep();

     int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json b/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json
index b9d46a5..c8a6f11 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json
@@ -11,21 +11,24 @@
         "container.start.ms": 6664,
         "container.end.ms": 23707,
         "container.priority": 20,
-        "container.type": "map"
+        "container.type": "map",
+        "container.execution.type": "OPPORTUNISTIC"
       },
       {
         "container.host": "/default-rack/node3",
         "container.start.ms": 6665,
         "container.end.ms": 21593,
         "container.priority": 20,
-        "container.type": "map"
+        "container.type": "map",
+        "container.execution.type": "OPPORTUNISTIC"
       },
       {
         "container.host": "/default-rack/node2",
         "container.start.ms": 68770,
         "container.end.ms": 86613,
         "container.priority": 20,
-        "container.type": "map"
+        "container.type": "map",
+        "container.execution.type": "OPPORTUNISTIC"
       }
     ]
   }
@@ -42,14 +45,16 @@
         "container.start.ms": 111822,
         "container.end.ms": 133985,
         "container.priority": 20,
-        "container.type": "map"
+        "container.type": "map",
+        "container.execution.type": "OPPORTUNISTIC"
       },
       {
         "container.host": "/default-rack/node2",
         "container.start.ms": 111788,
         "container.end.ms": 131377,
         "container.priority": 20,
-        "container.type": "map"
+        "container.type": "map",
+        "container.execution.type": "OPPORTUNISTIC"
       }
     ]
   }
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn.json b/hadoop-tools/hadoop-sls/src/test/resources/syn.json
index c6e2c92..e51a5e4 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/syn.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/syn.json
@@ -27,8 +27,10 @@
         "rtime_stddev": 4,
         "map_max_memory_avg": 1024,
         "map_max_memory_stddev": 0.001,
+        "map_execution_type": "OPPORTUNISTIC",
         "reduce_max_memory_avg": 2048,
         "reduce_max_memory_stddev": 0.001,
+        "reduce_execution_type": "OPPORTUNISTIC",
         "map_max_vcores_avg": 1,
         "map_max_vcores_stddev": 0.001,
         "reduce_max_vcores_avg": 2,
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json
index bde4cd0..71169d5 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json
@@ -26,7 +26,8 @@
           "count": { "val": 5, "std": 1},
           "time": {"val": 10, "std": 2},
           "max_memory": {"val": 1024},
-          "max_vcores": {"val": 1}
+          "max_vcores": {"val": 1},
+          "execution_type": "OPPORTUNISTIC"
         },
         {
           "type": "reduce",
@@ -34,7 +35,8 @@
           "count": { "val": 5, "std": 1},
           "time": {"val": 20, "std": 4},
           "max_memory": {"val": 2048},
-          "max_vcores": {"val": 2}
+          "max_vcores": {"val": 2},
+          "execution_type": "OPPORTUNISTIC"
         }
       ]
     }
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json
index a85065b..4669eae 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json
@@ -26,7 +26,8 @@
           "count": { "val": 2},
           "time": {"val": 60000},
           "max_memory": {"val": 4096},
-          "max_vcores": {"val": 4}
+          "max_vcores": {"val": 4},
+          "execution_type": "OPPORTUNISTIC"
         }
       ]
     }