diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
index 38fa8b9..edf6e86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
@@ -66,6 +66,15 @@
public static Container newInstance(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
+ return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority,
+ containerToken, ExecutionType.GUARANTEED);
+ }
+
+ @Private
+ @Unstable
+ public static Container newInstance(ContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken, ExecutionType executionType) {
Container container = Records.newRecord(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@@ -73,6 +82,7 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId,
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
+ container.setExecutionType(executionType);
return container;
}
@@ -163,4 +173,19 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId,
@Private
@Unstable
public abstract void setContainerToken(Token containerToken);
+
+ /**
+ * Get the ExecutionType for the container.
+ * @return ExecutionType for the container.
+ */
+ @Private
+ @Unstable
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set the ExecutionType for the container.
+ */
+ @Private
+ @Unstable
+ public abstract void setExecutionType(ExecutionType executionType);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 60cdfd1..814c5bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -92,6 +92,7 @@ message ContainerProto {
optional ResourceProto resource = 4;
optional PriorityProto priority = 5;
optional hadoop.common.TokenProto container_token = 6;
+ optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
}
message ContainerReportProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
index 1700068..bd2d937 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
@Private
@Unstable
@@ -47,7 +49,7 @@
private Resource resource = null;
private Priority priority = null;
private Token containerToken = null;
-
+
public ContainerPBImpl() {
builder = ContainerProto.newBuilder();
}
@@ -248,6 +250,18 @@ public void setContainerToken(Token containerToken) {
this.containerToken = containerToken;
}
+ @Override
+ public ExecutionType getExecutionType() {
+ ContainerProtoOrBuilder p = viaProto ? proto : builder;
+ return convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType executionType) {
+ maybeInitBuilder();
+ builder.setExecutionType(convertToProtoFormat(executionType));
+ }
+
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
@@ -288,6 +302,15 @@ private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+ private ExecutionType convertFromProtoFormat(
+ ExecutionTypeProto e) {
+ return ProtoUtils.convertFromProtoFormat(e);
+ }
+
+ private ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
+ return ProtoUtils.convertToProtoFormat(e);
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Container: [");
@@ -297,6 +320,7 @@ public String toString() {
sb.append("Resource: ").append(getResource()).append(", ");
sb.append("Priority: ").append(getPriority()).append(", ");
sb.append("Token: ").append(getContainerToken()).append(", ");
+ sb.append("ExecutionType: ").append(getExecutionType()).append(", ");
sb.append("]");
return sb.toString();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a70d143..b97f935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -236,7 +236,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId,
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
- Token containerToken) {
+ Token containerToken, ExecutionType executionType) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@@ -244,9 +244,17 @@ public static Container newContainer(ContainerId containerId, NodeId nodeId,
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
+ container.setExecutionType(executionType);
return container;
}
+ public static Container newContainer(ContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken) {
+ return newContainer(containerId, nodeId, nodeHttpAddress, resource,
+ priority, containerToken, ExecutionType.GUARANTEED);
+ }
+
public static T newToken(Class tokenClass,
byte[] identifier, String kind, byte[] password, String service) {
T token = recordFactory.newRecordInstance(tokenClass);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index e33c389..22a6a24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -160,7 +160,8 @@ private Container buildContainer(DistSchedulerParams appParams,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
- capability, rr.getPriority(), containerToken);
+ capability, rr.getPriority(), containerToken,
+ containerTokenIdentifier.getExecutionType());
return container;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 47563d5..0cf1fa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -35,6 +35,11 @@
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -66,6 +71,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.List;
public class TestDistributedSchedulingService {
@@ -92,6 +98,12 @@ public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
};
+ Container c = factory.newRecordInstance(Container.class);
+ c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ c.setId(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(12345, 1), 2), 3));
DistributedSchedulingService service =
new DistributedSchedulingService(rmContext, null) {
@Override
@@ -122,6 +134,7 @@ public AllocateResponse allocate(AllocateRequest request) throws
AllocateResponse response = factory.newRecordInstance
(AllocateResponse.class);
response.setNumClusterNodes(12345);
+ response.setAllocatedContainers(Arrays.asList(c));
return response;
}
@@ -180,6 +193,10 @@ public AllocateResponse allocate(AllocateRequest request) throws
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto())
);
+ List allocatedContainers = allocResp.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(12345, allocResp.getNumClusterNodes());