diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 7030712..aaaa505 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -462,7 +463,8 @@ private void addResourceRequest(Priority priority, String resourceName, remoteRequest.setCapability(capability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); - remoteRequest.setExecutionType(executionType); + remoteRequest.setExecutionSpec( + ExecutionSpec.newInstance(executionType, true)); reqMap.put(capability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java new file mode 100644 index 0000000..1293146 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.util.Records; + +/** + * An object of this class represents a specification of the execution + * guarantee of the Containers associated with a ResourceRequests. It consists + * of an ExecutionT as well as flag that explicitly asks the + * Scheduling Authority to return Containers of exactly the Execution Type + * requested. + */ +public abstract class ExecutionSpec { + + public static ExecutionSpec newInstance(ExecutionType execType, + boolean ensureExecutionType) { + ExecutionSpec executionSpec = Records.newRecord(ExecutionSpec.class); + executionSpec.setExecutionType(execType); + executionSpec.setEnsureExecutionType(ensureExecutionType); + return executionSpec; + } + + /** + * Set the ExecutionType of the requested container. + * + * @param execType + * ExecutionType of the requested container + */ + @Public + @Stable + public abstract void setExecutionType(ExecutionType execType); + + /** + * Get whether locality relaxation is enabled with this + * ResourceRequest. Defaults to true. + * + * @return whether locality relaxation is enabled with this + * ResourceRequest. + */ + @Public + @Stable + public abstract ExecutionType getExecutionType(); + + /** + * Set to true to explicitly ask that the Scheduling Authority return + * Containers of exactly the Execution Type requested. + * @param ensureExecutionType whether ExecutionType request should be + * strictly honored. + */ + @Public + @Stable + public abstract void setEnsureExecutionType(boolean ensureExecutionType); + + + /** + * Get whether Scheduling Authority should return Containers of exactly the + * Execution Type requested for this ResourceRequest. + * Defaults to false. + * @return whether ExecutionType request should be strictly honored + */ + @Public + @Stable + public abstract boolean getEnsureExecutionType(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 8c1fd8d..26d6730 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -80,14 +80,15 @@ public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression) { return newInstance(priority, hostName, capability, numContainers, - relaxLocality, labelExpression, ExecutionType.GUARANTEED); + relaxLocality, labelExpression, + ExecutionSpec.newInstance(ExecutionType.GUARANTEED, false)); } @Public @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String - labelExpression, ExecutionType execType) { + labelExpression, ExecutionSpec executionSpec) { ResourceRequest request = Records.newRecord(ResourceRequest.class); request.setPriority(priority); request.setResourceName(hostName); @@ -95,7 +96,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName, request.setNumContainers(numContainers); request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(labelExpression); - request.setExecutionType(execType); + request.setExecutionSpec(executionSpec); return request; } @@ -233,14 +234,14 @@ public static boolean isAnyLocation(String hostName) { public abstract boolean getRelaxLocality(); /** - * Set the ExecutionType of the requested container. + * Set the ExecutionSpec of the requested container. * - * @param execType - * ExecutionType of the requested container + * @param execSpec + * ExecutionSpec of the requested container */ @Public @Stable - public abstract void setExecutionType(ExecutionType execType); + public abstract void setExecutionSpec(ExecutionSpec execSpec); /** * Get whether locality relaxation is enabled with this @@ -251,7 +252,7 @@ public static boolean isAnyLocation(String hostName) { */ @Public @Stable - public abstract ExecutionType getExecutionType(); + public abstract ExecutionSpec getExecutionSpec(); /** *

For a request at a network hierarchy level, set whether locality can be relaxed @@ -353,12 +354,12 @@ public boolean equals(Object obj) { return false; } else if (!priority.equals(other.getPriority())) return false; - ExecutionType executionType = getExecutionType(); - if (executionType == null) { - if (other.getExecutionType() != null) { + ExecutionSpec executionSpec = getExecutionSpec(); + if (executionSpec == null) { + if (other.getExecutionSpec() != null) { return false; } - } else if (executionType != other.getExecutionType()) { + } else if (executionSpec != other.getExecutionSpec()) { return false; } if (getNodeLabelExpression() == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 42b5410..bf3cb97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -305,7 +305,12 @@ message ResourceRequestProto { optional int32 num_containers = 4; optional bool relax_locality = 5 [default = true]; optional string node_label_expression = 6; - optional ExecutionTypeProto executionType = 7 [default = GUARANTEED]; + optional ExecutionSpecProto execution_spec = 7; +} + +message ExecutionSpecProto { + optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED]; + optional bool ensure_execution_type = 2 [default = false]; } enum AMCommandProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index b4dcf66..630618d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; @@ -129,7 +130,8 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception { ResourceRequest newRR = ResourceRequest.newInstance(rr .getPriority(), rr.getResourceName(), rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + rr.getNodeLabelExpression(), + ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true)); newAskList.add(newRR); } } @@ -235,7 +237,8 @@ public void testMixedExecutionTypeRequestE2E() throws Exception { ResourceRequest newRR = ResourceRequest.newInstance(rr .getPriority(), rr.getResourceName(), rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + rr.getNodeLabelExpression(), + ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true)); newAskList.add(newRR); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java new file mode 100644 index 0000000..8491fc3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ExecutionSpec; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProtoOrBuilder; + +/** + * Implementation of ExecutionSpec + */ +public class ExecutionSpecPBImpl extends ExecutionSpec { + private ExecutionSpecProto proto = ExecutionSpecProto.getDefaultInstance(); + private ExecutionSpecProto.Builder builder = null; + private boolean viaProto = false; + + public ExecutionSpecPBImpl() { + builder = ExecutionSpecProto.newBuilder(); + } + + public ExecutionSpecPBImpl(ExecutionSpecProto proto) { + this.proto = proto; + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ExecutionSpecProto.newBuilder(proto); + } + viaProto = false; + } + + public ExecutionSpecProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ExecutionType getExecutionType() { + ExecutionSpecProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasExecutionType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public void setExecutionType(ExecutionType execType) { + maybeInitBuilder(); + if (execType == null) { + builder.clearExecutionType(); + return; + } + builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); + } + + @Override + public void setEnsureExecutionType(boolean ensureExecutionType) { + maybeInitBuilder(); + builder.setEnsureExecutionType(ensureExecutionType); + } + + @Override + public boolean getEnsureExecutionType() { + ExecutionSpecProtoOrBuilder p = viaProto ? proto : builder; + return p.getEnsureExecutionType(); + } + + @Override + public String toString() { + return "{Execution Type: " + getExecutionType() + + ", Ensure Execution Type: " + getEnsureExecutionType() + "}"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 236df90..eeac3b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -324,4 +326,15 @@ public static ContainerRetryPolicy convertFromProtoFormat( ContainerRetryPolicyProto e) { return ContainerRetryPolicy.valueOf(e.name()); } + + /* + * ExecutionSpec + */ + public static ExecutionSpecProto convertToProtoFormat(ExecutionSpec e) { + return ((ExecutionSpecPBImpl)e).getProto(); + } + + public static ExecutionSpec convertFromProtoFormat(ExecutionSpecProto e) { + return new ExecutionSpecPBImpl(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 53ae2cd..d5e9b98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,6 +39,7 @@ private Priority priority = null; private Resource capability = null; + private ExecutionSpec executionSpec = null; public ResourceRequestPBImpl() { @@ -64,6 +65,10 @@ private void mergeLocalToBuilder() { if (this.capability != null) { builder.setCapability(convertToProtoFormat(this.capability)); } + if (this.executionSpec != null) { + builder.setExecutionSpec( + ProtoUtils.convertToProtoFormat(this.executionSpec)); + } } private void mergeLocalToProto() { @@ -102,6 +107,31 @@ public void setPriority(Priority priority) { builder.clearPriority(); this.priority = priority; } + + + @Override + public ExecutionSpec getExecutionSpec() { + ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.executionSpec != null) { + return this.executionSpec; + } + if (!p.hasExecutionSpec()) { + return null; + } + this.executionSpec = + ProtoUtils.convertFromProtoFormat(p.getExecutionSpec()); + return this.executionSpec; + } + + @Override + public void setExecutionSpec(ExecutionSpec execSpec) { + maybeInitBuilder(); + if (execSpec == null) { + builder.clearExecutionSpec(); + } + this.executionSpec = execSpec; + } + @Override public String getResourceName() { ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -186,7 +216,7 @@ public String toString() { + ", # Containers: " + getNumContainers() + ", Location: " + getResourceName() + ", Relax Locality: " + getRelaxLocality() - + ", Node Label Expression: " + getNodeLabelExpression() + "}"; + + ", Execution Spec: " + getExecutionSpec() + "}"; } @Override @@ -207,24 +237,4 @@ public void setNodeLabelExpression(String nodeLabelExpression) { } builder.setNodeLabelExpression(nodeLabelExpression); } - - @Override - public ExecutionType getExecutionType() { - ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasExecutionType()) { - return null; - } - return ProtoUtils.convertFromProtoFormat(p.getExecutionType()); - } - - @Override - public void setExecutionType(ExecutionType execType) { - maybeInitBuilder(); - if (execType == null) { - builder.clearExecutionType(); - return; - } - builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); - } - } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index fca814b..a9f1d10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -214,7 +214,8 @@ private PartitionedResourceRequests partitionAskList(List PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { - if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + if (rr.getExecutionSpec().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { partitionedRequests.getOpportunistic().add(rr); } else { partitionedRequests.getGuaranteed().add(rr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index e987e79..e9c099f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -138,13 +139,15 @@ public DistSchedAllocateResponse answer(InvocationOnMock AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class); - guaranteedReq.setExecutionType(ExecutionType.GUARANTEED); + guaranteedReq.setExecutionSpec( + ExecutionSpec.newInstance(ExecutionType.GUARANTEED, true)); guaranteedReq.setNumContainers(5); guaranteedReq.setCapability(Resource.newInstance(2048, 2)); guaranteedReq.setRelaxLocality(true); guaranteedReq.setResourceName("*"); ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setExecutionSpec( + ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true)); opportunisticReq.setNumContainers(4); opportunisticReq.setCapability(Resource.newInstance(1024, 4)); opportunisticReq.setPriority(Priority.newInstance(100)); @@ -167,7 +170,8 @@ public DistSchedAllocateResponse answer(InvocationOnMock // New Allocate request allocateRequest = Records.newRecord(AllocateRequest.class); opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setExecutionSpec( + ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true)); opportunisticReq.setNumContainers(6); opportunisticReq.setCapability(Resource.newInstance(512, 3)); opportunisticReq.setPriority(Priority.newInstance(100)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 5d5ab78..4ced20e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -39,8 +39,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionSpec; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -104,6 +107,12 @@ public Configuration getYarnConfiguration() { ContainerId.newContainerId( ApplicationAttemptId.newInstance( ApplicationId.newInstance(12345, 1), 2), 3)); + AllocateRequest allReq = + (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); + allReq.setAskList(Arrays.asList( + ResourceRequest.newInstance(Priority.UNDEFINED, "a", + Resource.newInstance(1, 2), 1, true, "exp", + ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true)))); DistributedSchedulingService service = createService(factory, rmContext, c); Server server = service.getServer(rpc, conf, addr, null); server.start(); @@ -168,8 +177,7 @@ public Configuration getYarnConfiguration() { DistSchedAllocateResponse dsAllocResp = new DistSchedAllocateResponsePBImpl( dsProxy.allocateForDistributedScheduling(null, - ((AllocateRequestPBImpl)factory - .newRecordInstance(AllocateRequest.class)).getProto())); + ((AllocateRequestPBImpl)allReq).getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); @@ -235,6 +243,10 @@ public AllocateResponse allocate(AllocateRequest request) throws @Override public DistSchedAllocateResponse allocateForDistributedScheduling( AllocateRequest request) throws YarnException, IOException { + List askList = request.getAskList(); + Assert.assertEquals(1, askList.size()); + Assert.assertTrue(askList.get(0) + .getExecutionSpec().getEnsureExecutionType()); DistSchedAllocateResponse resp = factory.newRecordInstance(DistSchedAllocateResponse.class); resp.setNodesForScheduling(