diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 8c1fd8d..0920103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -88,6 +88,15 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
labelExpression, ExecutionType execType) {
+ return newInstance(priority, hostName, capability, numContainers,
+ relaxLocality, labelExpression, execType, false);
+ }
+
+ @Public
+ @Stable
+ public static ResourceRequest newInstance(Priority priority, String hostName,
+ Resource capability, int numContainers, boolean relaxLocality, String
+ labelExpression, ExecutionType execType, boolean ensureExecutionType) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
@@ -96,6 +105,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
request.setExecutionType(execType);
+ request.setEnsureExecutionType(ensureExecutionType);
return request;
}
@@ -254,6 +264,27 @@ public static boolean isAnyLocation(String hostName) {
public abstract ExecutionType getExecutionType();
/**
+ * Set to true to explicitly ask that the Scheduling Authority return
+ * Containers of exactly the Execution Type requested.
+ * @param ensureExecutionType whether ExecutionType request should be
+ * strictly honored.
+ */
+ @Public
+ @Stable
+ public abstract void setEnsureExecutionType(boolean ensureExecutionType);
+
+
+ /**
+ * Get whether Scheduling Authority should return Containers of exactly the
+ * Execution Type requested for this ResourceRequest.
+ * Defaults to false.
+ * @return whether ExecutionType request should be strictly honored
+ */
+ @Public
+ @Stable
+ public abstract boolean getEnsureExecutionType();
+
+ /**
*
For a request at a network hierarchy level, set whether locality can be relaxed * to that level and beyond.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 42b5410..d766ac6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -306,6 +306,7 @@ message ResourceRequestProto {
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
+ optional bool ensure_execution_type = 8 [default = false];
}
enum AMCommandProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 53ae2cd..6f98fc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -186,6 +186,8 @@ public String toString() {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
+ + ", Execution Type: " + getExecutionType()
+ + ", Ensure Execution Type: " + getEnsureExecutionType()
+ ", Node Label Expression: " + getNodeLabelExpression() + "}";
}
@@ -227,4 +229,15 @@ public void setExecutionType(ExecutionType execType) {
builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
}
+ @Override
+ public void setEnsureExecutionType(boolean ensureExecutionType) {
+ maybeInitBuilder();
+ builder.setEnsureExecutionType(ensureExecutionType);
+ }
+
+ @Override
+ public boolean getEnsureExecutionType() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEnsureExecutionType();
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 5d5ab78..ba37b5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -40,7 +40,9 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -104,6 +106,12 @@ public Configuration getYarnConfiguration() {
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
+ AllocateRequest allReq =
+ (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+ allReq.setAskList(Arrays.asList(
+ ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+ Resource.newInstance(1, 2), 1, true, "exp",
+ ExecutionType.OPPORTUNISTIC, true)));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -168,8 +176,7 @@ public Configuration getYarnConfiguration() {
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)factory
- .newRecordInstance(AllocateRequest.class)).getProto()));
+ ((AllocateRequestPBImpl)allReq).getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -235,6 +242,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
+ List