diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java new file mode 100644 index 00000000000..2d2562450fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This exception is thrown when any issue inside scheduler to handle a new or + * updated {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}/ + * {@link org.apache.hadoop.yarn.api.records.ResourceRequest} add to the + * scheduler. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SchedulerInvalidResoureRequestException extends YarnRuntimeException { + private static final long serialVersionUID = 10081123982L; + + public SchedulerInvalidResoureRequestException(String message) { + super(message); + } + + public SchedulerInvalidResoureRequestException(Throwable cause) { + super(cause); + } + + public SchedulerInvalidResoureRequestException(String message, + Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index c4f37f62649..0fce083b5ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; /** @@ -45,6 +46,16 @@ public static SchedulerRequestKey create(ResourceRequest req) { req.getAllocationRequestId(), null); } + /** + * Factory method to generate a SchedulerRequestKey from a SchedulingRequest. + * @param req SchedulingRequest + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey create(SchedulingRequest req) { + return new SchedulerRequestKey(req.getPriority(), + req.getAllocationRequestId(), null); + } + public static SchedulerRequestKey create(UpdateContainerRequest req, SchedulerRequestKey schedulerRequestKey) { return new SchedulerRequestKey(schedulerRequestKey.getPriority(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java index 893ff1cc377..60296a9dae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java @@ -18,7 +18,10 @@ * / */ -package org.apache.hadoop.yarn.server.resourcemanager.constraint; +package org.apache.hadoop.yarn.api.resource; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * Predefined namespaces for tags @@ -26,6 +29,9 @@ * Same as namespace of resource types. Namespaces of placement tags are start * with alphabets and ended with "/" */ -public class AllocationTagsNamespaces { +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class PlacementTagsConstants { public static final String APP_ID = "yarn_app_id/"; + public static final String NODE_PARTITION_KEY = "yarn_node_partition"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 713947fe5b7..11047036757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -273,10 +274,13 @@ public void allocate(ApplicationAttemptId appAttemptId, " state, ignore container allocate request."); allocation = EMPTY_ALLOCATION; } else { - allocation = - getScheduler().allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - containerUpdateRequests); + try { + allocation = getScheduler().allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, containerUpdateRequests); + } catch (SchedulerInvalidResoureRequestException e) { + LOG.warn("Exceptions caught when scheduler handling requests"); + throw new YarnException(e); + } } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java index b67fab9bcc7..01645cc02b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.resource.PlacementTagsConstants; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -231,7 +232,7 @@ public AllocationTagsManager() { public void addContainer(NodeId nodeId, ApplicationId applicationId, ContainerId containerId, Set allocationTags) { String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); + PlacementTagsConstants.APP_ID + applicationId.toString(); boolean useSet = false; if (allocationTags != null && !allocationTags.isEmpty()) { @@ -275,7 +276,7 @@ public void addContainer(NodeId nodeId, ApplicationId applicationId, public void removeContainer(NodeId nodeId, ApplicationId applicationId, ContainerId containerId, Set allocationTags) { String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); + PlacementTagsConstants.APP_ID + applicationId.toString(); boolean useSet = false; if (allocationTags != null && !allocationTags.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e47f0c1d614..68bf6ae6210 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -160,58 +161,112 @@ public ContainerUpdateContext getUpdateContext() { * application, by asking for more resources and releasing resources acquired * by the application. * - * @param requests - * resources to be acquired + * @param resourceRequests resource requests to be allocated + * @param schedulingRequests scheduling requests to be allocated * @param recoverPreemptedRequestForAContainer - * recover ResourceRequest on preemption + * recover ResourceRequest/SchedulingRequest on preemption * @return true if any resource was updated, false otherwise */ - public boolean updateResourceRequests(List requests, + public boolean updateResourceDemands(List resourceRequests, + List schedulingRequests, boolean recoverPreemptedRequestForAContainer) { - if (null == requests || requests.isEmpty()) { - return false; - } - // Flag to track if any incoming requests update "ANY" requests - boolean offswitchResourcesUpdated = false; + boolean offswitchResourcesUpdated; + writeLock.lock(); try { - this.writeLock.lock(); - - // A map to group resource requests and dedup - Map> dedupRequests = - new HashMap<>(); - - // Group resource request by schedulerRequestKey and resourceName - for (ResourceRequest request : requests) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - if (!dedupRequests.containsKey(schedulerKey)) { - dedupRequests.put(schedulerKey, new HashMap<>()); - } - dedupRequests.get(schedulerKey).put(request.getResourceName(), request); - } - - // Update AppPlacementAllocator by dedup requests. - offswitchResourcesUpdated = - addRequestToAppPlacement( - recoverPreemptedRequestForAContainer, dedupRequests); - - return offswitchResourcesUpdated; + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = addResourceRequests( + recoverPreemptedRequestForAContainer, resourceRequests) + && addSchedulingRequests(recoverPreemptedRequestForAContainer, + schedulingRequests); } finally { - this.writeLock.unlock(); + writeLock.unlock(); } + + return offswitchResourcesUpdated; } public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); } - boolean addRequestToAppPlacement( + private boolean addSchedulingRequests( boolean recoverPreemptedRequestForAContainer, - Map> dedupRequests) { + List schedulingRequests) { + // Do we need to update pending resource for app/queue, etc.? + boolean requireUpdatePendingResource = false; + + Map schedulingRequestMap = + new HashMap<>(); + for (SchedulingRequest req : schedulingRequests) { + schedulingRequestMap.put(SchedulerRequestKey.create(req), req); + } + + for (Map.Entry entry : schedulingRequestMap + .entrySet()) { + SchedulerRequestKey schedulerRequestKey = entry.getKey(); + SchedulingRequest schedulingRequest = entry.getValue(); + + AppPlacementAllocator appPlacementAllocator = getAppPlacementAllocator( + schedulerRequestKey, schedulingRequest); + + // Update AppPlacementAllocator + PendingAskUpdateResult pendingAmountChanges = + appPlacementAllocator.updatePendingAsk(entry.getValue(), + recoverPreemptedRequestForAContainer); + + if (null != pendingAmountChanges) { + updatePendingResources(pendingAmountChanges, schedulerRequestKey, + queue.getMetrics()); + requireUpdatePendingResource = true; + } + } + + return requireUpdatePendingResource; + } + + /* + * Get app placement allocator according to scheduler request key and request, + * Throw exception if incompatible request specified. For example, if + * a SchedulingRequest made against a scheduler request key which associated + * with a ResourceRequest. + */ + private AppPlacementAllocator getAppPlacementAllocator( + SchedulerRequestKey schedulerRequestKey, Object request) { + AppPlacementAllocator allocator = schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey); + if (allocator == null) { + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, + // FIXME, use factory to get the allocator. + request instanceof ResourceRequest ? + new LocalityAppPlacementAllocator<>(this) : + new /* other placement allocator*/); + } + return allocator; + } + + boolean addResourceRequests(boolean recoverPreemptedRequestForAContainer, + List resourceRequests) { + if (null == resourceRequests || resourceRequests.isEmpty()) { + return false; + } + + // A map to group resource requests and dedup + Map> dedupRequests = + new HashMap<>(); + + // Group resource request by schedulerRequestKey and resourceName + for (ResourceRequest request : resourceRequests) { + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); + if (!dedupRequests.containsKey(schedulerKey)) { + dedupRequests.put(schedulerKey, new HashMap<>()); + } + dedupRequests.get(schedulerKey).put(request.getResourceName(), request); + } + boolean offswitchResourcesUpdated = false; - for (Map.Entry> entry : - dedupRequests.entrySet()) { + for (Map.Entry> entry : dedupRequests + .entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); if (!schedulerKeyToAppPlacementAllocator.containsKey( @@ -220,15 +275,16 @@ boolean addRequestToAppPlacement( new LocalityAppPlacementAllocator<>(this)); } + AppPlacementAllocator appPlacementAllocator = getAppPlacementAllocator( + schedulerRequestKey, resourceRequests.get(0)); + // Update AppPlacementAllocator PendingAskUpdateResult pendingAmountChanges = - schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) - .updatePendingAsk(entry.getValue().values(), - recoverPreemptedRequestForAContainer); + appPlacementAllocator.updatePendingAsk(entry.getValue().values(), + recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { - updatePendingResources( - pendingAmountChanges, schedulerRequestKey, + updatePendingResources(pendingAmountChanges, schedulerRequestKey, queue.getMetrics()); offswitchResourcesUpdated = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index f410db12f32..c70c218ee27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -146,7 +146,7 @@ public synchronized boolean checkAndAddToOutstandingIncreases( createResourceRequests(rmContainer, schedulerNode, schedulerKey, resToIncrease); updateResReqs.put(schedulerKey, resMap); - appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs); + appSchedulingInfo.addResourceRequests(false, updateResReqs); } return true; } @@ -290,7 +290,7 @@ public ContainerId matchContainerToOutstandingIncreaseReq( (rmContainer, node, schedulerKey, rmContainer.getContainer().getResource()); reqsToUpdate.put(schedulerKey, resMap); - appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate); + appSchedulingInfo.addResourceRequests(true, reqsToUpdate); return UNDEFINED; } return retVal; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dfb0e67fcd9..ccc19c41b33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -441,7 +441,7 @@ public boolean updateResourceRequests( try { writeLock.lock(); if (!isStopped) { - return appSchedulingInfo.updateResourceRequests(requests, false); + return appSchedulingInfo.updateResourceDemands(requests, null, false); } return false; } finally { @@ -454,8 +454,8 @@ public void recoverResourceRequestsForContainer( try { writeLock.lock(); if (!isStopped) { - appSchedulingInfo.updateResourceRequests( - containerRequest.getResourceRequests(), true); + appSchedulingInfo.updateResourceDemands( + containerRequest.getResourceRequests(), null, true); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8de363140fb..2c9ffdd2d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1003,6 +1003,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check for new allocation requests normalizeRequests(ask); + // FIXME: normalizeSchedulingRequests + Allocation allocation; // make sure we aren't stopping/removing the application diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 85d8715bb2a..2ed3e837188 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -31,6 +32,11 @@ private final int count; public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + public PendingAsk(ResourceSizing sizing) { + this.perAllocationResource = sizing.getResources(); + this.count = sizing.getNumAllocations(); + } + public PendingAsk(Resource res, int num) { this.perAllocationResource = res; this.count = num; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index dcb38aa0054..af138202aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -70,6 +71,20 @@ PendingAskUpdateResult updatePendingAsk( boolean recoverPreemptedRequestForAContainer); /** + * Replace existing pending asks by the new SchedulingRequest + * + * @param schedulerRequestKey scheduler request key + * @param schedulingRequest new asks + * @param recoverPreemptedRequestForAContainer if we're recovering resource + * requests for preempted container + * @return true if total pending resource changed + */ + PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer); + + /** * Get pending ResourceRequests by given schedulerRequestKey * @return Map of resourceName to ResourceRequest */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 766827ca300..f4f62df0a33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -176,6 +178,19 @@ public PendingAskUpdateResult updatePendingAsk( } @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer) + throws SchedulerInvalidResoureRequestException { + throw new SchedulerInvalidResoureRequestException(this.getClass().getName() + + " not be able to handle SchedulingRequest, there exists a " + + "ResourceRequest with the same scheduler key=" + schedulerRequestKey + + ", please send SchedulingRequest with a different allocationId and " + + "priority"); + } + + @Override public Map getResourceRequests() { return resourceRequestMap; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java new file mode 100644 index 00000000000..917b66052c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.yarn.api.resource.PlacementTagsConstants.APP_ID; +import static org.apache.hadoop.yarn.api.resource.PlacementTagsConstants.NODE_PARTITION_KEY; + +/** + * This is a simple implementation to do affinity or anti-affinity for + * inter/intra apps. + * + * For now we limit it to handle node scope anti-affinity for intra-app only + * because of test bandwidth limitation. See validateAndSetSchedulingRequest. + */ +public class SingleConstraintAppPlacementAllocator + implements AppPlacementAllocator { + private static final Log LOG = + LogFactory.getLog(SingleConstraintAppPlacementAllocator.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + private SchedulingRequest schedulingRequest = null; + private AppSchedulingInfo appSchedulingInfo; + private String targetNodePartition; + private Set targetAllocationTags; + private SchedulerRequestKey schedulerRequestKey; + + public SingleConstraintAppPlacementAllocator( + SchedulerRequestKey schedulerRequestKey, AppSchedulingInfo info) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + this.appSchedulingInfo = info; + this.schedulerRequestKey = schedulerRequestKey; + } + + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( + CandidateNodeSet candidateNodeSet) { + // Now only handle the case that single node in the candidateNodeSet + // TODO, Add support to multi-hosts inside candidateNodeSet which is passed + // in. + + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); + if (null != singleNode) { + return IteratorUtils.singletonIterator(singleNode); + } + + return IteratorUtils.emptyIterator(); } + + @Override + public PendingAskUpdateResult updatePendingAsk( + Collection requests, + boolean recoverPreemptedRequestForAContainer) { + if (requests != null && !requests.isEmpty()) { + throw new SchedulerInvalidResoureRequestException( + this.getClass().getName() + + " not be able to handle ResourceRequest, there exists a " + + "SchedulingRequest with the same scheduler key=" + + SchedulerRequestKey.create(requests.iterator().next()) + + ", please send ResourceRequest with a different allocationId and " + + "priority"); + } + + // Do nothing + return null; + } + + private PendingAskUpdateResult internalUpdatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest newSchedulingRequest) { + if (schedulingRequest != null) { + // If we have an old scheduling request, we will make sure that no changes + // made except #containers. + // We do this by replacing numAllocations with old numAllocations in the + // newSchedulingRequest#getResourceSizing, and compare the two objects. + ResourceSizing sizing = newSchedulingRequest.getResourceSizing(); + int newNumAllocations = sizing.getNumAllocations(); + int existingNumAllocations = + schedulingRequest.getResourceSizing().getNumAllocations(); + sizing.setNumAllocations(existingNumAllocations); + + // Compare two objects + if (!schedulingRequest.equals(newSchedulingRequest)) { + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + throw new SchedulerInvalidResoureRequestException( + "Invalid updated SchedulingRequest added to scheduler, " + + " we only allows changing numAllocations for the updated " + + "SchedulingRequest. Old=" + schedulingRequest.toString() + + " new=" + newSchedulingRequest.toString() + + ", if any fields need to be updated, please cancel the " + + "old request (by setting numAllocations to 0) and send a " + + "SchedulingRequest with different combination of " + + "priority/allocationId"); + } + + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + + // Basic sanity check + if (newNumAllocations < 0) { + throw new SchedulerInvalidResoureRequestException( + "numAllocation in ResourceSizing field must be >= 0, " + + "updating schedulingRequest failed."); + } + + // Ok, now everything is same except numAllocation, update numAllocation. + LOG.info( + "Update numAllocation from old=" + existingNumAllocations + " to new=" + + newNumAllocations); + return new PendingAskUpdateResult( + new PendingAsk(schedulingRequest.getResourceSizing()), + new PendingAsk(newSchedulingRequest.getResourceSizing()), + targetNodePartition, targetNodePartition); + } + + // For a new schedulingRequest, we need to validate if we support its asks. + // This will update internal partitions, etc. after the SchedulingRequest is + // valid. + validateAndSetSchedulingRequest(newSchedulingRequest); + + return new PendingAskUpdateResult(null, + new PendingAsk(newSchedulingRequest.getResourceSizing()), null, + targetNodePartition); + } + + @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest newSchedulingRequest, + boolean recoverPreemptedRequestForAContainer) { + writeLock.lock(); + try { + return internalUpdatePendingAsk(schedulerRequestKey, + newSchedulingRequest); + } finally { + writeLock.unlock(); + } + } + + private String throwExceptionWithMetaInfo(String message) { + StringBuilder sb = new StringBuilder(); + sb.append("AppId=").append(appSchedulingInfo.getApplicationId()).append( + " Key=").append(this.schedulerRequestKey).append(". Exception message:") + .append(message); + throw new SchedulerInvalidResoureRequestException(sb.toString()); + } + + private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) + throws SchedulerInvalidResoureRequestException { + // Assume sizing is already checked and normalized by Scheduler. + // ... + + PlacementConstraint constraint = + newSchedulingRequest.getPlacementConstraint(); + + // We only accept SingleConstraint + PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr(); + if (!(ac instanceof PlacementConstraint.SingleConstraint)) { + throwExceptionWithMetaInfo( + "Only accepts " + PlacementConstraint.SingleConstraint.class.getName() + + " as constraint-expression. Rejecting the new added " + + "constraint-expression.class=" + ac.getClass().getName()); + } + + PlacementConstraint.SingleConstraint singleConstraint = + (PlacementConstraint.SingleConstraint) ac; + + // Make sure it is an anti-affinity request (actually this implementation + // should be able to support both affinity / anti-affinity without much + // effort. Considering potential test effort required. Limit to + // anti-affinity to intra-app and scope is node. + if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) { + throwExceptionWithMetaInfo( + "Only support scope=" + PlacementConstraints.NODE + + "now. PlacementConstraint=" + singleConstraint); + } + + if (singleConstraint.getMinCardinality() != 0 + || singleConstraint.getMaxCardinality() != 1) { + throwExceptionWithMetaInfo( + "Only support anti-affinity, which is: minCardinality=0, " + + "maxCardinality=1"); + } + + Set targetExpressionSet = + singleConstraint.getTargetExpressions(); + if (targetExpressionSet == null || targetExpressionSet.isEmpty()) { + throwExceptionWithMetaInfo( + "TargetExpression should not be null or empty"); + } + + // Set node partition + String nodePartition = null; + + // Target allocation tags + Set targetAllocationTags = null; + + // Target App Id + boolean targetAppIdChecked = false; + + for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) { + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.SELF)) { + throwExceptionWithMetaInfo("SELF target type is not supported"); + } + + // Handle node partition + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) { + // For node attribute target, we only support Partition now. And once + // YARN-3409 is merged, we will support node attribute. + if (!targetExpression.getTargetKey().equals(NODE_PARTITION_KEY)) { + throwExceptionWithMetaInfo("When TargetType=" + + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE + + " only " + NODE_PARTITION_KEY + " is accepted as TargetKey."); + } + + if (nodePartition != null) { + // This means we have duplicated node partition entry inside placement + // constraint, which might be set by mistake. + throwExceptionWithMetaInfo( + "Only one node partition targetExpression is allowed"); + } + + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + continue; + } + + if (values.size() > 1) { + throwExceptionWithMetaInfo("Inside one targetExpression, we only " + + "support affinity to at most one node partition now"); + } + + nodePartition = values.iterator().next(); + continue; + } + + // Handle allocation tags + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) { + if (targetAllocationTags != null) { + // This means we have duplicated AllocationTag expressions entries + // inside placement constraint, which might be set by mistake. + throwExceptionWithMetaInfo( + "Only one AllocationTag targetExpression is allowed"); + } + + targetAllocationTags = new HashSet<>( + targetExpression.getTargetValues()); + + // Make sure we have one and only one target is set to our own app id + for (String tag : targetAllocationTags) { + if (tag.startsWith(APP_ID)) { + String appId = tag.substring(APP_ID.length()); + String ownAppIdStr = appSchedulingInfo.getApplicationId().toString(); + if (appId.equals(ownAppIdStr)) { + targetAppIdChecked = true; + } else{ + throwExceptionWithMetaInfo( + "Only allow intra-app anti-affinity now, this target " + + "expression includes a target allocation_tag=" + tag + + ", we only expect has a tag: [" + APP_ID + ownAppIdStr + + "]"); + } + } + } + } + } + + if (targetAllocationTags == null || !targetAppIdChecked) { + throwExceptionWithMetaInfo("Failed to find allocation tags from " + + "TargetExpressions or couldn't find self-app target."); + } + + if (nodePartition == null) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + } + + // Validation is done. set local results: + this.targetNodePartition = nodePartition; + this.targetAllocationTags = targetAllocationTags; + this.schedulingRequest = newSchedulingRequest; + + LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo + .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils + .join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition); + } + + @Override + @SuppressWarnings("unchecked") + public Map getResourceRequests() { + return Collections.EMPTY_MAP; + } + + @Override + public PendingAsk getPendingAsk(String resourceName) { + if (resourceName.equals("*") && schedulingRequest != null) { + return new PendingAsk(schedulingRequest.getResourceSizing()); + } + return PendingAsk.ZERO; + } + + @Override + public int getOutstandingAsksCount(String resourceName) { + if (resourceName.equals("*") && schedulingRequest != null) { + return schedulingRequest.getResourceSizing().getNumAllocations(); + } + return 0; + } + + @Override + public ContainerRequest allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node) { + // TODO + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + return true; + } + + @Override + public boolean canDelayTo(String resourceName) { + return true; + } + + @Override + public boolean acceptNodePartition(String nodePartition, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. + String nodePartitionToLookAt; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = nodePartition; + } else{ + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + return this.targetNodePartition.equals(nodePartitionToLookAt); + } + + @Override + public String getPrimaryRequestedNodePartition() { + return targetNodePartition; + } + + @Override + public int getUniqueLocationAsks() { + return 1; + } + + @Override + public void showRequests() { + if (schedulingRequest != null) { + LOG.info(schedulingRequest.toString()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index fbde6813d5c..2f05cf1a624 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -431,7 +431,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL) { for (String host : task.getHosts()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " type=" + type + " host=" + host + " request=" + ((requests == null) ? "null" : requests.get(host))); } @@ -442,7 +442,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) { for (String rack : task.getRacks()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " type=" + type + " rack=" + rack + " request=" + ((requests == null) ? "null" : requests.get(rack))); } @@ -453,7 +453,7 @@ private void updateResourceRequests(Map requests, updateResourceRequest(requests.get(ResourceRequest.ANY)); if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " #asks=" + ask.size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java index 03587927734..0983c0475e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.constraint; import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.resource.PlacementTagsConstants; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.junit.Assert; @@ -113,7 +114,7 @@ public void testAllocationTagsManagerSimpleCases() Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), TestUtils.getMockApplicationId(1), ImmutableSet - .of(AllocationTagsNamespaces.APP_ID + TestUtils + .of(PlacementTagsConstants.APP_ID + TestUtils .getMockApplicationId(1).toString()), Long::max)); // Get Cardinality of app1 on node2, with empty tag set, op=max diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index bb29889a854..b66461da2e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -132,7 +132,7 @@ public void testSchedulerKeyAccounting() { List reqs = new ArrayList<>(); reqs.add(req1); reqs.add(req2); - info.updateResourceRequests(reqs, false); + info.updateResourceDemands(reqs, null, false); ArrayList keys = new ArrayList<>(info.getSchedulerKeys()); Assert.assertEquals(2, keys.size()); @@ -151,7 +151,7 @@ public void testSchedulerKeyAccounting() { ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); reqs.clear(); reqs.add(req2); - info.updateResourceRequests(reqs, false); + info.updateResourceDemands(reqs, null, false); info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2), null); Assert.assertEquals(0, info.getSchedulerKeys().size()); @@ -160,7 +160,7 @@ public void testSchedulerKeyAccounting() { ResourceRequest.ANY, Resource.newInstance(1024, 1), 5); reqs.clear(); reqs.add(req1); - info.updateResourceRequests(reqs, false); + info.updateResourceDemands(reqs, null, false); Assert.assertEquals(1, info.getSchedulerKeys().size()); Assert.assertEquals(SchedulerRequestKey.create(req1), info.getSchedulerKeys().iterator().next()); @@ -168,7 +168,7 @@ public void testSchedulerKeyAccounting() { ResourceRequest.ANY, Resource.newInstance(1024, 1), 0); reqs.clear(); reqs.add(req1); - info.updateResourceRequests(reqs, false); + info.updateResourceDemands(reqs, null, false); Assert.assertEquals(0, info.getSchedulerKeys().size()); } }