diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 8c1fd8d..0920103 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -88,6 +88,15 @@ public static ResourceRequest newInstance(Priority priority, String hostName, public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression, ExecutionType execType) { + return newInstance(priority, hostName, capability, numContainers, + relaxLocality, labelExpression, execType, false); + } + + @Public + @Stable + public static ResourceRequest newInstance(Priority priority, String hostName, + Resource capability, int numContainers, boolean relaxLocality, String + labelExpression, ExecutionType execType, boolean ensureExecutionType) { ResourceRequest request = Records.newRecord(ResourceRequest.class); request.setPriority(priority); request.setResourceName(hostName); @@ -96,6 +105,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName, request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(labelExpression); request.setExecutionType(execType); + request.setEnsureExecutionType(ensureExecutionType); return request; } @@ -254,6 +264,27 @@ public static boolean isAnyLocation(String hostName) { public abstract ExecutionType getExecutionType(); /** + * Set to true to explicitly ask that the Scheduling Authority return + * Containers of exactly the Execution Type requested. + * @param ensureExecutionType whether ExecutionType request should be + * strictly honored. + */ + @Public + @Stable + public abstract void setEnsureExecutionType(boolean ensureExecutionType); + + + /** + * Get whether Scheduling Authority should return Containers of exactly the + * Execution Type requested for this ResourceRequest. + * Defaults to false. + * @return whether ExecutionType request should be strictly honored + */ + @Public + @Stable + public abstract boolean getEnsureExecutionType(); + + /** *

For a request at a network hierarchy level, set whether locality can be relaxed * to that level and beyond.

* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 42b5410..d766ac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -306,6 +306,7 @@ message ResourceRequestProto { optional bool relax_locality = 5 [default = true]; optional string node_label_expression = 6; optional ExecutionTypeProto executionType = 7 [default = GUARANTEED]; + optional bool ensure_execution_type = 8 [default = false]; } enum AMCommandProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 53ae2cd..6f98fc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -186,6 +186,8 @@ public String toString() { + ", # Containers: " + getNumContainers() + ", Location: " + getResourceName() + ", Relax Locality: " + getRelaxLocality() + + ", Execution Type: " + getExecutionType() + + ", Ensure Execution Type: " + getEnsureExecutionType() + ", Node Label Expression: " + getNodeLabelExpression() + "}"; } @@ -227,4 +229,15 @@ public void setExecutionType(ExecutionType execType) { builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); } + @Override + public void setEnsureExecutionType(boolean ensureExecutionType) { + maybeInitBuilder(); + builder.setEnsureExecutionType(ensureExecutionType); + } + + @Override + public boolean getEnsureExecutionType() { + ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getEnsureExecutionType(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 5d5ab78..ba37b5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -40,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -104,6 +106,12 @@ public Configuration getYarnConfiguration() { ContainerId.newContainerId( ApplicationAttemptId.newInstance( ApplicationId.newInstance(12345, 1), 2), 3)); + AllocateRequest allReq = + (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); + allReq.setAskList(Arrays.asList( + ResourceRequest.newInstance(Priority.UNDEFINED, "a", + Resource.newInstance(1, 2), 1, true, "exp", + ExecutionType.OPPORTUNISTIC, true))); DistributedSchedulingService service = createService(factory, rmContext, c); Server server = service.getServer(rpc, conf, addr, null); server.start(); @@ -168,8 +176,7 @@ public Configuration getYarnConfiguration() { DistSchedAllocateResponse dsAllocResp = new DistSchedAllocateResponsePBImpl( dsProxy.allocateForDistributedScheduling(null, - ((AllocateRequestPBImpl)factory - .newRecordInstance(AllocateRequest.class)).getProto())); + ((AllocateRequestPBImpl)allReq).getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); @@ -235,6 +242,9 @@ public AllocateResponse allocate(AllocateRequest request) throws @Override public DistSchedAllocateResponse allocateForDistributedScheduling( AllocateRequest request) throws YarnException, IOException { + List askList = request.getAskList(); + Assert.assertEquals(1, askList.size()); + Assert.assertTrue(askList.get(0).getEnsureExecutionType()); DistSchedAllocateResponse resp = factory.newRecordInstance(DistSchedAllocateResponse.class); resp.setNodesForScheduling(