diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 38fa8b9..edf6e86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -66,6 +66,15 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, Token containerToken) { + return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority, + containerToken, ExecutionType.GUARANTEED); + } + + @Private + @Unstable + public static Container newInstance(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken, ExecutionType executionType) { Container container = Records.newRecord(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -73,6 +82,7 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId, container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setExecutionType(executionType); return container; } @@ -163,4 +173,19 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId, @Private @Unstable public abstract void setContainerToken(Token containerToken); + + /** + * Get the ExecutionType for the container. + * @return ExecutionType for the container. + */ + @Private + @Unstable + public abstract ExecutionType getExecutionType(); + + /** + * Set the ExecutionType for the container. + */ + @Private + @Unstable + public abstract void setExecutionType(ExecutionType executionType); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 60cdfd1..814c5bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -92,6 +92,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; } message ContainerReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 1700068..bd2d937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; @Private @Unstable @@ -47,7 +49,7 @@ private Resource resource = null; private Priority priority = null; private Token containerToken = null; - + public ContainerPBImpl() { builder = ContainerProto.newBuilder(); } @@ -248,6 +250,18 @@ public void setContainerToken(Token containerToken) { this.containerToken = containerToken; } + @Override + public ExecutionType getExecutionType() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public void setExecutionType(ExecutionType executionType) { + maybeInitBuilder(); + builder.setExecutionType(convertToProtoFormat(executionType)); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -288,6 +302,15 @@ private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + private ExecutionType convertFromProtoFormat( + ExecutionTypeProto e) { + return ProtoUtils.convertFromProtoFormat(e); + } + + private ExecutionTypeProto convertToProtoFormat(ExecutionType e) { + return ProtoUtils.convertToProtoFormat(e); + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Container: ["); @@ -297,6 +320,7 @@ public String toString() { sb.append("Resource: ").append(getResource()).append(", "); sb.append("Priority: ").append(getPriority()).append(", "); sb.append("Token: ").append(getContainerToken()).append(", "); + sb.append("ExecutionType: ").append(getExecutionType()).append(", "); sb.append("]"); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index a70d143..b97f935 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -236,7 +236,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId, public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, - Token containerToken) { + Token containerToken, ExecutionType executionType) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -244,9 +244,17 @@ public static Container newContainer(ContainerId containerId, NodeId nodeId, container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setExecutionType(executionType); return container; } + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken) { + return newContainer(containerId, nodeId, nodeHttpAddress, resource, + priority, containerToken, ExecutionType.GUARANTEED); + } + public static T newToken(Class tokenClass, byte[] identifier, String kind, byte[] password, String service) { T token = recordFactory.newRecordInstance(tokenClass); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index e33c389..22a6a24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -160,7 +160,8 @@ private Container buildContainer(DistSchedulerParams appParams, containerTokenIdentifier); Container container = BuilderUtils.newContainer( cId, nodeId, nodeId.getHost() + ":" + webpagePort, - capability, rr.getPriority(), containerToken); + capability, rr.getPriority(), containerToken, + containerTokenIdentifier.getExecutionType()); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 47563d5..0cf1fa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -35,6 +35,11 @@ .RegisterApplicationMasterRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb .RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -66,6 +71,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.List; public class TestDistributedSchedulingService { @@ -92,6 +98,12 @@ public Configuration getYarnConfiguration() { return new YarnConfiguration(); } }; + Container c = factory.newRecordInstance(Container.class); + c.setExecutionType(ExecutionType.OPPORTUNISTIC); + c.setId( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(12345, 1), 2), 3)); DistributedSchedulingService service = new DistributedSchedulingService(rmContext, null) { @Override @@ -122,6 +134,7 @@ public AllocateResponse allocate(AllocateRequest request) throws AllocateResponse response = factory.newRecordInstance (AllocateResponse.class); response.setNumClusterNodes(12345); + response.setAllocatedContainers(Arrays.asList(c)); return response; } @@ -180,6 +193,10 @@ public AllocateResponse allocate(AllocateRequest request) throws ((AllocateRequestPBImpl)factory .newRecordInstance(AllocateRequest.class)).getProto()) ); + List allocatedContainers = allocResp.getAllocatedContainers(); + Assert.assertEquals(1, allocatedContainers.size()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + allocatedContainers.get(0).getExecutionType()); Assert.assertEquals(12345, allocResp.getNumClusterNodes());