diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 7030712..f4579ab 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -462,7 +463,8 @@ private void addResourceRequest(Priority priority, String resourceName,
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
- remoteRequest.setExecutionType(executionType);
+ remoteRequest.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(executionType, true));
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
new file mode 100644
index 0000000..8653160
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An object of this class represents a specification of the execution
+ * guarantee of the Containers associated with a ResourceRequests. It consists
+ * of an ExecutionType as well as flag that explicitly asks the
+ * Scheduling Authority to return Containers of exactly the Execution Type
+ * requested.
+ */
+@Public
+@Evolving
+public abstract class ExecutionTypeRequest {
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance() {
+ return newInstance(ExecutionType.GUARANTEED, false);
+ }
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance(ExecutionType execType,
+ boolean ensureExecutionType) {
+ ExecutionTypeRequest executionTypeRequest =
+ Records.newRecord(ExecutionTypeRequest.class);
+ executionTypeRequest.setExecutionType(execType);
+ executionTypeRequest.setEnsureExecutionType(ensureExecutionType);
+ return executionTypeRequest;
+ }
+
+ /**
+ * Set the ExecutionType of the requested container.
+ *
+ * @param execType
+ * ExecutionType of the requested container
+ */
+ @Public
+ public abstract void setExecutionType(ExecutionType execType);
+
+ /**
+ * Get ExecutionType.
+ *
+ * @return ExecutionType.
+ */
+ @Public
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set to true to explicitly ask that the Scheduling Authority return
+ * Containers of exactly the Execution Type requested.
+ * @param ensureExecutionType whether ExecutionType request should be
+ * strictly honored.
+ */
+ @Public
+ public abstract void setEnsureExecutionType(boolean ensureExecutionType);
+
+
+ /**
+ * Get whether Scheduling Authority should return Containers of exactly the
+ * Execution Type requested for this ResourceRequest.
+ * Defaults to false.
+ * @return whether ExecutionType request should be strictly honored
+ */
+ @Public
+ public abstract boolean getEnsureExecutionType();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 8c1fd8d..496e629 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -80,14 +80,14 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
return newInstance(priority, hostName, capability, numContainers,
- relaxLocality, labelExpression, ExecutionType.GUARANTEED);
+ relaxLocality, labelExpression, ExecutionTypeRequest.newInstance());
}
@Public
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
- labelExpression, ExecutionType execType) {
+ labelExpression, ExecutionTypeRequest executionTypeRequest) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
@@ -95,7 +95,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
- request.setExecutionType(execType);
+ request.setExecutionTypeRequest(executionTypeRequest);
return request;
}
@@ -233,14 +233,16 @@ public static boolean isAnyLocation(String hostName) {
public abstract boolean getRelaxLocality();
/**
- * Set the ExecutionType of the requested container.
+ * Set the ExecutionTypeRequest of the requested container.
*
- * @param execType
- * ExecutionType of the requested container
+ * @param execSpec
+ * ExecutionTypeRequest of the requested container
*/
@Public
- @Stable
- public abstract void setExecutionType(ExecutionType execType);
+ @Evolving
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ throw new UnsupportedOperationException();
+ }
/**
* Get whether locality relaxation is enabled with this
@@ -250,8 +252,10 @@ public static boolean isAnyLocation(String hostName) {
* ResourceRequest.
*/
@Public
- @Stable
- public abstract ExecutionType getExecutionType();
+ @Evolving
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ throw new UnsupportedOperationException();
+ }
/**
*
For a request at a network hierarchy level, set whether locality can be relaxed
@@ -353,12 +357,12 @@ public boolean equals(Object obj) {
return false;
} else if (!priority.equals(other.getPriority()))
return false;
- ExecutionType executionType = getExecutionType();
- if (executionType == null) {
- if (other.getExecutionType() != null) {
+ ExecutionTypeRequest executionTypeRequest = getExecutionTypeRequest();
+ if (executionTypeRequest == null) {
+ if (other.getExecutionTypeRequest() != null) {
return false;
}
- } else if (executionType != other.getExecutionType()) {
+ } else if (executionTypeRequest != other.getExecutionTypeRequest()) {
return false;
}
if (getNodeLabelExpression() == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 42b5410..de634ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -305,7 +305,12 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
- optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
+ optional ExecutionTypeRequestProto execution_type_request = 7;
+}
+
+message ExecutionTypeRequestProto {
+ optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED];
+ optional bool ensure_execution_type = 2 [default = false];
}
enum AMCommandProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index b4dcf66..8259b91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -129,7 +130,8 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
@@ -235,7 +237,8 @@ public void testMixedExecutionTypeRequestE2E() throws Exception {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
new file mode 100644
index 0000000..6ee4888
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProtoOrBuilder;
+
+/**
+ * Implementation of ExecutionTypeRequest
+ */
+public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
+ private ExecutionTypeRequestProto proto =
+ ExecutionTypeRequestProto.getDefaultInstance();
+ private ExecutionTypeRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public ExecutionTypeRequestPBImpl() {
+ builder = ExecutionTypeRequestProto.newBuilder();
+ }
+
+ public ExecutionTypeRequestPBImpl(ExecutionTypeRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ExecutionTypeRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ public ExecutionTypeRequestProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ExecutionType getExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasExecutionType()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType execType) {
+ maybeInitBuilder();
+ if (execType == null) {
+ builder.clearExecutionType();
+ return;
+ }
+ builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
+ }
+
+ @Override
+ public void setEnsureExecutionType(boolean ensureExecutionType) {
+ maybeInitBuilder();
+ builder.setEnsureExecutionType(ensureExecutionType);
+ }
+
+ @Override
+ public boolean getEnsureExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEnsureExecutionType();
+ }
+
+ @Override
+ public String toString() {
+ return "{Execution Type: " + getExecutionType()
+ + ", Ensure Execution Type: " + getEnsureExecutionType() + "}";
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 236df90..1a0f30a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -324,4 +326,17 @@ public static ContainerRetryPolicy convertFromProtoFormat(
ContainerRetryPolicyProto e) {
return ContainerRetryPolicy.valueOf(e.name());
}
+
+ /*
+ * ExecutionTypeRequest
+ */
+ public static ExecutionTypeRequestProto convertToProtoFormat(
+ ExecutionTypeRequest e) {
+ return ((ExecutionTypeRequestPBImpl)e).getProto();
+ }
+
+ public static ExecutionTypeRequest convertFromProtoFormat(
+ ExecutionTypeRequestProto e) {
+ return new ExecutionTypeRequestPBImpl(e);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 53ae2cd..fd56f4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,6 +39,7 @@
private Priority priority = null;
private Resource capability = null;
+ private ExecutionTypeRequest executionTypeRequest = null;
public ResourceRequestPBImpl() {
@@ -64,6 +65,10 @@ private void mergeLocalToBuilder() {
if (this.capability != null) {
builder.setCapability(convertToProtoFormat(this.capability));
}
+ if (this.executionTypeRequest != null) {
+ builder.setExecutionTypeRequest(
+ ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
+ }
}
private void mergeLocalToProto() {
@@ -102,6 +107,29 @@ public void setPriority(Priority priority) {
builder.clearPriority();
this.priority = priority;
}
+
+
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.executionTypeRequest != null) {
+ return this.executionTypeRequest;
+ }
+ if (!p.hasExecutionTypeRequest()) {
+ return null;
+ }
+ this.executionTypeRequest =
+ ProtoUtils.convertFromProtoFormat(p.getExecutionTypeRequest());
+ return this.executionTypeRequest;
+ }
+
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ maybeInitBuilder();
+ if (execSpec == null) {
+ builder.clearExecutionTypeRequest();
+ }
+ this.executionTypeRequest = execSpec;
+ }
+
@Override
public String getResourceName() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -186,7 +214,7 @@ public String toString() {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
- + ", Node Label Expression: " + getNodeLabelExpression() + "}";
+ + ", Execution Spec: " + getExecutionTypeRequest() + "}";
}
@Override
@@ -207,24 +235,4 @@ public void setNodeLabelExpression(String nodeLabelExpression) {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
-
- @Override
- public ExecutionType getExecutionType() {
- ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasExecutionType()) {
- return null;
- }
- return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
- }
-
- @Override
- public void setExecutionType(ExecutionType execType) {
- maybeInitBuilder();
- if (execType == null) {
- builder.clearExecutionType();
- return;
- }
- builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
- }
-
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index fca814b..8e2ceb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -214,7 +214,8 @@ private PartitionedResourceRequests partitionAskList(List