diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 7030712..aaaa505 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionSpec;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -462,7 +463,8 @@ private void addResourceRequest(Priority priority, String resourceName,
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
- remoteRequest.setExecutionType(executionType);
+ remoteRequest.setExecutionSpec(
+ ExecutionSpec.newInstance(executionType, true));
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java
new file mode 100644
index 0000000..1293146
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionSpec.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An object of this class represents a specification of the execution
+ * guarantee of the Containers associated with a ResourceRequests. It consists
+ * of an ExecutionT as well as flag that explicitly asks the
+ * Scheduling Authority to return Containers of exactly the Execution Type
+ * requested.
+ */
+public abstract class ExecutionSpec {
+
+ public static ExecutionSpec newInstance(ExecutionType execType,
+ boolean ensureExecutionType) {
+ ExecutionSpec executionSpec = Records.newRecord(ExecutionSpec.class);
+ executionSpec.setExecutionType(execType);
+ executionSpec.setEnsureExecutionType(ensureExecutionType);
+ return executionSpec;
+ }
+
+ /**
+ * Set the ExecutionType of the requested container.
+ *
+ * @param execType
+ * ExecutionType of the requested container
+ */
+ @Public
+ @Stable
+ public abstract void setExecutionType(ExecutionType execType);
+
+ /**
+ * Get whether locality relaxation is enabled with this
+ * ResourceRequest. Defaults to true.
+ *
+ * @return whether locality relaxation is enabled with this
+ * ResourceRequest.
+ */
+ @Public
+ @Stable
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set to true to explicitly ask that the Scheduling Authority return
+ * Containers of exactly the Execution Type requested.
+ * @param ensureExecutionType whether ExecutionType request should be
+ * strictly honored.
+ */
+ @Public
+ @Stable
+ public abstract void setEnsureExecutionType(boolean ensureExecutionType);
+
+
+ /**
+ * Get whether Scheduling Authority should return Containers of exactly the
+ * Execution Type requested for this ResourceRequest.
+ * Defaults to false.
+ * @return whether ExecutionType request should be strictly honored
+ */
+ @Public
+ @Stable
+ public abstract boolean getEnsureExecutionType();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 8c1fd8d..26d6730 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -80,14 +80,15 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
return newInstance(priority, hostName, capability, numContainers,
- relaxLocality, labelExpression, ExecutionType.GUARANTEED);
+ relaxLocality, labelExpression,
+ ExecutionSpec.newInstance(ExecutionType.GUARANTEED, false));
}
@Public
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
- labelExpression, ExecutionType execType) {
+ labelExpression, ExecutionSpec executionSpec) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
@@ -95,7 +96,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
- request.setExecutionType(execType);
+ request.setExecutionSpec(executionSpec);
return request;
}
@@ -233,14 +234,14 @@ public static boolean isAnyLocation(String hostName) {
public abstract boolean getRelaxLocality();
/**
- * Set the ExecutionType of the requested container.
+ * Set the ExecutionSpec of the requested container.
*
- * @param execType
- * ExecutionType of the requested container
+ * @param execSpec
+ * ExecutionSpec of the requested container
*/
@Public
@Stable
- public abstract void setExecutionType(ExecutionType execType);
+ public abstract void setExecutionSpec(ExecutionSpec execSpec);
/**
* Get whether locality relaxation is enabled with this
@@ -251,7 +252,7 @@ public static boolean isAnyLocation(String hostName) {
*/
@Public
@Stable
- public abstract ExecutionType getExecutionType();
+ public abstract ExecutionSpec getExecutionSpec();
/**
*
For a request at a network hierarchy level, set whether locality can be relaxed
@@ -353,12 +354,12 @@ public boolean equals(Object obj) {
return false;
} else if (!priority.equals(other.getPriority()))
return false;
- ExecutionType executionType = getExecutionType();
- if (executionType == null) {
- if (other.getExecutionType() != null) {
+ ExecutionSpec executionSpec = getExecutionSpec();
+ if (executionSpec == null) {
+ if (other.getExecutionSpec() != null) {
return false;
}
- } else if (executionType != other.getExecutionType()) {
+ } else if (executionSpec != other.getExecutionSpec()) {
return false;
}
if (getNodeLabelExpression() == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 42b5410..bf3cb97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -305,7 +305,12 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
- optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
+ optional ExecutionSpecProto execution_spec = 7;
+}
+
+message ExecutionSpecProto {
+ optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED];
+ optional bool ensure_execution_type = 2 [default = false];
}
enum AMCommandProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index b4dcf66..630618d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionSpec;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -129,7 +130,8 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+ rr.getNodeLabelExpression(),
+ ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
@@ -235,7 +237,8 @@ public void testMixedExecutionTypeRequestE2E() throws Exception {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+ rr.getNodeLabelExpression(),
+ ExecutionSpec.newInstance(ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java
new file mode 100644
index 0000000..8491fc3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionSpecPBImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ExecutionSpec;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProtoOrBuilder;
+
+/**
+ * Implementation of ExecutionSpec
+ */
+public class ExecutionSpecPBImpl extends ExecutionSpec {
+ private ExecutionSpecProto proto = ExecutionSpecProto.getDefaultInstance();
+ private ExecutionSpecProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public ExecutionSpecPBImpl() {
+ builder = ExecutionSpecProto.newBuilder();
+ }
+
+ public ExecutionSpecPBImpl(ExecutionSpecProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ExecutionSpecProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ public ExecutionSpecProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ExecutionType getExecutionType() {
+ ExecutionSpecProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasExecutionType()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType execType) {
+ maybeInitBuilder();
+ if (execType == null) {
+ builder.clearExecutionType();
+ return;
+ }
+ builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
+ }
+
+ @Override
+ public void setEnsureExecutionType(boolean ensureExecutionType) {
+ maybeInitBuilder();
+ builder.setEnsureExecutionType(ensureExecutionType);
+ }
+
+ @Override
+ public boolean getEnsureExecutionType() {
+ ExecutionSpecProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEnsureExecutionType();
+ }
+
+ @Override
+ public String toString() {
+ return "{Execution Type: " + getExecutionType()
+ + ", Ensure Execution Type: " + getEnsureExecutionType() + "}";
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 236df90..eeac3b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionSpec;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionSpecProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -324,4 +326,15 @@ public static ContainerRetryPolicy convertFromProtoFormat(
ContainerRetryPolicyProto e) {
return ContainerRetryPolicy.valueOf(e.name());
}
+
+ /*
+ * ExecutionSpec
+ */
+ public static ExecutionSpecProto convertToProtoFormat(ExecutionSpec e) {
+ return ((ExecutionSpecPBImpl)e).getProto();
+ }
+
+ public static ExecutionSpec convertFromProtoFormat(ExecutionSpecProto e) {
+ return new ExecutionSpecPBImpl(e);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 53ae2cd..d5e9b98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionSpec;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,6 +39,7 @@
private Priority priority = null;
private Resource capability = null;
+ private ExecutionSpec executionSpec = null;
public ResourceRequestPBImpl() {
@@ -64,6 +65,10 @@ private void mergeLocalToBuilder() {
if (this.capability != null) {
builder.setCapability(convertToProtoFormat(this.capability));
}
+ if (this.executionSpec != null) {
+ builder.setExecutionSpec(
+ ProtoUtils.convertToProtoFormat(this.executionSpec));
+ }
}
private void mergeLocalToProto() {
@@ -102,6 +107,31 @@ public void setPriority(Priority priority) {
builder.clearPriority();
this.priority = priority;
}
+
+
+ @Override
+ public ExecutionSpec getExecutionSpec() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.executionSpec != null) {
+ return this.executionSpec;
+ }
+ if (!p.hasExecutionSpec()) {
+ return null;
+ }
+ this.executionSpec =
+ ProtoUtils.convertFromProtoFormat(p.getExecutionSpec());
+ return this.executionSpec;
+ }
+
+ @Override
+ public void setExecutionSpec(ExecutionSpec execSpec) {
+ maybeInitBuilder();
+ if (execSpec == null) {
+ builder.clearExecutionSpec();
+ }
+ this.executionSpec = execSpec;
+ }
+
@Override
public String getResourceName() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -186,7 +216,7 @@ public String toString() {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
- + ", Node Label Expression: " + getNodeLabelExpression() + "}";
+ + ", Execution Spec: " + getExecutionSpec() + "}";
}
@Override
@@ -207,24 +237,4 @@ public void setNodeLabelExpression(String nodeLabelExpression) {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
-
- @Override
- public ExecutionType getExecutionType() {
- ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasExecutionType()) {
- return null;
- }
- return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
- }
-
- @Override
- public void setExecutionType(ExecutionType execType) {
- maybeInitBuilder();
- if (execType == null) {
- builder.clearExecutionType();
- return;
- }
- builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
- }
-
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index fca814b..a9f1d10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -214,7 +214,8 @@ private PartitionedResourceRequests partitionAskList(List