diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 10f6294..50db964 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -135,6 +135,7 @@
             src/test/resources/syn.json
             src/test/resources/syn_generic.json
             src/test/resources/syn_stream.json
+            src/test/resources/syn_stream_with_constraint.json
             src/test/resources/inputsls.json
             src/test/resources/nodes.json
             src/test/resources/exit-invariants.txt
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 8a522fe..99023d4 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -42,6 +42,7 @@
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -61,6 +62,7 @@
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -83,6 +85,8 @@
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
@@ -695,7 +699,8 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
       runNewAM(job.getType(), user, jobQueue, oldJobId,
           jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
           job.getDeadline(), getAMContainerResource(null),
-          job.getParams());
+          job.getParams(), job.getAllocationTags(),
+          job.getPlacementConstraint());
     }
   }
 
@@ -739,14 +744,15 @@ private void runNewAM(String jobType, String user,
       Resource amContainerResource) {
     runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
         jobFinishTimeMS, containerList, null, -1,
-        amContainerResource, null);
+        amContainerResource, null, null, null);
   }
 
   private void runNewAM(String jobType, String user,
       String jobQueue, String oldJobId, long jobStartTimeMS,
       long jobFinishTimeMS, List<ContainerSimulator> containerList,
       ReservationId reservationId, long deadline, Resource amContainerResource,
-      Map<String, String> params) {
+      Map<String, String> params, String allocationTagsStr,
+      String placementConstraintStr) {
     AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
         amClassMap.get(jobType), new Configuration());
 
@@ -761,9 +767,28 @@ private void runNewAM(String jobType, String user,
       oldJobId = Integer.toString(AM_ID);
     }
     AM_ID++;
+    Set<String> allocationTags = null;
+    if (!StringUtils.isEmpty(allocationTagsStr)) {
+      allocationTags = new HashSet<>(
+          Arrays.asList(allocationTagsStr.split(",")));
+    }
+    PlacementConstraint placementConstraint = null;
+    try {
+      if (!StringUtils.isEmpty(placementConstraintStr)) {
+        placementConstraint = PlacementConstraintParser.parseExpression(
+            placementConstraintStr).build();
+      }
+    } catch (PlacementConstraintParseException e) {
+      String errorMsg = String.format(
+          "Failed to parse placement constraint: %s",
+          placementConstraintStr);
+      LOG.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
     amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
         jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-        runner.getStartTimeMS(), amContainerResource, params);
+        runner.getStartTimeMS(), amContainerResource, params, allocationTags,
+        placementConstraint);
     if(reservationId != null) {
       // if we have a ReservationId, delegate reservation creation to
       // AMSim (reservation shape is impl specific)
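Note on the two new trace fields consumed above: allocation_tags is a plain
comma-separated tag list, and placement_constraint is an expression in the
PlacementConstraintParser syntax. The following stand-alone sketch (not part of
the patch; the class name and sample strings are illustrative) shows what the
SLSRunner change does with those strings:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;

public class ConstraintParseSketch {
  public static void main(String[] args)
      throws PlacementConstraintParseException {
    // Tags arrive as a comma-separated string, e.g. "dn" or "dn,hbase".
    Set<String> tags = new HashSet<>(Arrays.asList("dn".split(",")));
    // "notin,node,dn" reads: do not place this container on any node that
    // already runs a container tagged "dn" (node-scope anti-affinity).
    PlacementConstraint constraint =
        PlacementConstraintParser.parseExpression("notin,node,dn").build();
    System.out.println(tags + " -> " + constraint);
  }
}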
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 8e1c256..01541c5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -54,6 +55,9 @@
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -86,6 +90,7 @@
   // response queue
   protected final BlockingQueue<AllocateResponse> responseQueue;
   private int responseId = 0;
+  private int allocationRequestId = 0;
   // user name
   private String user;
   // queue name
@@ -108,6 +113,10 @@
   volatile boolean isAMContainerRunning = false;
   volatile Container amContainer;
 
+  // placement constraint
+  private Set<String> allocationTags;
+  private PlacementConstraint placementConstraint;
+
   private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
 
   private Resource amContainerResource;
@@ -118,12 +127,13 @@ public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
 
-  @SuppressWarnings("checkstyle:parameternumber")
-  public void init(int heartbeatInterval,
-      List<ContainerSimulator> containerList, ResourceManager resourceManager,
-      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
-      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, Map<String, String> params) {
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager resourceManager,
+      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
+      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
+      Resource amResource, Map<String, String> params,
+      Set<String> vAllocationTags, PlacementConstraint vPlacementConstraint) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;
@@ -136,8 +146,9 @@ public void init(int heartbeatInterval,
     this.traceStartTimeMS = startTime;
     this.traceFinishTimeMS = finishTime;
     this.amContainerResource = amResource;
+    this.allocationTags = vAllocationTags;
+    this.placementConstraint = vPlacementConstraint;
   }
-
   /**
   * register with RM
   */
@@ -287,6 +298,16 @@ protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
     return allocateRequest;
   }
 
+  protected AllocateRequest createAllocateRequestWithSchedulingRequests(
+      List<SchedulingRequest> schedulingRequests, List<ContainerId> toRelease) {
+    AllocateRequest allocateRequest =
+        recordFactory.newRecordInstance(AllocateRequest.class);
+    allocateRequest.setResponseId(responseId++);
+    allocateRequest.setSchedulingRequests(schedulingRequests);
+    allocateRequest.setReleaseList(toRelease);
+    return allocateRequest;
+  }
+
   protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) {
     return createAllocateRequest(ask, new ArrayList<ContainerId>());
   }
@@ -352,6 +373,12 @@ private void registerAM()
     amRegisterRequest.setHost("localhost");
     amRegisterRequest.setRpcPort(1000);
     amRegisterRequest.setTrackingUrl("localhost:1000");
+    if (allocationTags != null && placementConstraint != null) {
+      Map<Set<String>, PlacementConstraint> placementConstraintMap =
+          new HashMap<>();
+      placementConstraintMap.put(allocationTags, placementConstraint);
+      amRegisterRequest.setPlacementConstraints(placementConstraintMap);
+    }
 
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(appAttemptId.toString());
@@ -437,6 +464,33 @@ public void untrackApp() {
     return ask;
   }
 
+  protected List<SchedulingRequest> packageSchedulingRequests(
+      List<ContainerSimulator> csList, int priority) {
+    List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+    if (csList == null || csList.isEmpty()) {
+      return schedulingRequests;
+    }
+    // All containers in the list share one resource shape, so a single
+    // SchedulingRequest with numAllocations = csList.size() covers them.
+    ContainerSimulator cs = csList.get(0);
+    SchedulingRequest request = createSchedulingRequest(
+        cs.getResource(), cs.getExecutionType(), priority);
+    request.getResourceSizing().setNumAllocations(csList.size());
+    schedulingRequests.add(request);
+    return schedulingRequests;
+  }
+
+  private SchedulingRequest createSchedulingRequest(Resource resource,
+      ExecutionType executionType, int priority) {
+    return SchedulingRequest.newBuilder()
+        .allocationRequestId(allocationRequestId++)
+        .priority(Priority.newInstance(priority))
+        .allocationTags(allocationTags)
+        .executionType(ExecutionTypeRequest.newInstance(executionType))
+        .resourceSizing(ResourceSizing.newInstance(resource))
+        .build();
+  }
+
   public String getQueue() {
     return queue;
   }
@@ -457,4 +507,12 @@ public ApplicationId getApplicationId() {
   public ApplicationAttemptId getApplicationAttemptId() {
     return appAttemptId;
   }
+
+  public Set<String> getAllocationTags() {
+    return allocationTags;
+  }
+
+  public PlacementConstraint getPlacementConstraint() {
+    return placementConstraint;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 6f0f85f..1ac31a2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -26,6 +26,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -126,10 +128,12 @@ public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, Map<String, String> params,
+      Set<String> vAllocationTags, PlacementConstraint vPlacementConstraint) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource, params);
+        baselineStartTimeMS, amContainerResource, params, vAllocationTags,
+        vPlacementConstraint);
     amtype = "mapreduce";
 
     // get map/reduce tasks
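For reference, the SchedulingRequest that the new
AMSimulator#createSchedulingRequest path emits is equivalent to the hand-built
request below. This is a sketch, not part of the patch; the class name is
illustrative, and the values are taken from the test trace at the end of this
patch (2 containers of 4096 MB / 4 vcores, tag "dn", priority 20):

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;

public class SchedulingRequestSketch {
  public static void main(String[] args) {
    // One request carries the whole batch: numAllocations = 2 matches
    // count.val = 2 in the trace, the sizing matches max_memory/max_vcores.
    SchedulingRequest request = SchedulingRequest.newBuilder()
        .allocationRequestId(0L)
        .priority(Priority.newInstance(20))
        .allocationTags(Collections.singleton("dn"))
        .executionType(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
        .resourceSizing(ResourceSizing.newInstance(2, Resource.newInstance(4096, 4)))
        .build();
    System.out.println(request);
  }
}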
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index b41f5f2..137762f 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -46,6 +48,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
 * AMSimulator that simulates streaming services - it keeps tasks
@@ -96,10 +99,11 @@ public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, Map<String, String> params,
+      Set<String> vAllocationTags, PlacementConstraint vPlacementConstraint) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, params);
+        amContainerResource, params, vAllocationTags, vPlacementConstraint);
     amtype = "stream";
 
     allStreams.addAll(containerList);
@@ -199,52 +203,72 @@ private void restart()
 
   @Override
   protected void sendContainerRequest()
       throws YarnException, IOException, InterruptedException {
+
+    AllocateRequest request = getAllocateRequest();
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            return rm.getApplicationMasterService().allocate(request);
+          }
+        });
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  private AllocateRequest getAllocateRequest() {
+    AllocateRequest request;
     // send out request
     List<ResourceRequest> ask = new ArrayList<>();
     List<ContainerId> release = new ArrayList<>();
+    List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+    boolean enableConstraint = getAllocationTags() != null
+        && getPlacementConstraint() != null;
     if (!isFinished) {
       if (!pendingStreams.isEmpty()) {
-        ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
-            PRIORITY_MAP);
-        LOG.debug("Application {} sends out request for {} streams.",
-            appId, pendingStreams.size());
+        if (enableConstraint) {
+          // ask via SchedulingRequests so the placement constraint applies
+          schedulingRequests = packageSchedulingRequests(
+              mergeLists(pendingStreams, scheduledStreams), PRIORITY_MAP);
+        } else {
+          ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
+              PRIORITY_MAP);
+        }
+        LOG.debug("Application {} sends out request for {} streams.", appId,
+            pendingStreams.size());
         scheduledStreams.addAll(pendingStreams);
         pendingStreams.clear();
       }
     }
 
-    if(isFinished){
+    if (isFinished) {
       release.addAll(assignedStreams.keySet());
       ask.clear();
+      schedulingRequests.clear();
     }
 
-    final AllocateRequest request = createAllocateRequest(ask, release);
+    if (enableConstraint) {
+      request = createAllocateRequestWithSchedulingRequests(schedulingRequests,
+          release);
+    } else {
+      request = createAllocateRequest(ask, release);
+    }
     if (totalContainers == 0) {
       request.setProgress(1.0f);
     } else {
       request.setProgress((float) finishedContainers / totalContainers);
     }
-
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(appAttemptId.toString());
-    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
-        .get(appAttemptId.getApplicationId())
-        .getRMAppAttempt(appAttemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
-    AllocateResponse response = ugi.doAs(
-        new PrivilegedExceptionAction<AllocateResponse>() {
-          @Override
-          public AllocateResponse run() throws Exception {
-            return rm.getApplicationMasterService().allocate(request);
-          }
-        });
-    if (response != null) {
-      responseQueue.put(response);
-    }
+    return request;
   }
-
   @Override
   public void initReservation(
       ReservationId reservationId, long deadline, long now){
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 6ede8b2..79fc8f2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -112,7 +112,12 @@ public Allocation allocate(ApplicationAttemptId attemptId,
           containerIds, strings, strings2, updateRequests);
       return allocation;
-    } finally {
+    } catch (Throwable t) {
+      // log the error and return the (possibly null) allocation so one
+      // failed allocate() call does not abort the whole simulation
+      t.printStackTrace();
+      return allocation;
+    } finally {
       context.stop();
       schedulerMetrics.increaseSchedulerAllocationCounter();
       try {
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 21dec96..421d5d0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -65,6 +65,12 @@
   private final String queueName;
   private final SynthTraceJobProducer.JobDefinition jobDef;
 
+  // allocation tags
+  private final String allocationTags;
+
+  // placement constraint
+  private final String placementConstraint;
+
   private String type;
 
   // job timing
@@ -165,6 +171,9 @@ protected SynthJob(JDKRandomGenerator rand, Configuration conf,
 
     this.params = jobDef.params;
 
+    this.allocationTags = jobDef.allocation_tags;
+    this.placementConstraint = jobDef.placement_constraint;
+
     conf.set(QUEUE_NAME, queueName);
 
     // name and initialize job randomness
@@ -242,8 +251,20 @@ public String getQueueName() {
     return queueName;
   }
 
+  public String getAllocationTags() {
+    return allocationTags;
+  }
+
+  public String getPlacementConstraint() {
+    return placementConstraint;
+  }
+
   @Override
   public String toString() {
+    String allocationTagsStr = allocationTags == null ? "" : allocationTags;
+    String placementConstraintStr = placementConstraint == null
+        ? ""
+        : placementConstraint;
     StringBuilder sb = new StringBuilder();
     String res = "\nSynthJob [" + jobDef.class_name + "]: \n"
         + "\tname: " + getName() + "\n"
@@ -252,7 +273,9 @@ public String toString() {
         + "\tqueue: " + getQueueName() + "\n"
         + "\tsubmission: " + getSubmissionTime() + "\n"
         + "\tduration: " + getDuration() + "\n"
-        + "\tdeadline: " + getDeadline() + "\n";
+        + "\tdeadline: " + getDeadline() + "\n"
+        + "\tallocation_tags: " + allocationTagsStr + "\n"
+        + "\tplacement_constraint: " + placementConstraintStr + "\n";
     sb.append(res);
     int taskno = 0;
     for(SynthJob.SynthTask t : getTasks()){
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index fa6f1fc..2ecad49 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -434,6 +434,13 @@ public String toString(){
     @JsonProperty("reduce_execution_type")
     String reduce_execution_type = ExecutionType.GUARANTEED.name();
 
+    // allocation tags
+    @JsonProperty("allocation_tags")
+    String allocation_tags;
+    // placement constraint
+    @JsonProperty("placement_constraint")
+    String placement_constraint;
+
     public void init(JDKRandomGenerator rand){
       deadline_factor.init(rand);
       duration.init(rand);
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java
new file mode 100644
index 0000000..08c7e94
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format
+ * and a placement constraint.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynthWithConstraint extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String synthTraceFile =
+        "src/test/resources/syn_stream_with_constraint.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with the CapacityScheduler, with and without a nodeFile
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering the nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    if (this.schedulerType.equals(
+        CapacityScheduler.class.getCanonicalName())) {
+      conf.setBoolean("yarn.scheduler.capacity.scheduling-request.allowed",
+          true);
+      // TODO: switch to the scheduler-side handler
+      // (YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER) once
+      // the SLS wrapper schedulers support it.
+      conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+          YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    }
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index bfc7d0c..d717f10 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -139,7 +139,8 @@ public void testAMSimulator() throws Exception {
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
     app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf), null);
+        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
+        null);
     app.firstStep();
 
     verifySchedulerMetrics(appId);
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream_with_constraint.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream_with_constraint.json
new file mode 100644
index 0000000..ea22737
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream_with_constraint.json
@@ -0,0 +1,49 @@
+{
+  "description": "stream workload",
+  "num_nodes": 20,
+  "nodes_per_rack": 4,
+  "num_jobs": 5,
+  "rand_seed": 2,
"workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 1, + "description": "long lived streaming jobs", + "queue_name": "sls_queue_1", + "job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "type": "stream", + "deadline_factor": {"val": 10}, + "duration": {"val": 30, "std": 5}, + "reservation": {"val": 0.5}, + "allocation_tags": "dn", + "placement_constraint": "notin,node,dn", + "tasks":[ + { + "type": "stream", + "priority": 20, + "count": { "val": 2}, + "time": {"val": 60000}, + "max_memory": {"val": 4096}, + "max_vcores": {"val": 4}, + "execution_type": "GUARANTEED" + } + ] + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 2, + "weight": 0 + } + ] + } + ] +}