diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 8a522fe..fda9e86 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -83,6 +84,10 @@ import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser + .SourceTags; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -536,9 +541,8 @@ private Resource getResourceForContainer(Map jsonTask) { /** * Parse workload from a rumen trace file. 
*/ - @SuppressWarnings("unchecked") - private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) - throws IOException { + @SuppressWarnings("unchecked") private void startAMFromRumenTrace( + String inputTrace, long baselineTimeMS) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); File fin = new File(inputTrace); @@ -676,15 +680,27 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { .getNode(); String hostname = "/" + node.getRackName() + "/" + node.getHostName(); long containerLifeTime = task.getTime(); - Resource containerResource = Resource - .newInstance((int) task.getMemory(), (int) task.getVcores()); + Resource containerResource = Resource.newInstance( + (int) task.getMemory(), (int) task.getVcores()); + Map placementConstraintMap = null; + if (task.getPlacementConstraint() != null) { + try { + placementConstraintMap = PlacementConstraintParser + .parsePlacementSpec(task.getPlacementConstraint()); + } catch (PlacementConstraintParseException e) { + String errorMsg = String.format( + "parse placement constraint fail, placementConstraint=%s", + task.getPlacementConstraint()); + LOG.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + } containerList.add( new ContainerSimulator(containerResource, containerLifeTime, hostname, task.getPriority(), task.getType(), - task.getExecutionType())); + task.getExecutionType(), placementConstraintMap)); } - ReservationId reservationId = null; if(job.hasDeadline()){ diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 8e1c256..2b925ae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -22,9 +22,11 @@ import 
java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -54,6 +56,9 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -66,6 +71,8 @@ import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser + .SourceTags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +93,7 @@ // response queue protected final BlockingQueue responseQueue; private int responseId = 0; + private int allocationRequestId = 0; // user name private String user; // queue name @@ -287,6 +295,16 @@ protected AllocateRequest createAllocateRequest(List ask, return allocateRequest; } + protected AllocateRequest createAllocateRequestWithSchedulingRequests( + List schedulingRequests, List toRelease) { + AllocateRequest allocateRequest = + recordFactory.newRecordInstance(AllocateRequest.class); + allocateRequest.setResponseId(responseId++); + allocateRequest.setSchedulingRequests(schedulingRequests); + allocateRequest.setReleaseList(toRelease); + return allocateRequest; + } + protected AllocateRequest createAllocateRequest(List ask) { return 
createAllocateRequest(ask, new ArrayList()); } @@ -437,6 +455,33 @@ public void untrackApp() { return ask; } + protected List packageSchedulingRequests( + List csList, int priority) { + List schedulingRequests = new ArrayList<>(); + for (ContainerSimulator cs : csList) { + for (Entry entry : cs + .getPlacementConstraintMap().entrySet()) { + SchedulingRequest schedulingRequest = createSchedulingRequest( + cs.getResource(), cs.getExecutionType(), entry.getKey(), + entry.getValue(), priority); + schedulingRequests.add(schedulingRequest); + } + } + return schedulingRequests; + } + + private SchedulingRequest createSchedulingRequest(Resource resource, + ExecutionType executionType, SourceTags tags, + PlacementConstraint placementConstraint, int priority) { + SchedulingRequest schedulingRequest = SchedulingRequest.newInstance( + allocationRequestId++, Priority.newInstance(priority), + ExecutionTypeRequest.newInstance(executionType), + Collections.singleton(tags.getTag()), + ResourceSizing.newInstance(tags.getNumOfAllocations(), resource), + placementConstraint); + return schedulingRequest; + } + public String getQueue() { return queue; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index b41f5f2..e1f83ae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -31,11 +31,15 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser + .SourceTags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +91,7 @@ // finished private boolean isFinished = false; private long duration = 0; + private boolean enableConstraint = false; private static final Logger LOG = LoggerFactory.getLogger(StreamAMSimulator.class); @@ -104,6 +109,13 @@ public void init(int heartbeatInterval, allStreams.addAll(containerList); + if (!allStreams.isEmpty()) { + Map placementConstraintMap = allStreams + .get(0).getPlacementConstraintMap(); + if (placementConstraintMap != null && placementConstraintMap.size() > 0) { + this.enableConstraint = true; + } + } duration = traceFinishTime - traceStartTime; LOG.info("Added new job with {} streams, running for {}", @@ -201,48 +213,67 @@ private void restart() protected void sendContainerRequest() throws YarnException, IOException, InterruptedException { + AllocateRequest request = getAllocateRequest(); + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Token token = rm.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + AllocateResponse response = ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return rm.getApplicationMasterService().allocate(request); + } + }); + if (response != null) { + responseQueue.put(response); + } + } + + private AllocateRequest getAllocateRequest() { + AllocateRequest request; // send out request List ask = new ArrayList<>(); List release = new ArrayList<>(); + List schedulingRequests = new ArrayList<>(); if 
(!isFinished) { if (!pendingStreams.isEmpty()) { - ask = packageRequests(mergeLists(pendingStreams, scheduledStreams), - PRIORITY_MAP); - LOG.debug("Application {} sends out request for {} streams.", - appId, pendingStreams.size()); + if (enableConstraint) { + // add schedulingRequest + schedulingRequests = packageSchedulingRequests( + mergeLists(pendingStreams, scheduledStreams), PRIORITY_MAP); + } else { + ask = packageRequests(mergeLists(pendingStreams, scheduledStreams), + PRIORITY_MAP); + } + LOG.debug("Application {} sends out request for {} streams.", appId, + pendingStreams.size()); scheduledStreams.addAll(pendingStreams); pendingStreams.clear(); } } - if(isFinished){ + if (isFinished) { release.addAll(assignedStreams.keySet()); ask.clear(); + schedulingRequests.clear(); } - final AllocateRequest request = createAllocateRequest(ask, release); + if (enableConstraint) { + request = createAllocateRequestWithSchedulingRequests(schedulingRequests, + release); + } else { + request = createAllocateRequest(ask, release); + } if (totalContainers == 0) { request.setProgress(1.0f); } else { request.setProgress((float) finishedContainers / totalContainers); } - - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token token = rm.getRMContext().getRMApps() - .get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - AllocateResponse response = ugi.doAs( - new PrivilegedExceptionAction() { - @Override - public AllocateResponse run() throws Exception { - return rm.getApplicationMasterService().allocate(request); - } - }); - if (response != null) { - responseQueue.put(response); - } + return request; } @Override diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java index 09498da..017518e 
100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.sls.scheduler; +import java.util.Map; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -26,6 +27,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser + .SourceTags; @Private @Unstable @@ -46,6 +50,8 @@ private String type; // execution type private ExecutionType executionType = ExecutionType.GUARANTEED; + // placement constraint + private Map placementConstraintMap; /** * invoked when AM schedules containers to allocate. @@ -61,12 +67,22 @@ public ContainerSimulator(Resource resource, long lifeTime, */ public ContainerSimulator(Resource resource, long lifeTime, String hostname, int priority, String type, ExecutionType executionType) { + this(resource, lifeTime, hostname, priority, type, executionType, null); + } + + /** + * invoked when AM schedules containers to allocate with placement constraint. 
+ */ + public ContainerSimulator(Resource resource, long lifeTime, + String hostname, int priority, String type, ExecutionType executionType, + Map placementConstraint) { this.resource = resource; this.lifeTime = lifeTime; this.hostname = hostname; this.priority = priority; this.type = type; this.executionType = executionType; + this.placementConstraintMap = placementConstraint; } /** @@ -131,4 +147,8 @@ public void setPriority(int p) { public ExecutionType getExecutionType() { return executionType; } + + public Map getPlacementConstraintMap() { + return placementConstraintMap; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java index 21dec96..3553c52 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -94,6 +94,7 @@ private long maxVcores; private int priority; private ExecutionType executionType; + private String placementConstraint;
private SynthTask(String type, long time, long maxMemory, long maxVcores, int priority, ExecutionType executionType){ @@ -105,6 +106,17 @@ private SynthTask(String type, long time, long maxMemory, long maxVcores, this.executionType = executionType; } + private SynthTask(String type, long time, long maxMemory, long maxVcores, + int priority, ExecutionType executionType, String vPlacementConstraint){ + this.type = type; + this.time = time; + this.maxMemory = maxMemory; + this.maxVcores = maxVcores; + this.priority = priority; + this.executionType = executionType; + this.placementConstraint = vPlacementConstraint; + } + public String getType(){ return type; } @@ -129,11 +141,17 @@ public ExecutionType getExecutionType() { return executionType; } + public String getPlacementConstraint() { + return placementConstraint; + } + @Override - public String toString(){ + public String toString() { return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: " - + "%3$4s\tvcores: %4$2s\texecution_type: %5$-10s%n", getType(), - getTime(), getMemory(), getVcores(), getExecutionType().toString()); + + "%3$4s\tvcores: %4$2s\texecution_type: " + + "%5$-10s\tplacementConstraint: %6$-10s%n", + getType(), getTime(), getMemory(), getVcores(), + getExecutionType().toString(), placementConstraint); } } @@ -202,7 +220,7 @@ protected SynthJob(JDKRandomGenerator rand, Configuration conf, long time = task.time.getLong(); totalSlotTime += time; SynthTask t = new SynthTask(taskType, time, memory, vcores, - priority, executionType); + priority, executionType, task.placementConstraint); tasks.add(t); taskByType.get(taskType).add(t); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java index fa6f1fc..0cd11a1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java +++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java @@ -475,13 +475,16 @@ public String toString(){ int priority; @JsonProperty("execution_type") String executionType = ExecutionType.GUARANTEED.name(); + @JsonProperty("placement_constraint") + String placementConstraint; @Override public String toString(){ return "\nTaskDefinition " + type + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory + "] Vcores[" + max_vcores + "] Priority[" + priority - + "] ExecutionType[" + executionType + "]"; + + "] ExecutionType[" + executionType + "] PlacementConstraint[" + + placementConstraint + "]"; } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java index a5d30e0..88ae17c 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java @@ -70,6 +70,7 @@ public void setup() { @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { Configuration conf = new Configuration(false); + conf.setBoolean("yarn.scheduler.capacity.scheduling-request.allowed", true); long timeTillShutdownInsec = 20L; runSLS(conf, timeTillShutdownInsec); } diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json index a52de40..1e16051 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json +++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json @@ -27,6 +27,7 @@ "time": {"val": 60000}, "max_memory": {"val": 4096}, "max_vcores": {"val": 4}, + "placement_constraint":"dn=1,notin,node,dn", "execution_type": "GUARANTEED" } ]