diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index ae0891e89af..a8986ed475b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.Collections; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; @@ -212,6 +214,26 @@ public abstract void setResourceBlacklistRequest( public abstract void setUpdateRequests( List updateRequests); + /** + * Get the list of Scheduling requests being sent by the + * ApplicationMaster. + * @return list of {@link SchedulingRequest} being sent by the + * ApplicationMaster. + */ + @Public + @Unstable + public abstract List getSchedulingRequests(); + + /** + * Set the list of Scheduling requests to inform the + * ResourceManager about. + * @param schedulingRequests list of SchedulingRequest. + */ + @Public + @Unstable + public abstract void setSchedulingRequests( + List schedulingRequests); + @Public @Unstable public static AllocateRequestBuilder newBuilder() { @@ -271,6 +293,20 @@ public AllocateRequestBuilder askList(List askList) { } /** + * Set the schedulingRequestList of the request. + * @see AllocateRequest#setSchedulingRequests(List) (List) + * @param schedulingRequestList schedulingRequestList + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder schedulingRequestList( + List schedulingRequestList){ + allocateRequest.setSchedulingRequests(schedulingRequestList); + return this; + } + + /** * Set the releaseList of the request. * @see AllocateRequest#setReleaseList(List) * @param releaseList releaseList of the request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java index d82be11017c..8cdc63fdc7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java @@ -61,4 +61,31 @@ public static ResourceSizing newInstance(int numAllocations, Resource resources) @Public @Unstable public abstract void setResources(Resource resources); + + @Override + public int hashCode() { + int result = getResources().hashCode(); + result = 31 * result + getNumAllocations(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if(obj == null || getClass() != obj.getClass()) { + return false; + } + + ResourceSizing that = (ResourceSizing) obj; + + if(getNumAllocations() != that.getNumAllocations()) { + return false; + } + if(!getResources().equals(that.getResources())) { + return false; + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java index 47a0697f878..ddc4cdf7dd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java @@ -49,6 +49,7 @@ public static SchedulingRequest newInstance(long allocationRequestId, return SchedulingRequest.newBuilder() .allocationRequestId(allocationRequestId).priority(priority) .executionType(executionType).allocationTags(allocationTags) + .resourceSizing(resourceSizing) .placementConstraintExpression(placementConstraintExpression).build(); } @@ -202,4 +203,63 @@ public SchedulingRequest build() { public abstract void setPlacementConstraint( PlacementConstraint placementConstraint); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + Priority priority = getPriority(); + ExecutionTypeRequest executionType = getExecutionType(); + Set allocationTags = getAllocationTags(); + ResourceSizing resourceSizing = getResourceSizing(); + PlacementConstraint placementConstraintExpression = + getPlacementConstraint(); + result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode(); + result = prime * result + ((priority == null) ? 0 : priority.hashCode()); + result = prime * result + + ((executionType == null) ? 0 : executionType.hashCode()); + result = prime * result + + ((allocationTags == null) ? 0 : allocationTags.hashCode()); + result = prime * result + + ((resourceSizing == null) ? 0 : resourceSizing.hashCode()); + result = prime * result + ((placementConstraintExpression == null) ? 0 + : placementConstraintExpression.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof SchedulingRequest)) + return false; + + SchedulingRequest other = (SchedulingRequest) obj; + + if (getAllocationRequestId() != other.getAllocationRequestId()) + return false; + ExecutionTypeRequest executionType = getExecutionType(); + if (executionType == null || other.getExecutionType() == null + || !other.getExecutionType().equals(executionType)) + return false; + Set allocationTags = getAllocationTags(); + if (allocationTags == null || other.getAllocationTags() == null + || !allocationTags.equals(other.getAllocationTags())) + return false; + ResourceSizing resourceSizing = getResourceSizing(); + if (resourceSizing == null || other.getResourceSizing() == null + || !resourceSizing.equals(other.getResourceSizing())) + return false; + PlacementConstraint placementConstraintExpression = + getPlacementConstraint(); + if (placementConstraintExpression == null + || other.getPlacementConstraint() == null + || !placementConstraintExpression + .equals(other.getPlacementConstraint())) + return false; + + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 26bcb1d7e07..5e7c213804b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -91,6 +91,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 7; + repeated SchedulingRequestProto scheduling_requests = 10; } message NMTokenProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 0f0f5710332..b460044bb7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -29,14 +29,17 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder; @@ -53,6 +56,7 @@ private List ask = null; private List release = null; private List updateRequests = null; + private List schedulingRequests = null; private ResourceBlacklistRequest blacklistRequest = null; public AllocateRequestPBImpl() { @@ -101,6 +105,9 @@ private void mergeLocalToBuilder() { if (this.updateRequests != null) { addUpdateRequestsToProto(); } + if (this.schedulingRequests != null) { + addSchedulingRequestsToProto(); + } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } @@ -178,6 +185,23 @@ public void setUpdateRequests(List updateRequests) { } @Override + public List getSchedulingRequests() { + initSchedulingRequests(); + return this.schedulingRequests; + } + + @Override + public void setSchedulingRequests( + List schedulingRequests) { + if (schedulingRequests == null) { + return; + } + initSchedulingRequests(); + this.schedulingRequests.clear(); + this.schedulingRequests.addAll(schedulingRequests); + } + + @Override public ResourceBlacklistRequest getResourceBlacklistRequest() { AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; if (this.blacklistRequest != null) { @@ -261,6 +285,20 @@ private void initUpdateRequests() { } } + private void initSchedulingRequests() { + if (this.schedulingRequests != null) { + return; + } + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getSchedulingRequestsList(); + this.schedulingRequests = new ArrayList<>(); + + for (SchedulingRequestProto c : list) { + this.schedulingRequests.add(convertFromProtoFormat(c)); + } + } + private void addUpdateRequestsToProto() { maybeInitBuilder(); builder.clearUpdateRequests(); @@ -297,6 +335,41 @@ public void remove() { builder.addAllUpdateRequests(iterable); } + private void addSchedulingRequestsToProto() { + maybeInitBuilder(); + builder.clearSchedulingRequests(); + if (schedulingRequests == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + private Iterator iter = + schedulingRequests.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SchedulingRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + builder.addAllSchedulingRequests(iterable); + } @Override public List getReleaseList() { initReleases(); @@ -377,6 +450,16 @@ private UpdateContainerRequestProto convertToProtoFormat( return ((UpdateContainerRequestPBImpl) t).getProto(); } + private SchedulingRequestPBImpl convertFromProtoFormat( + SchedulingRequestProto p) { + return new SchedulingRequestPBImpl(p); + } + + private SchedulingRequestProto convertToProtoFormat( + SchedulingRequest t) { + return ((SchedulingRequestPBImpl) t).getProto(); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java index 05bb3bd8551..f98e488031b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java @@ -112,6 +112,6 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto r) { } private ResourceProto convertToProtoFormat(Resource r) { - return ((ResourcePBImpl) r).getProto(); + return ProtoUtils.convertToProtoFormat(r); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c5585c28b21..a0b907dae1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -149,8 +149,10 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; @@ -189,7 +191,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceSizingPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; @@ -225,6 +229,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto; import org.apache.hadoop.yarn.proto.YarnProtos.URLProto; @@ -428,6 +434,8 @@ public static void setup() throws Exception { generateByNewInstance(QueueConfigurations.class); generateByNewInstance(CollectorInfo.class); generateByNewInstance(ResourceTypeInfo.class); + generateByNewInstance(ResourceSizing.class); + generateByNewInstance(SchedulingRequest.class); } @Test @@ -907,6 +915,17 @@ public void testResourceRequestPBImpl() throws Exception { } @Test + public void testResourceSizingPBImpl() throws Exception { + validatePBImplRecord(ResourceSizingPBImpl.class, ResourceSizingProto.class); + } + + @Test + public void testSchedulingRequestPBImpl() throws Exception { + validatePBImplRecord(SchedulingRequestPBImpl.class, + SchedulingRequestProto.class); + } + + @Test public void testSerializedExceptionPBImpl() throws Exception { validatePBImplRecord(SerializedExceptionPBImpl.class, SerializedExceptionProto.class);