diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index f9e4595b..a4b91ff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -296,7 +296,6 @@ public AllocateResponse allocate(AllocateRequest request) Resources.none(), null, 1, null, Collections.emptyList(), yarnToken, - Collections.emptyList(), Collections.emptyList()); response.setApplicationPriority(Priority.newInstance(0)); return response; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 52715d8..b955960 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -98,7 +98,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -107,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -1702,8 +1702,8 @@ public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1749,8 +1749,8 @@ public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequest, - List decreaseRequests) { + List increaseRequest, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 014aa89..d17a2fb 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -206,8 +206,8 @@ public void run() { public Allocation allocate(ApplicationAttemptId attemptId, List resourceRequests, List containerIds, List strings, List strings2, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 5cd53fe..206d775 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -179,8 +179,8 @@ public void run() { public Allocation allocate(ApplicationAttemptId attemptId, List resourceRequests, List containerIds, List strings, List strings2, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 0b65e5c..3541c02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; /** @@ -48,13 +48,8 @@ * A list of unused {@link Container} which are being returned. * *
  • - * A list of {@link ContainerResourceChangeRequest} to inform - * the ResourceManager about the resource increase - * requirements of running containers. - *
  • - *
  • - * A list of {@link ContainerResourceChangeRequest} to inform - * the ResourceManager about the resource decrease + * A list of {@link UpdateContainerRequest} to inform + * the ResourceManager about the change in * requirements of running containers. *
  • * @@ -72,25 +67,23 @@ public static AllocateRequest newInstance(int responseID, float appProgress, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { return newInstance(responseID, appProgress, resourceAsk, - containersToBeReleased, resourceBlacklistRequest, null, null); + containersToBeReleased, resourceBlacklistRequest, null); } @Public - @Stable + @Unstable public static AllocateRequest newInstance(int responseID, float appProgress, List resourceAsk, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, - List increaseRequests, - List decreaseRequests) { + List updateRequests) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); allocateRequest.setAskList(resourceAsk); allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); - allocateRequest.setIncreaseRequests(increaseRequests); - allocateRequest.setDecreaseRequests(decreaseRequests); + allocateRequest.setUpdateRequests(updateRequests); return allocateRequest; } @@ -197,38 +190,20 @@ public abstract void setResourceBlacklistRequest( ResourceBlacklistRequest resourceBlacklistRequest); /** - * Get the list of container resource increase requests being sent by the - * ApplicationMaster. - */ - @Public - @Unstable - public abstract List getIncreaseRequests(); - - /** - * Set the list of container resource increase requests to inform the - * ResourceManager about the containers whose resources need - * to be increased. - */ - @Public - @Unstable - public abstract void setIncreaseRequests( - List increaseRequests); - - /** - * Get the list of container resource decrease requests being sent by the + * Get the list of container update requests being sent by the * ApplicationMaster. */ @Public @Unstable - public abstract List getDecreaseRequests(); + public abstract List getUpdateRequests(); /** - * Set the list of container resource decrease requests to inform the - * ResourceManager about the containers whose resources need - * to be decreased. + * Set the list of container update requests to inform the + * ResourceManager about the containers that need to be + * updated. */ @Public @Unstable - public abstract void setDecreaseRequests( - List decreaseRequests); + public abstract void setUpdateRequests( + List updateRequests); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index d1b2a3a..99ced92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -95,19 +95,17 @@ public static AllocateResponse newInstance(int responseId, } @Public - @Stable + @Unstable public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, - List increasedContainers, - List decreasedContainers) { + List updatedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, nmTokens); - response.setIncreasedContainers(increasedContainers); - response.setDecreasedContainers(decreasedContainers); + response.setUpdatedContainers(updatedContainers); return response; } @@ -118,12 +116,11 @@ public static AllocateResponse newInstance(int responseId, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, - List increasedContainers, - List decreasedContainers) { + List updatedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, increasedContainers, decreasedContainers); + nmTokens, updatedContainers); response.setAMRMToken(amRMToken); return response; } @@ -270,40 +267,23 @@ public static AllocateResponse newInstance(int responseId, public abstract void setNMTokens(List nmTokens); /** - * Get the list of newly increased containers by + * Get the list of newly updated containers by * ResourceManager. */ @Public @Unstable - public abstract List getIncreasedContainers(); + public abstract List getUpdatedContainers(); /** - * Set the list of newly increased containers by + * Set the list of newly updated containers by * ResourceManager. */ @Private @Unstable - public abstract void setIncreasedContainers( + public abstract void setUpdatedContainers( List increasedContainers); /** - * Get the list of newly decreased containers by - * ResourceManager. - */ - @Public - @Unstable - public abstract List getDecreasedContainers(); - - /** - * Set the list of newly decreased containers by - * ResourceManager. - */ - @Private - @Unstable - public abstract void setDecreasedContainers( - List decreasedContainers); - - /** * The AMRMToken that belong to this attempt * * @return The AMRMToken that belong to this attempt diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 9a62935..c9909a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -189,4 +189,19 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId, @Private @Unstable public abstract void setExecutionType(ExecutionType executionType); + + /** + * Get the version of this container. + * @return version of this container. + */ + @Private + @Unstable + public abstract int getVersion(); + + /** + * Set the version of this container. + * @param version of this container. + */ + public abstract void setVersion(int version); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java deleted file mode 100644 index 117015b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.util.Records; - -/** - * {@code ContainerResourceChangeRequest} represents the request made by an - * application to the {@code ResourceManager} to change resource allocation of - * a running {@code Container}. - *

    - * It includes: - *

      - *
    • {@link ContainerId} for the container.
    • - *
    • - * {@link Resource} capability of the container after the resource change - * is completed. - *
    • - *
    - * - * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) - */ -@Public -@Unstable -public abstract class ContainerResourceChangeRequest { - - @Public - @Unstable - public static ContainerResourceChangeRequest newInstance( - ContainerId existingContainerId, Resource targetCapability) { - ContainerResourceChangeRequest context = Records - .newRecord(ContainerResourceChangeRequest.class); - context.setContainerId(existingContainerId); - context.setCapability(targetCapability); - return context; - } - - /** - * Get the ContainerId of the container. - * @return ContainerId of the container - */ - @Public - @Unstable - public abstract ContainerId getContainerId(); - - /** - * Set the ContainerId of the container. - * @param containerId ContainerId of the container - */ - @Public - @Unstable - public abstract void setContainerId(ContainerId containerId); - - /** - * Get the Resource capability of the container. - * @return Resource capability of the container - */ - @Public - @Unstable - public abstract Resource getCapability(); - - /** - * Set the Resource capability of the container. - * @param capability Resource capability of the container - */ - @Public - @Unstable - public abstract void setCapability(Resource capability); - - @Override - public int hashCode() { - return getCapability().hashCode() + getContainerId().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof ContainerResourceChangeRequest) { - ContainerResourceChangeRequest ctx = - (ContainerResourceChangeRequest) other; - - if (getContainerId() == null && ctx.getContainerId() != null) { - return false; - } else if (!getContainerId().equals(ctx.getContainerId())) { - return false; - } - - if (getCapability() == null && ctx.getCapability() != null) { - return false; - } else if (!getCapability().equals(ctx.getCapability())) { - return false; - } - - return true; - } else { - return false; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java new file mode 100644 index 0000000..1c2fff8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code UpdateContainerRequest} represents the request made by an + * application to the {@code ResourceManager} to update an attribute of a + * {@code Container} such as its Resource allocation or (@code ExecutionType} + *

    + * It includes: + *

      + *
    • version for the container.
    • + *
    • {@link ContainerId} for the container.
    • + *
    • + * {@link Resource} capability of the container after the update request + * is completed. + *
    • + *
    • + * {@link ExecutionType} of the container after the update request is + * completed. + *
    • + *
    + * + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class UpdateContainerRequest { + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerRequest newInstance(ContainerId containerId, + Resource targetCapability) { + return newInstance(0, containerId, targetCapability, null); + } + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerRequest newInstance(int version, + ContainerId containerId, Resource targetCapability, + ExecutionType targetExecutionType) { + UpdateContainerRequest request = + Records.newRecord(UpdateContainerRequest.class); + request.setContainerVersion(version); + request.setContainerId(containerId); + request.setExecutionType(targetExecutionType); + request.setCapability(targetCapability); + return request; + } + + /** + * Get the ContainerId of the container. + * @return ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract int getContainerVersion(); + + /** + * Set the current version of the container. + * @param containerVersion of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerVersion(int containerVersion); + + /** + * Get the ContainerId of the container. + * @return ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the ContainerId of the container. + * @param containerId ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the Resource capability of the container. + * @return Resource capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract Resource getCapability(); + + /** + * Set the Resource capability of the container. + * @param capability Resource capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setCapability(Resource capability); + + /** + * Get the target ExecutionType of the container. + * @return ExecutionType of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ExecutionType getExecutionType(); + + /** + * Set the target ExecutionType of the container. + * @param executionType ExecutionType of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setExecutionType(ExecutionType executionType); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ContainerId cId = getContainerId(); + ExecutionType execType = getExecutionType(); + Resource capability = getCapability(); + result = + prime * result + ((capability == null) ? 0 : capability.hashCode()); + result = prime * result + ((cId == null) ? 0 : cId.hashCode()); + result = prime * result + getContainerVersion(); + result = prime * result + ((execType == null) ? 0 : execType.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + UpdateContainerRequest other = (UpdateContainerRequest) obj; + Resource capability = getCapability(); + if (capability == null) { + if (other.getCapability() != null) + return false; + } else if (!capability.equals(other.getCapability())) + return false; + ContainerId cId = getContainerId(); + if (cId == null) { + if (other.getContainerId() != null) + return false; + } else if (!cId.equals(other.getContainerId())) + return false; + if (getContainerVersion() != other.getContainerVersion()) + return false; + ExecutionType execType = getExecutionType(); + if (execType == null) { + if (other.getExecutionType() != null) + return false; + } else if (!execType.equals(other.getExecutionType())) + return false; + return true; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 0649f8e..9b4cb68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -93,6 +93,7 @@ message ContainerProto { optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; + optional int32 version = 8 [default = 0]; } message ContainerReportProto { @@ -531,11 +532,6 @@ enum ContainerExitStatusProto { DISKS_FAILED = -101; } -message ContainerResourceChangeRequestProto { - optional ContainerIdProto container_id = 1; - optional ResourceProto capability = 2; -} - message ContainerRetryContextProto { optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; repeated int32 error_codes = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 7070e38..fd8e43a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -60,14 +60,20 @@ message FinishApplicationMasterResponseProto { optional bool isUnregistered = 1 [default = false]; } +message UpdateContainerRequestProto { + required int32 container_version = 1; + required ContainerIdProto container_id = 2; + optional ResourceProto capability = 3; + optional ExecutionTypeProto execution_type = 4; +} + message AllocateRequestProto { repeated ResourceRequestProto ask = 1; repeated ContainerIdProto release = 2; optional ResourceBlacklistRequestProto blacklist_request = 3; optional int32 response_id = 4; optional float progress = 5; - repeated ContainerResourceChangeRequestProto increase_request = 6; - repeated ContainerResourceChangeRequestProto decrease_request = 7; + repeated UpdateContainerRequestProto update_requests = 6; } message NMTokenProto { @@ -85,8 +91,7 @@ message AllocateResponseProto { optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; repeated NMTokenProto nm_tokens = 9; - repeated ContainerProto increased_containers = 10; - repeated ContainerProto decreased_containers = 11; + repeated ContainerProto updated_containers = 10; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 286ca28..a22a667 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -339,8 +339,7 @@ public void run() { // RM side of the implementation guarantees that there are // no duplications between increased and decreased containers List changed = new ArrayList<>(); - changed.addAll(response.getIncreasedContainers()); - changed.addAll(response.getDecreasedContainers()); + changed.addAll(response.getUpdatedContainers()); if (!changed.isEmpty()) { ((AMRMClientAsync.AbstractCallbackHandler) handler) .onContainersResourceChanged(changed); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4366c25..92f3cac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -265,24 +265,13 @@ public AllocateResponse allocate(float progressIndicator) r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression())); } - List increaseList = new ArrayList<>(); - List decreaseList = new ArrayList<>(); + List updateList = new ArrayList<>(); // Save the current change for recovery oldChange.putAll(change); for (Map.Entry> entry : change.entrySet()) { - Container container = entry.getValue().getKey(); - Resource original = container.getResource(); - Resource target = entry.getValue().getValue(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - decreaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } else { - // This is an increase request - increaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } + updateList.add(UpdateContainerRequest.newInstance(0, entry.getKey(), + entry.getValue().getValue(), null)); } releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure @@ -299,8 +288,7 @@ public AllocateResponse allocate(float progressIndicator) allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, - increaseList, decreaseList); + askList, releaseList, blacklistRequest, updateList); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -351,8 +339,7 @@ public AllocateResponse allocate(float progressIndicator) List completed = allocateResponse.getCompletedContainersStatuses(); List changed = new ArrayList<>(); - changed.addAll(allocateResponse.getIncreasedContainers()); - changed.addAll(allocateResponse.getDecreasedContainers()); + changed.addAll(allocateResponse.getUpdatedContainers()); // remove all pending change requests that belong to the completed // containers for (ContainerStatus status : completed) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index c7b3a94..a6c4040 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -410,10 +410,12 @@ private AllocateResponse createAllocateResponse( List completed, List allocated, List increased, List decreased, List nmTokens) { + List updatedContainers = new ArrayList<>(increased); + updatedContainers.addAll(decreased); AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, new ArrayList(), null, null, 1, null, nmTokens, - increased, decreased); + updatedContainers); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 75b49d0..8105c99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -881,22 +881,16 @@ private void doContainerResourceChange( AllocateResponse allocResponse = amClient.allocate(0.1f); Assert.assertEquals(0, amClientImpl.change.size()); // we should get decrease confirmation right away - List decreasedContainers = - allocResponse.getDecreasedContainers(); - List increasedContainers = - allocResponse.getIncreasedContainers(); - Assert.assertEquals(1, decreasedContainers.size()); - Assert.assertEquals(0, increasedContainers.size()); + List updaatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updaatedContainers.size()); // we should get increase allocation after the next NM's heartbeat to RM sleep(150); // get allocations allocResponse = amClient.allocate(0.1f); - decreasedContainers = - allocResponse.getDecreasedContainers(); - increasedContainers = - allocResponse.getIncreasedContainers(); - Assert.assertEquals(1, increasedContainers.size()); - Assert.assertEquals(0, decreasedContainers.size()); + updaatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updaatedContainers.size()); } private void testAllocation(final AMRMClientImpl amClient) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 0890396..81ad127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -38,12 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -550,8 +550,8 @@ public MyFifoScheduler(RMContext rmContext) { List lastAsk = null; List lastRelease = null; - List lastIncrease = null; - List lastDecrease = null; + List lastIncrease = null; + List lastDecrease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; @@ -562,8 +562,8 @@ public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index d6db32c..34fe3cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -27,17 +27,17 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder; @@ -52,8 +52,7 @@ private List ask = null; private List release = null; - private List increaseRequests = null; - private List decreaseRequests = null; + private List updateRequests = null; private ResourceBlacklistRequest blacklistRequest = null; public AllocateRequestPBImpl() { @@ -99,12 +98,9 @@ private void mergeLocalToBuilder() { if (this.release != null) { addReleasesToProto(); } - if (this.increaseRequests != null) { + if (this.updateRequests != null) { addIncreaseRequestsToProto(); } - if (this.decreaseRequests != null) { - addDecreaseRequestsToProto(); - } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } @@ -166,37 +162,19 @@ public void setAskList(final List resourceRequests) { } @Override - public List getIncreaseRequests() { - initIncreaseRequests(); - return this.increaseRequests; + public List getUpdateRequests() { + initUpdateRequests(); + return this.updateRequests; } @Override - public void setIncreaseRequests( - List increaseRequests) { - if (increaseRequests == null) { + public void setUpdateRequests(List updateRequests) { + if (updateRequests == null) { return; } - initIncreaseRequests(); - this.increaseRequests.clear(); - this.increaseRequests.addAll(increaseRequests); - } - - @Override - public List getDecreaseRequests() { - initDecreaseRequests(); - return this.decreaseRequests; - } - - @Override - public void setDecreaseRequests( - List decreaseRequests) { - if (decreaseRequests == null) { - return; - } - initDecreaseRequests(); - this.decreaseRequests.clear(); - this.decreaseRequests.addAll(decreaseRequests); + initUpdateRequests(); + this.updateRequests.clear(); + this.updateRequests.addAll(updateRequests); } @Override @@ -239,7 +217,8 @@ private void addAsksToProto() { builder.clearAsk(); if (ask == null) return; - Iterable iterable = new Iterable() { + Iterable iterable = + new Iterable() { @Override public Iterator iterator() { return new Iterator() { @@ -268,84 +247,34 @@ public void remove() { builder.addAllAsk(iterable); } - private void initIncreaseRequests() { - if (this.increaseRequests != null) { + private void initUpdateRequests() { + if (this.updateRequests != null) { return; } AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = - p.getIncreaseRequestList(); - this.increaseRequests = new ArrayList(); + List list = + p.getUpdateRequestsList(); + this.updateRequests = new ArrayList<>(); - for (ContainerResourceChangeRequestProto c : list) { - this.increaseRequests.add(convertFromProtoFormat(c)); - } - } - - private void initDecreaseRequests() { - if (this.decreaseRequests != null) { - return; - } - AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = - p.getDecreaseRequestList(); - this.decreaseRequests = new ArrayList<>(); - - for (ContainerResourceChangeRequestProto c : list) { - this.decreaseRequests.add(convertFromProtoFormat(c)); + for (YarnServiceProtos.UpdateContainerRequestProto c : list) { + this.updateRequests.add(convertFromProtoFormat(c)); } } private void addIncreaseRequestsToProto() { maybeInitBuilder(); - builder.clearIncreaseRequest(); - if (increaseRequests == null) { - return; - } - Iterable iterable = - new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - - Iterator iter = - increaseRequests.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public ContainerResourceChangeRequestProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - } - }; - builder.addAllIncreaseRequest(iterable); - } - - private void addDecreaseRequestsToProto() { - maybeInitBuilder(); - builder.clearDecreaseRequest(); - if (decreaseRequests == null) { + builder.clearUpdateRequests(); + if (updateRequests == null) { return; } - Iterable iterable = - new Iterable() { + Iterable iterable = + new Iterable() { @Override - public Iterator iterator() { - return new Iterator() { + public Iterator iterator() { + return new Iterator() { - Iterator iter = - decreaseRequests.iterator(); + Iterator iter = + updateRequests.iterator(); @Override public boolean hasNext() { @@ -353,7 +282,7 @@ public boolean hasNext() { } @Override - public ContainerResourceChangeRequestProto next() { + public YarnServiceProtos.UpdateContainerRequestProto next() { return convertToProtoFormat(iter.next()); } @@ -365,7 +294,7 @@ public void remove() { } }; - builder.addAllDecreaseRequest(iterable); + builder.addAllUpdateRequests(iterable); } @Override @@ -438,14 +367,14 @@ private ResourceRequestProto convertToProtoFormat(ResourceRequest t) { return ((ResourceRequestPBImpl)t).getProto(); } - private ContainerResourceChangeRequestPBImpl convertFromProtoFormat( - ContainerResourceChangeRequestProto p) { - return new ContainerResourceChangeRequestPBImpl(p); + private UpdateContainerRequestPBImpl convertFromProtoFormat( + YarnServiceProtos.UpdateContainerRequestProto p) { + return new UpdateContainerRequestPBImpl(p); } - private ContainerResourceChangeRequestProto convertToProtoFormat( - ContainerResourceChangeRequest t) { - return ((ContainerResourceChangeRequestPBImpl) t).getProto(); + private YarnServiceProtos.UpdateContainerRequestProto convertToProtoFormat( + UpdateContainerRequest t) { + return ((UpdateContainerRequestPBImpl) t).getProto(); } private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index da87465..d060fa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -69,8 +69,7 @@ private List allocatedContainers = null; private List nmTokens = null; private List completedContainersStatuses = null; - private List increasedContainers = null; - private List decreasedContainers = null; + private List updatedContainers = null; private List updatedNodes = null; private PreemptionMessage preempt; @@ -143,17 +142,11 @@ private synchronized void mergeLocalToBuilder() { if (this.preempt != null) { builder.setPreempt(convertToProtoFormat(this.preempt)); } - if (this.increasedContainers != null) { - builder.clearIncreasedContainers(); + if (this.updatedContainers != null) { + builder.clearUpdatedContainers(); Iterable iterable = - getContainerProtoIterable(this.increasedContainers); - builder.addAllIncreasedContainers(iterable); - } - if (this.decreasedContainers != null) { - builder.clearDecreasedContainers(); - Iterable iterable = - getContainerProtoIterable(this.decreasedContainers); - builder.addAllDecreasedContainers(iterable); + getContainerProtoIterable(this.updatedContainers); + builder.addAllUpdatedContainers(iterable); } if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); @@ -264,33 +257,18 @@ public synchronized void setAllocatedContainers( } @Override - public synchronized List getIncreasedContainers() { - initLocalIncreasedContainerList(); - return this.increasedContainers; + public synchronized List getUpdatedContainers() { + initLocalUpdatedContainerList(); + return this.updatedContainers; } @Override - public synchronized void setIncreasedContainers( + public synchronized void setUpdatedContainers( final List containers) { if (containers == null) return; - initLocalIncreasedContainerList(); - increasedContainers.addAll(containers); - } - - @Override - public synchronized List getDecreasedContainers() { - initLocalDecreasedContainerList(); - return this.decreasedContainers; - } - - @Override - public synchronized void setDecreasedContainers( - final List containers) { - if (containers == null) - return; - initLocalDecreasedContainerList(); - decreasedContainers.addAll(containers); + initLocalUpdatedContainerList(); + updatedContainers.addAll(containers); } //// Finished containers @@ -406,29 +384,16 @@ public synchronized void setApplicationPriority(Priority priority) { this.appPriority = priority; } - private synchronized void initLocalIncreasedContainerList() { - if (this.increasedContainers != null) { - return; - } - AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getIncreasedContainersList(); - increasedContainers = new ArrayList<>(); - - for (ContainerProto c : list) { - increasedContainers.add(convertFromProtoFormat(c)); - } - } - - private synchronized void initLocalDecreasedContainerList() { - if (this.decreasedContainers != null) { + private synchronized void initLocalUpdatedContainerList() { + if (this.updatedContainers != null) { return; } AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getDecreasedContainersList(); - decreasedContainers = new ArrayList<>(); + List list = p.getUpdatedContainersList(); + updatedContainers = new ArrayList<>(); for (ContainerProto c : list) { - decreasedContainers.add(convertFromProtoFormat(c)); + updatedContainers.add(convertFromProtoFormat(c)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index bd2d937..e9f91c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -262,6 +262,18 @@ public void setExecutionType(ExecutionType executionType) { builder.setExecutionType(convertToProtoFormat(executionType)); } + @Override + public int getVersion() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return p.getVersion(); + } + + @Override + public void setVersion(int version) { + maybeInitBuilder(); + builder.setVersion(version); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -315,6 +327,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Container: ["); sb.append("ContainerId: ").append(getId()).append(", "); + sb.append("Version: ").append(getVersion()).append(", "); sb.append("NodeId: ").append(getNodeId()).append(", "); sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", "); sb.append("Resource: ").append(getResource()).append(", "); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java deleted file mode 100644 index f382b8c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records.impl.pb; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; - - -public class ContainerResourceChangeRequestPBImpl extends - ContainerResourceChangeRequest { - ContainerResourceChangeRequestProto proto = - ContainerResourceChangeRequestProto.getDefaultInstance(); - ContainerResourceChangeRequestProto.Builder builder = null; - boolean viaProto = false; - - private ContainerId existingContainerId = null; - private Resource targetCapability = null; - - public ContainerResourceChangeRequestPBImpl() { - builder = ContainerResourceChangeRequestProto.newBuilder(); - } - - public ContainerResourceChangeRequestPBImpl( - ContainerResourceChangeRequestProto proto) { - this.proto = proto; - viaProto = true; - } - - public ContainerResourceChangeRequestProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public ContainerId getContainerId() { - ContainerResourceChangeRequestProtoOrBuilder p = viaProto ? proto - : builder; - if (this.existingContainerId != null) { - return this.existingContainerId; - } - if (p.hasContainerId()) { - this.existingContainerId = convertFromProtoFormat(p.getContainerId()); - } - return this.existingContainerId; - } - - @Override - public void setContainerId(ContainerId existingContainerId) { - maybeInitBuilder(); - if (existingContainerId == null) { - builder.clearContainerId(); - } - this.existingContainerId = existingContainerId; - } - - @Override - public Resource getCapability() { - ContainerResourceChangeRequestProtoOrBuilder p = viaProto ? proto - : builder; - if (this.targetCapability != null) { - return this.targetCapability; - } - if (p.hasCapability()) { - this.targetCapability = convertFromProtoFormat(p.getCapability()); - } - return this.targetCapability; - } - - @Override - public void setCapability(Resource targetCapability) { - maybeInitBuilder(); - if (targetCapability == null) { - builder.clearCapability(); - } - this.targetCapability = targetCapability; - } - - private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { - return new ContainerIdPBImpl(p); - } - - private ContainerIdProto convertToProtoFormat(ContainerId t) { - return ((ContainerIdPBImpl) t).getProto(); - } - - private Resource convertFromProtoFormat(ResourceProto p) { - return new ResourcePBImpl(p); - } - - private ResourceProto convertToProtoFormat(Resource t) { - return ((ResourcePBImpl) t).getProto(); - } - - private void mergeLocalToProto() { - if (viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ContainerResourceChangeRequestProto.newBuilder(proto); - } - viaProto = false; - } - - private void mergeLocalToBuilder() { - if (this.existingContainerId != null) { - builder.setContainerId(convertToProtoFormat(this.existingContainerId)); - } - if (this.targetCapability != null) { - builder.setCapability(convertToProtoFormat(this.targetCapability)); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 1a0f30a..16ff397 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; @@ -339,4 +340,17 @@ public static ExecutionTypeRequest convertFromProtoFormat( ExecutionTypeRequestProto e) { return new ExecutionTypeRequestPBImpl(e); } + + /* + * ContainerId + */ + public static ContainerIdPBImpl convertFromProtoFormat(YarnProtos.ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + public static YarnProtos.ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } } + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java new file mode 100644 index 0000000..e8ad099 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +/** + * Implementation of UpdateContainerRequest. + */ +public class UpdateContainerRequestPBImpl extends UpdateContainerRequest { + YarnServiceProtos.UpdateContainerRequestProto proto = + YarnServiceProtos.UpdateContainerRequestProto.getDefaultInstance(); + YarnServiceProtos.UpdateContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId existingContainerId = null; + private Resource targetCapability = null; + + public UpdateContainerRequestPBImpl() { + builder = YarnServiceProtos.UpdateContainerRequestProto.newBuilder(); + } + + public UpdateContainerRequestPBImpl(YarnServiceProtos + .UpdateContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServiceProtos.UpdateContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int getContainerVersion() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasContainerVersion()) { + return 0; + } + return p.getContainerVersion(); + } + + @Override + public void setContainerVersion(int containerVersion) { + maybeInitBuilder(); + builder.setContainerVersion(containerVersion); + } + + @Override + public ContainerId getContainerId() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.existingContainerId != null) { + return this.existingContainerId; + } + if (p.hasContainerId()) { + this.existingContainerId = + ProtoUtils.convertFromProtoFormat(p.getContainerId()); + } + return this.existingContainerId; + } + + @Override + public void setContainerId(ContainerId existingContainerId) { + maybeInitBuilder(); + if (existingContainerId == null) { + builder.clearContainerId(); + } + this.existingContainerId = existingContainerId; + } + + @Override + public Resource getCapability() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = viaProto ? proto + : builder; + if (this.targetCapability != null) { + return this.targetCapability; + } + if (p.hasCapability()) { + this.targetCapability = + ProtoUtils.convertFromProtoFormat(p.getCapability()); + } + return this.targetCapability; + } + + @Override + public void setCapability(Resource targetCapability) { + maybeInitBuilder(); + if (targetCapability == null) { + builder.clearCapability(); + } + this.targetCapability = targetCapability; + } + + @Override + public ExecutionType getExecutionType() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasExecutionType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public void setExecutionType(ExecutionType execType) { + maybeInitBuilder(); + if (execType == null) { + builder.clearExecutionType(); + return; + } + builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServiceProtos.UpdateContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.existingContainerId != null) { + builder.setContainerId( + ProtoUtils.convertToProtoFormat(this.existingContainerId)); + } + if (this.targetCapability != null) { + builder.setCapability( + ProtoUtils.convertToProtoFormat(this.targetCapability)); + } + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 55b1233..ea1e567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -119,7 +119,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; @@ -153,6 +152,7 @@ import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl; @@ -164,7 +164,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; @@ -187,6 +186,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto; @@ -198,7 +198,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; @@ -244,6 +243,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; + +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto; @@ -484,7 +485,7 @@ public static void setup() throws Exception { generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); - generateByNewInstance(ContainerResourceChangeRequest.class); + generateByNewInstance(UpdateContainerRequest.class); generateByNewInstance(IncreaseContainersResourceRequest.class); generateByNewInstance(IncreaseContainersResourceResponse.class); generateByNewInstance(ContainerStatus.class); @@ -993,9 +994,9 @@ public void testContainerReportPBImpl() throws Exception { } @Test - public void testContainerResourceChangeRequestPBImpl() throws Exception { - validatePBImplRecord(ContainerResourceChangeRequestPBImpl.class, - ContainerResourceChangeRequestProto.class); + public void testUpdateContainerRequestPBImpl() throws Exception { + validatePBImplRecord(UpdateContainerRequestPBImpl.class, + YarnServiceProtos.UpdateContainerRequestProto.class); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 0652e96..8397009 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -297,7 +297,6 @@ public AllocateResponse allocate(AllocateRequest request) new ArrayList(), containerList, new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList(), - new ArrayList(), new ArrayList()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4f90fa0..38d1b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -87,6 +88,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -486,10 +489,24 @@ public AllocateResponse allocate(AllocateRequest request) throw e; } + List increaseResourceReqs = new ArrayList<>(); + List decreaseResourceReqs = new ArrayList<>(); + for (UpdateContainerRequest updateReqs : request.getUpdateRequests()) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer + (updateReqs.getContainerId()); + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReqs.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + decreaseResourceReqs.add(updateReqs); + } else { + // This is an increase request + increaseResourceReqs.add(updateReqs); + } + } try { RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext, - request.getIncreaseRequests(), request.getDecreaseRequests(), - maximumCapacity); + increaseResourceReqs, decreaseResourceReqs, maximumCapacity); } catch (InvalidResourceRequestException e) { LOG.warn(e); throw e; @@ -502,7 +519,8 @@ public AllocateResponse allocate(AllocateRequest request) try { RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, e); + LOG.warn("Invalid container release by application " + appAttemptId, + e); throw e; } } @@ -521,7 +539,7 @@ public AllocateResponse allocate(AllocateRequest request) allocation = this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals, - request.getIncreaseRequests(), request.getDecreaseRequests()); + increaseResourceReqs, decreaseResourceReqs); } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { @@ -569,8 +587,14 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setAvailableResources(allocation.getResourceLimit()); // Handling increased/decreased containers - allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers()); - allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers()); + List updatedContainers = new ArrayList<>(); + if (allocation.getIncreasedContainers() != null) { + updatedContainers.addAll(allocation.getIncreasedContainers()); + } + if (allocation.getDecreasedContainers() != null) { + updatedContainers.addAll(allocation.getDecreasedContainers()); + } + allocateResponse.setUpdatedContainers(updatedContainers); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 95fdb05..a39c03d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -36,12 +36,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -200,8 +200,8 @@ public static void validateBlacklistRequest( * - If targetResources violates maximum/minimumAllocation */ public static void increaseDecreaseRequestSanityCheck(RMContext rmContext, - List incRequests, - List decRequests, + List incRequests, + List decRequests, Resource maximumAllocation) throws InvalidResourceRequestException { checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests); validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation, @@ -211,14 +211,14 @@ public static void increaseDecreaseRequestSanityCheck(RMContext rmContext, } private static void checkDuplicatedIncreaseDecreaseRequest( - List incRequests, - List decRequests) + List incRequests, + List decRequests) throws InvalidResourceRequestException { String msg = "There're multiple increase or decrease container requests " + "for same containerId="; Set existedContainerIds = new HashSet(); if (incRequests != null) { - for (ContainerResourceChangeRequest r : incRequests) { + for (UpdateContainerRequest r : incRequests) { if (!existedContainerIds.add(r.getContainerId())) { throw new InvalidResourceRequestException(msg + r.getContainerId()); } @@ -226,7 +226,7 @@ private static void checkDuplicatedIncreaseDecreaseRequest( } if (decRequests != null) { - for (ContainerResourceChangeRequest r : decRequests) { + for (UpdateContainerRequest r : decRequests) { if (!existedContainerIds.add(r.getContainerId())) { throw new InvalidResourceRequestException(msg + r.getContainerId()); } @@ -236,13 +236,13 @@ private static void checkDuplicatedIncreaseDecreaseRequest( // Sanity check and normalize target resource private static void validateIncreaseDecreaseRequest(RMContext rmContext, - List requests, Resource maximumAllocation, + List requests, Resource maximumAllocation, boolean increase) throws InvalidResourceRequestException { if (requests == null) { return; } - for (ContainerResourceChangeRequest request : requests) { + for (UpdateContainerRequest request : requests) { if (request.getCapability().getMemorySize() < 0 || request.getCapability().getMemorySize() > maximumAllocation .getMemorySize()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 3066339..5be23f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -556,7 +556,7 @@ protected void releaseContainers(List containers, } protected void decreaseContainers( - List decreaseRequests, + List decreaseRequests, SchedulerApplicationAttempt attempt) { if (null == decreaseRequests || decreaseRequests.isEmpty()) { return; @@ -729,7 +729,7 @@ public synchronized void setClusterMaxPriority(Configuration conf) /** * Sanity check increase/decrease request, and return * SchedulerContainerResourceChangeRequest according to given - * ContainerResourceChangeRequest. + * UpdateContainerRequest. * *
        * - Returns non-null value means validation succeeded
    @@ -737,7 +737,7 @@ public synchronized void setClusterMaxPriority(Configuration conf)
        * 
    */ private SchedContainerChangeRequest createSchedContainerChangeRequest( - ContainerResourceChangeRequest request, boolean increase) + UpdateContainerRequest request, boolean increase) throws YarnException { ContainerId containerId = request.getContainerId(); RMContainer rmContainer = getRMContainer(containerId); @@ -756,11 +756,11 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( protected List createSchedContainerChangeRequests( - List changeRequests, + List changeRequests, boolean increase) { List schedulerChangeRequests = new ArrayList(); - for (ContainerResourceChangeRequest r : changeRequests) { + for (UpdateContainerRequest r : changeRequests) { SchedContainerChangeRequest sr = null; try { sr = createSchedContainerChangeRequest(r, increase); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java index e4ab3a2..94b006c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; /** - * This is ContainerResourceChangeRequest in scheduler side, it contains some + * This is UpdateContainerRequest in scheduler side, it contains some * pointers to runtime objects like RMContainer, SchedulerNode, etc. This will * be easier for scheduler making decision. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0aff669..c4f575f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -143,8 +143,8 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests); + List increaseRequests, + List decreaseRequests); /** * Get node resource usage report. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a70..111b192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -916,7 +916,7 @@ private synchronized void doneApplicationAttempt( // SchedContainerChangeRequest // 2. Deadlock with the scheduling thread. private LeafQueue updateIncreaseRequests( - List increaseRequests, + List increaseRequests, FiCaSchedulerApp app) { if (null == increaseRequests || increaseRequests.isEmpty()) { return null; @@ -944,8 +944,8 @@ private LeafQueue updateIncreaseRequests( public Allocation allocate(ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc953ba..49e5afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -941,8 +941,8 @@ private synchronized void removeNode(RMNode rmNode) { public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index eaab495..3532386 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -329,8 +329,8 @@ public synchronized void setRMContext(RMContext rmContext) { public Allocation allocate(ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index f2b4ea7..1fe210f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -34,11 +34,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -225,10 +225,18 @@ public AllocateResponse allocate( } public AllocateResponse sendContainerResizingRequest( - List increaseRequests, - List decreaseRequests) throws Exception { + List increaseRequests, + List decreaseRequests) throws Exception { + ArrayList updateContainerRequests = + new ArrayList<>(); + if (increaseRequests != null) { + updateContainerRequests.addAll(increaseRequests); + } + if (decreaseRequests != null) { + updateContainerRequests.addAll(decreaseRequests); + } final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, - null, increaseRequests, decreaseRequests); + null, updateContainerRequests); return allocate(req); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 64673d2..b929ea7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -35,10 +35,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -383,17 +383,17 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { // Ask for a normal increase should be successfull am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(2048))), null); + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.createResource(2048), null)), null); // Target resource is negative, should fail boolean exceptionCaught = false; try { am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(-1))), null); + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.createResource(-1), null)), null); } catch (InvalidResourceRequestException e) { // This is expected exceptionCaught = true; @@ -403,11 +403,10 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { // Target resource is more than maxAllocation, should fail try { am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), null); + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.add(registerResponse.getMaximumResourceCapability(), + Resources.createResource(1)), null)), null); } catch (InvalidResourceRequestException e) { // This is expected exceptionCaught = true; @@ -418,16 +417,16 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { // Contains multiple increase/decrease requests for same contaienrId try { am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), Resources .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.createResource(1)), null)), Arrays.asList( + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), Resources .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1))))); + Resources.createResource(1)), null))); } catch (InvalidResourceRequestException e) { // This is expected exceptionCaught = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c34292..83a71d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -3176,7 +3176,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId1, Resources.createResource(3 * GB))), null); @@ -3190,9 +3190,9 @@ protected RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G) am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId2, Resources.createResource(3 * GB)), - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId3, Resources.createResource(5 * GB))), null); @@ -3205,11 +3205,11 @@ protected RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and // containerId3 (2G -> 2G) am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId1, Resources.createResource(3 * GB)), - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId2, Resources.createResource(4 * GB)), - ContainerResourceChangeRequest + UpdateContainerRequest .newInstance(containerId3, Resources.createResource(2 * GB))), null); Assert.assertEquals(4 * GB, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 499e041..5d5bdcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -30,10 +30,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -84,7 +84,7 @@ @Override protected void decreaseContainers( - List decreaseRequests, + List decreaseRequests, SchedulerApplicationAttempt attempt) { try { Thread.sleep(1000); @@ -138,8 +138,9 @@ public RMNodeLabelsManager createNodeLabelManager() { sentRMContainerLaunched(rm1, containerId1); // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(3 * GB))), + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(3 * GB), + null)), null); FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( @@ -196,8 +197,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 1GB to 3GB AllocateResponse response = am1.sendContainerResizingRequest(null, Arrays - .asList(ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(1 * GB)))); + .asList(UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(1 * GB), + null))); verifyContainerDecreased(response, containerId1, 1 * GB); checkUsedResource(rm1, "default", 1 * GB, null); @@ -266,8 +268,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(7 * GB))), + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(7 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -367,8 +370,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change container2 from 2GB to 8GB, which will exceed user // limit am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId2, Resources.createResource(8 * GB))), + UpdateContainerRequest + .newInstance(0, containerId2, Resources.createResource(8 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -447,8 +451,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(7 * GB))), + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(7 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -487,8 +492,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 1G to 1G (cancel the increase // request actually) am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(1 * GB))), + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(1 * GB), + null)), null); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -559,8 +565,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 2GB to 8GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(8 * GB))), + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(8 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -598,8 +605,9 @@ public RMNodeLabelsManager createNodeLabelManager() { am1.allocate(null, Arrays.asList(containerId2)); // am1 asks to change its AM container from 2G to 1G (decrease) am1.sendContainerResizingRequest(null, Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(1 * GB)))); + UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(1 * GB), + null))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -668,8 +676,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 2GB to 8GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId2, Resources.createResource(8 * GB))), + UpdateContainerRequest + .newInstance(0, containerId2, Resources.createResource(8 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -765,8 +774,9 @@ public RMNodeLabelsManager createNodeLabelManager() { // am1 asks to change its AM container from 2GB to 8GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId2, Resources.createResource(8 * GB))), + UpdateContainerRequest + .newInstance(0, containerId2, Resources.createResource(8 * GB), + null)), null); checkPendingResource(rm1, "default", 6 * GB, null); @@ -883,12 +893,13 @@ public RMNodeLabelsManager createNodeLabelManager() { allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6); // am1 asks to change its container[2-7] from 1G to 2G - List increaseRequests = new ArrayList<>(); + List increaseRequests = new ArrayList<>(); for (int cId = 2; cId <= 7; cId++) { ContainerId containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), cId); - increaseRequests.add(ContainerResourceChangeRequest - .newInstance(containerId, Resources.createResource(2 * GB))); + increaseRequests.add(UpdateContainerRequest + .newInstance(0, containerId, Resources.createResource(2 * GB), + null)); } am1.sendContainerResizingRequest(increaseRequests, null); @@ -904,7 +915,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // earlier allocated) cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); AllocateResponse allocateResponse = am1.allocate(null, null); - Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size()); + Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size()); verifyContainerIncreased(allocateResponse, ContainerId.newContainerId(attemptId, 4), 2 * GB); verifyContainerIncreased(allocateResponse, @@ -964,12 +975,12 @@ public RMNodeLabelsManager createNodeLabelManager() { allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6); // am1 asks to change its container[2-7] from 1G to 2G - List increaseRequests = new ArrayList<>(); + List increaseRequests = new ArrayList<>(); for (int cId = 2; cId <= 7; cId++) { ContainerId containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), cId); - increaseRequests.add(ContainerResourceChangeRequest - .newInstance(containerId, Resources.createResource(2 * GB))); + increaseRequests.add(UpdateContainerRequest + .newInstance(0, containerId, Resources.createResource(2 * GB), null)); } am1.sendContainerResizingRequest(increaseRequests, null); @@ -985,7 +996,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // earlier allocated) cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); AllocateResponse allocateResponse = am1.allocate(null, null); - Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size()); + Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size()); verifyContainerIncreased(allocateResponse, ContainerId.newContainerId(attemptId, 4), 2 * GB); verifyContainerIncreased(allocateResponse, @@ -1047,8 +1058,8 @@ public ResourceScheduler createScheduler() { // *In the mean time*, am1 asks to decrease its AM container resource from // 3GB to 1GB AllocateResponse response = am1.sendContainerResizingRequest(null, - Collections.singletonList(ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(GB)))); + Collections.singletonList(UpdateContainerRequest + .newInstance(0, containerId1, Resources.createResource(GB), null))); // verify that the containe resource is decreased verifyContainerDecreased(response, containerId1, GB); @@ -1077,7 +1088,7 @@ private void checkUsedResource(MockRM rm, String queueName, int memory, private void verifyContainerIncreased(AllocateResponse response, ContainerId containerId, int mem) { - List increasedContainers = response.getIncreasedContainers(); + List increasedContainers = response.getUpdatedContainers(); boolean found = false; for (Container c : increasedContainers) { if (c.getId().equals(containerId)) { @@ -1092,7 +1103,7 @@ private void verifyContainerIncreased(AllocateResponse response, private void verifyContainerDecreased(AllocateResponse response, ContainerId containerId, int mem) { - List decreasedContainers = response.getDecreasedContainers(); + List decreasedContainers = response.getUpdatedContainers(); boolean found = false; for (Container c : decreasedContainers) { if (c.getId().equals(containerId)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index d388172..438e709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -21,10 +21,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -109,7 +109,7 @@ public void testContainerIsRemovedFromAllocationExpirer() rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); // am1 asks to increase containerId2 from 1GB to 3GB am1.sendContainerResizingRequest(Collections.singletonList( - ContainerResourceChangeRequest.newInstance( + UpdateContainerRequest.newInstance( containerId2, Resources.createResource(3 * GB))), null); // Kick off scheduling and sleep for 1 second; nm1.nodeHeartbeat(true); @@ -180,7 +180,7 @@ public void testContainerIncreaseAllocationExpiration() rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); // am1 asks to increase containerId2 from 1GB to 3GB am1.sendContainerResizingRequest(Collections.singletonList( - ContainerResourceChangeRequest.newInstance( + UpdateContainerRequest.newInstance( containerId2, Resources.createResource(3 * GB))), null); // Kick off scheduling and wait for 1 second; nm1.nodeHeartbeat(true); @@ -249,7 +249,7 @@ public void testConsecutiveContainerIncreaseAllocationExpiration() rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); // am1 asks to change containerId2 from 1GB to 3GB am1.sendContainerResizingRequest(Collections.singletonList( - ContainerResourceChangeRequest.newInstance( + UpdateContainerRequest.newInstance( containerId2, Resources.createResource(3 * GB))), null); // Kick off scheduling and sleep for 1 second to // make sure the allocation is done @@ -263,7 +263,7 @@ public void testConsecutiveContainerIncreaseAllocationExpiration() .getAllocatedResource()); // am1 asks to change containerId2 from 3GB to 5GB am1.sendContainerResizingRequest(Collections.singletonList( - ContainerResourceChangeRequest.newInstance( + UpdateContainerRequest.newInstance( containerId2, Resources.createResource(5 * GB))), null); // Kick off scheduling and sleep for 1 second to // make sure the allocation is done @@ -362,12 +362,12 @@ public void testDecreaseAfterIncreaseWithAllocationExpiration() rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); rm1.waitForState(nm1, containerId4, RMContainerState.RUNNING); // am1 asks to change containerId2 and containerId3 from 1GB to 3GB - List increaseRequests = new ArrayList<>(); - increaseRequests.add(ContainerResourceChangeRequest.newInstance( + List increaseRequests = new ArrayList<>(); + increaseRequests.add(UpdateContainerRequest.newInstance( containerId2, Resources.createResource(6 * GB))); - increaseRequests.add(ContainerResourceChangeRequest.newInstance( + increaseRequests.add(UpdateContainerRequest.newInstance( containerId3, Resources.createResource(6 * GB))); - increaseRequests.add(ContainerResourceChangeRequest.newInstance( + increaseRequests.add(UpdateContainerRequest.newInstance( containerId4, Resources.createResource(6 * GB))); am1.sendContainerResizingRequest(increaseRequests, null); nm1.nodeHeartbeat(true); @@ -375,17 +375,17 @@ public void testDecreaseAfterIncreaseWithAllocationExpiration() // Start container increase allocation expirer am1.allocate(null, null); // Decrease containers - List decreaseRequests = new ArrayList<>(); - decreaseRequests.add(ContainerResourceChangeRequest.newInstance( + List decreaseRequests = new ArrayList<>(); + decreaseRequests.add(UpdateContainerRequest.newInstance( containerId2, Resources.createResource(2 * GB))); - decreaseRequests.add(ContainerResourceChangeRequest.newInstance( + decreaseRequests.add(UpdateContainerRequest.newInstance( containerId3, Resources.createResource(4 * GB))); - decreaseRequests.add(ContainerResourceChangeRequest.newInstance( + decreaseRequests.add(UpdateContainerRequest.newInstance( containerId4, Resources.createResource(4 * GB))); AllocateResponse response = am1.sendContainerResizingRequest(null, decreaseRequests); // Verify containers are decreased in scheduler - Assert.assertEquals(3, response.getDecreasedContainers().size()); + Assert.assertEquals(3, response.getUpdatedContainers().size()); // Use the token for containerId4 on NM (6G). This should set the last // confirmed resource to 4G, and cancel the allocation expirer nm1.containerIncreaseStatus(getContainer(