diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 10f6294..50db964 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -135,6 +135,7 @@
src/test/resources/syn.json
src/test/resources/syn_generic.json
src/test/resources/syn_stream.json
+ src/test/resources/syn_stream_with_constraint.json
src/test/resources/inputsls.json
src/test/resources/nodes.json
src/test/resources/exit-invariants.txt
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 8a522fe..7b90706 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -37,11 +37,13 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -61,6 +63,9 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint
+ .AbstractConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -83,6 +88,8 @@
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
@@ -695,7 +702,8 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
runNewAM(job.getType(), user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
job.getDeadline(), getAMContainerResource(null),
- job.getParams());
+ job.getParams(), job.getAllocationTags(),
+ job.getPlacementConstratin());
}
}
@@ -739,14 +747,15 @@ private void runNewAM(String jobType, String user,
Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
- amContainerResource, null);
+ amContainerResource, null, null, null);
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource,
- Map<String, String> params) {
+ Map<String, String> params, String allocationTagsStr,
+ String placementConstraintStr) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
@@ -761,9 +770,28 @@ private void runNewAM(String jobType, String user,
oldJobId = Integer.toString(AM_ID);
}
AM_ID++;
+ Set<String> allocationTags = null;
+ if (!StringUtils.isEmpty(allocationTagsStr)) {
+ allocationTags = new HashSet<>(
+ Arrays.asList(allocationTagsStr.split(",")));
+ }
+ PlacementConstraint placementConstraint = null;
+ try {
+ if (!StringUtils.isEmpty(placementConstraintStr)) {
+ placementConstraint = PlacementConstraintParser.parseExpression(
+ placementConstraintStr).build();
+ }
+ } catch (PlacementConstraintParseException e) {
+ String errorMsg = String.format(
+ "parse placement constraint fail, placementConstraint=%s",
+ placementConstraintStr);
+ LOG.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
- runner.getStartTimeMS(), amContainerResource, params);
+ runner.getStartTimeMS(), amContainerResource, params, allocationTags,
+ placementConstraint);
if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
@@ -776,7 +804,7 @@ private void runNewAM(String jobType, String user,
amMap.put(oldJobId, amSim);
}
}
-
+
private void printSimulationInfo() {
if (printSimulation) {
// node
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 8e1c256..42fcd09 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -22,9 +22,12 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -54,6 +57,9 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -66,6 +72,7 @@
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +93,7 @@
// response queue
protected final BlockingQueue<AllocateResponse> responseQueue;
private int responseId = 0;
+ private int allocationRequestId = 0;
// user name
private String user;
// queue name
@@ -108,6 +116,10 @@
volatile boolean isAMContainerRunning = false;
volatile Container amContainer;
+ //placement constraint
+ protected Set<String> allocationTags;
+ protected PlacementConstraint placementConstraint;
+
private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
private Resource amContainerResource;
@@ -123,7 +135,8 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, Map<String, String> params) {
+ Resource amResource, Map<String, String> params, Set<String> vAllocationTags,
+ PlacementConstraint vPlacementConstraint) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
@@ -136,8 +149,9 @@ public void init(int heartbeatInterval,
this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource;
+ this.allocationTags = vAllocationTags;
+ this.placementConstraint = vPlacementConstraint;
}
-
/**
* register with RM
*/
@@ -287,6 +301,16 @@ protected AllocateRequest createAllocateRequest(List ask,
return allocateRequest;
}
+ protected AllocateRequest createAllocateRequestWithSchedulingRequests(
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> toRelease) {
+ AllocateRequest allocateRequest =
+ recordFactory.newRecordInstance(AllocateRequest.class);
+ allocateRequest.setResponseId(responseId++);
+ allocateRequest.setSchedulingRequests(schedulingRequests);
+ allocateRequest.setReleaseList(toRelease);
+ return allocateRequest;
+ }
+
protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) {
return createAllocateRequest(ask, new ArrayList<ContainerId>());
}
@@ -352,6 +376,12 @@ private void registerAM()
amRegisterRequest.setHost("localhost");
amRegisterRequest.setRpcPort(1000);
amRegisterRequest.setTrackingUrl("localhost:1000");
+ if (allocationTags != null && placementConstraint != null) {
+ Map<Set<String>, PlacementConstraint> placementConstraintMap
+ = new HashMap<>();
+ placementConstraintMap.put(allocationTags, placementConstraint);
+ amRegisterRequest.setPlacementConstraints(placementConstraintMap);
+ }
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
@@ -437,6 +467,30 @@ public void untrackApp() {
return ask;
}
+ protected List<SchedulingRequest> packageSchedulingRequests(
+ List<ContainerSimulator> csList, int priority,
+ Set<String> allocationTags) {
+ List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+ if (csList == null || csList.isEmpty()) {
+ return schedulingRequests;
+ }
+ ContainerSimulator cs = csList.get(0);
+ SchedulingRequest schedulingRequest = createSchedulingRequest(
+ cs.getResource(), cs.getExecutionType(), allocationTags, priority);
+ schedulingRequest.getResourceSizing().setNumAllocations(csList.size());
+ schedulingRequests.add(schedulingRequest);
+ return schedulingRequests;
+ }
+
+ private SchedulingRequest createSchedulingRequest(Resource resource,
+ ExecutionType executionType, Set<String> allocationTags, int priority) {
+ SchedulingRequest schedulingRequest = SchedulingRequest.newBuilder()
+ .allocationRequestId(allocationRequestId++)
+ .priority(Priority.newInstance(priority)).allocationTags(allocationTags)
+ .executionType(ExecutionTypeRequest.newInstance(executionType))
+ .resourceSizing(ResourceSizing.newInstance(resource)).build();
+ return schedulingRequest;
+ }
public String getQueue() {
return queue;
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 6f0f85f..1ac31a2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -41,6 +42,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -126,10 +128,12 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
- Resource amContainerResource, Map<String, String> params) {
+ Resource amContainerResource, Map<String, String> params,
+ Set<String> vAllocationTags, PlacementConstraint vPlacementConstraint) {
super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
- baselineStartTimeMS, amContainerResource, params);
+ baselineStartTimeMS, amContainerResource, params, vAllocationTags,
+ vPlacementConstraint);
amtype = "mapreduce";
// get map/reduce tasks
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index b41f5f2..85ce545 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -46,6 +48,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* AMSimulator that simulates streaming services - it keeps tasks
@@ -96,10 +99,11 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
- Resource amContainerResource, Map<String, String> params) {
+ Resource amContainerResource, Map<String, String> params,
+ Set<String> vAllocationTags, PlacementConstraint vPlacementConstraint) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
- amContainerResource, params);
+ amContainerResource, params, vAllocationTags, vPlacementConstraint);
amtype = "stream";
allStreams.addAll(containerList);
@@ -199,52 +203,72 @@ private void restart()
@Override
protected void sendContainerRequest()
- throws YarnException, IOException, InterruptedException {
+ throws YarnException, IOException, InterruptedException {
+
+ AllocateRequest request = getAllocateRequest();
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+ .get(appAttemptId.getApplicationId())
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ AllocateResponse response = ugi.doAs(
+ new PrivilegedExceptionAction<AllocateResponse>() {
+ @Override
+ public AllocateResponse run() throws Exception {
+ return rm.getApplicationMasterService().allocate(request);
+ }
+ });
+ if (response != null) {
+ responseQueue.put(response);
+ }
+ }
+
+ private AllocateRequest getAllocateRequest() {
+ AllocateRequest request;
// send out request
List<ResourceRequest> ask = new ArrayList<>();
List<ContainerId> release = new ArrayList<>();
+ List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+ boolean enableConstraint = allocationTags != null
+ && placementConstraint != null;
if (!isFinished) {
if (!pendingStreams.isEmpty()) {
- ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
- PRIORITY_MAP);
- LOG.debug("Application {} sends out request for {} streams.",
- appId, pendingStreams.size());
+ if (enableConstraint) {
+ // add schedulingRequest
+ schedulingRequests = packageSchedulingRequests(
+ mergeLists(pendingStreams, scheduledStreams), PRIORITY_MAP, allocationTags);
+ } else {
+ ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
+ PRIORITY_MAP);
+ }
+ LOG.debug("Application {} sends out request for {} streams.", appId,
+ pendingStreams.size());
scheduledStreams.addAll(pendingStreams);
pendingStreams.clear();
}
}
- if(isFinished){
+ if (isFinished) {
release.addAll(assignedStreams.keySet());
ask.clear();
+ schedulingRequests.clear();
}
- final AllocateRequest request = createAllocateRequest(ask, release);
+ if (enableConstraint) {
+ request = createAllocateRequestWithSchedulingRequests(schedulingRequests,
+ release);
+ } else {
+ request = createAllocateRequest(ask, release);
+ }
if (totalContainers == 0) {
request.setProgress(1.0f);
} else {
request.setProgress((float) finishedContainers / totalContainers);
}
-
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
- Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
- .get(appAttemptId.getApplicationId())
- .getRMAppAttempt(appAttemptId).getAMRMToken();
- ugi.addTokenIdentifier(token.decodeIdentifier());
- AllocateResponse response = ugi.doAs(
- new PrivilegedExceptionAction<AllocateResponse>() {
- @Override
- public AllocateResponse run() throws Exception {
- return rm.getApplicationMasterService().allocate(request);
- }
- });
- if (response != null) {
- responseQueue.put(response);
- }
+ return request;
}
-
@Override
public void initReservation(
ReservationId reservationId, long deadline, long now){
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 6ede8b2..79fc8f2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -112,7 +112,10 @@ public Allocation allocate(ApplicationAttemptId attemptId,
containerIds, strings,
strings2, updateRequests);
return allocation;
- } finally {
+ } catch (Throwable t) {
+ t.printStackTrace();
+ return allocation;
+ } finally {
context.stop();
schedulerMetrics.increaseSchedulerAllocationCounter();
try {
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 21dec96..421d5d0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -65,6 +65,12 @@
private final String queueName;
private final SynthTraceJobProducer.JobDefinition jobDef;
+ //allocation tags
+ private final String allocationTags;
+
+ //placement constraint
+ private final String placementConstratin;
+
private String type;
// job timing
@@ -165,6 +171,9 @@ protected SynthJob(JDKRandomGenerator rand, Configuration conf,
this.params = jobDef.params;
+ this.allocationTags = jobDef.allocation_tags;
+ this.placementConstratin = jobDef.placement_constraint;
+
conf.set(QUEUE_NAME, queueName);
// name and initialize job randomness
@@ -242,8 +251,20 @@ public String getQueueName() {
return queueName;
}
+ public String getAllocationTags() {
+ return allocationTags;
+ }
+
+ public String getPlacementConstratin() {
+ return placementConstratin;
+ }
+
@Override
public String toString() {
+ String allocationTagsStr = allocationTags == null ? "" : allocationTags;
+ String placementConstraintStr = placementConstratin == null
+ ? ""
+ : placementConstratin;
StringBuilder sb = new StringBuilder();
String res = "\nSynthJob [" + jobDef.class_name + "]: \n"
+ "\tname: " + getName() + "\n"
@@ -252,7 +273,9 @@ public String toString() {
+ "\tqueue: " + getQueueName() + "\n"
+ "\tsubmission: " + getSubmissionTime() + "\n"
+ "\tduration: " + getDuration() + "\n"
- + "\tdeadline: " + getDeadline() + "\n";
+ + "\tdeadline: " + getDeadline() + "\n"
+ + "\tallocation_tags: " + allocationTagsStr + "\n"
+ + "\tplacement_constraint: " + placementConstraintStr + "\n";
sb.append(res);
int taskno = 0;
for(SynthJob.SynthTask t : getTasks()){
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index fa6f1fc..478072b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
import org.codehaus.jackson.annotate.JsonCreator;
@@ -121,11 +122,18 @@ public SynthTraceJobProducer(Configuration conf, Path path)
private String queue;
// Definition to construct the job from
private JobDefinition jobDef;
-
- StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) {
+ //allocation tags
+ private String allocationTags;
+ //placement constraint
+ private String placementConstraint;
+
+ StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef,
+ String vAllocationTags, String vPlacementConstraint) {
this.actualSubmissionTime = actualSubmissionTime;
this.queue = queue;
this.jobDef = jobDef;
+ this.allocationTags = vAllocationTags;
+ this.placementConstraint = vPlacementConstraint;
}
}
@@ -147,7 +155,9 @@ public int compare(StoryParams o1, StoryParams o2) {
long actualSubmissionTime = wl.generateSubmissionTime();
String queue = wl.queue_name;
JobDefinition job = wl.generateJobDefinition();
- storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
+ storyQueue.add(
+ new StoryParams(actualSubmissionTime, queue, job, job.allocation_tags,
+ job.placement_constraint));
}
return storyQueue;
}
@@ -433,6 +443,13 @@ public String toString(){
String map_execution_type = ExecutionType.GUARANTEED.name();
@JsonProperty("reduce_execution_type")
String reduce_execution_type = ExecutionType.GUARANTEED.name();
+
+ //allocation tags
+ @JsonProperty("allocation_tags")
+ String allocation_tags;
+ //placement constraint
+ @JsonProperty("placement_constraint")
+ String placement_constraint;
public void init(JDKRandomGenerator rand){
deadline_factor.init(rand);
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java
new file mode 100644
index 0000000..a27de7a
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynthWithConstraint.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format
+ * and placement constraint.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynthWithConstraint extends BaseSLSRunnerTest {
+
+ @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+ public static Collection