diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 12333e8..1a5a098 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -19,22 +19,43 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Resource limits for queues/applications, this means max overall (please note * that, it's not "extra") resource you can get. */ public class ResourceLimits { + volatile Resource limit; + + // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES config. + // This limit indicates how much we need to unreserve to allocate another container. + private volatile Resource amountNeededUnreserve; + public ResourceLimits(Resource limit) { + this.amountNeededUnreserve = Resources.none(); + this.limit = limit; + } + + public ResourceLimits(Resource limit, Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; this.limit = limit; } - volatile Resource limit; public Resource getLimit() { return limit; } - + + public Resource getAmountNeededUnreserve() { + return amountNeededUnreserve; + } + public void setLimit(Resource limit) { this.limit = limit; } + + public void setAmountNeededUnreserve(Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3cd85ae..79a803e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -81,7 +81,7 @@ // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // etc. QueueCapacities queueCapacities; - + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; @@ -453,55 +453,56 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, Resource currentLimitResource = getCurrentLimitResource(label, clusterResource, currentResourceLimits); - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && label.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(newTotalResource, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); - } - return true; - } - } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. if (Resources.greaterThan(resourceCalculator, clusterResource, newTotalResource, currentLimitResource)) { + + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && label.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource, + currentLimitResource)); + return true; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, label=" + label + + " usedResources: " + queueUsage.getUsed(label) + + " clusterResources: " + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(label) + + ")"); + } return false; } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(label) - + ")"); - } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c86c0ff..e518228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1072,14 +1072,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { - Container container = excessReservation.getContainer(); - queue.completedContainer( - clusterResource, assignment.getApplication(), node, - excessReservation, - SchedulerUtils.createAbnormalContainerStatus( - container.getId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null, true); + Container container = excessReservation.getContainer(); + queue.completedContainer( + clusterResource, assignment.getApplication(), node, + excessReservation, + SchedulerUtils.createAbnormalContainerStatus( + container.getId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, true); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 282d407..42f986a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -122,7 +122,7 @@ new QueueResourceLimitsInfo(); private volatile ResourceLimits currentResourceLimits = null; - + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -718,14 +718,14 @@ public synchronized void removeApplicationAttempt( activateApplications(); LOG.info("Application removed -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + " queue: " + getQueueName() + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + " #queue-active-applications: " + getNumActiveApplications() - ); + ); } private synchronized FiCaSchedulerApp getApplication( @@ -847,17 +847,23 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // before all higher priority ones are serviced. Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - required, requestedNodeLabels); - + required, requestedNodeLabels); + + // local to add in the user limits for unreserving which are temporary + ResourceLimits userResourceLimits = new ResourceLimits(this.currentResourceLimits + .getLimit()); + // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), - this.currentResourceLimits, required, application.getCurrentReservation())) { + boolean maxRes = super.canAssignToThisQueue(clusterResource, node.getLabels(), + userResourceLimits, required, application.getCurrentReservation()); + if (!maxRes) { return NULL_ASSIGNMENT; } // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, true, requestedNodeLabels)) { + boolean canAssign = canAssignToUser(clusterResource, application.getUser(), + userLimit, application, requestedNodeLabels, userResourceLimits); + if (!canAssign) { break; } @@ -867,7 +873,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, userResourceLimits); // Did the application skip this node? if (assignment.getSkipped()) { @@ -928,7 +934,7 @@ private synchronized CSAssignment assignReservedContainer( // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, new ResourceLimits(Resources.none())); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -1109,7 +1115,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, @Private protected synchronized boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations, Set requestLabels) { + Set requestLabels, ResourceLimits localResourceLimits) { User user = getUser(userName); String label = CommonNodeLabelsManager.NO_LABEL; @@ -1125,13 +1131,13 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations + if (this.reservationsContinueLooking && label.equals(CommonNodeLabelsManager.NO_LABEL)) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { + Resources.subtract(user.getUsed(), application.getCurrentReservation()), + limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() @@ -1139,6 +1145,12 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } + Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(label), limit); + // we can only acquire a new container if we unreserve first since we ignored the + // user limit + localResourceLimits.setAmountNeededUnreserve( + Resources.max(resourceCalculator, clusterResource, + localResourceLimits.getAmountNeededUnreserve(), amountNeededToUnreserve)); return true; } } @@ -1185,7 +1197,7 @@ boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, ResourceLimits limitInfo) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1198,7 +1210,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1226,7 +1238,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1254,7 +1266,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1265,20 +1277,11 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } - - private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { - // First we need to get minimum resource we need unreserve - // minimum-resource-need-unreserve = used + asked - limit - Resource minimumUnreservedResource = - Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), - currentResourceLimits.getLimit()); - return minimumUnreservedResource; - } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource askedResource, Resource minimumUnreservedResource) { + Resource minimumUnreservedResource) { // need to unreserve some other container first NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, minimumUnreservedResource, @@ -1299,7 +1302,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource, LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application and place it on node: " - + node.getNodeID() + " needing: " + askedResource); + + node.getNodeID() + " needing: " + minimumUnreservedResource); } // headroom @@ -1318,45 +1321,16 @@ protected boolean findNodeToUnreserve(Resource clusterResource, return true; } - @Private - protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability) { - // we can't reserve if we got here based on the limit - // checks assuming we could unreserve!!! - Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability, null); - - // Check queue max-capacity limit, - // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, null, - this.currentResourceLimits, capability, Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit queue limit"); - } - return false; - } - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, false, null)) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit user limit"); - } - return false; - } - return true; - } - - private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits limitInfo) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1365,12 +1339,13 @@ private Resource assignNodeLocalContainers(Resource clusterResource, private Resource assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits limitInfo) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1379,12 +1354,13 @@ private Resource assignRackLocalContainers(Resource clusterResource, private Resource assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits limitInfo) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1476,12 +1452,13 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer) { + MutableObject createdContainer, ResourceLimits limitInfo) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); + + " request=" + request + " type=" + type + + " amountToUnreserve= " + limitInfo.getAmountNeededUnreserve()); } // check if the resource request can access the label @@ -1521,13 +1498,17 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod LOG.warn("Couldn't get container for allocation!"); return Resources.none(); } - + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( application, priority, capability); // Can we allocate a container on this node? int availableContainers = resourceCalculator.computeAvailableContainers(available, capability); + + boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource, + limitInfo.getAmountNeededUnreserve(), Resources.none()); + if (availableContainers > 0) { // Allocate... @@ -1536,20 +1517,23 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod unreserve(application, priority, node, rmContainer); } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue and its parents' resource limits + // some containers to meet this queue, its parents', or the users' resource limits. // TODO, need change here when we want to support continuous reservation // looking for labeled partitions. - Resource minimumUnreservedResource = - getMinimumResourceNeedUnreserved(capability); - if (!shouldAllocOrReserveNewContainer - || Resources.greaterThan(resourceCalculator, clusterResource, - minimumUnreservedResource, Resources.none())) { + if (!shouldAllocOrReserveNewContainer || needToUnreserve) { + // If we shouldn't allocate/reserve new container then we should unreserve one the same + // size we are asking for since the limitInfo.getAmountNeededUnreserve could be zero. If + // the limit was hit then use the amount we need to unreserve to be under the limit. + Resource amountToUnreserve = capability; + if (needToUnreserve) { + amountToUnreserve = limitInfo.getAmountNeededUnreserve(); + } boolean containerUnreserved = findNodeToUnreserve(clusterResource, node, application, priority, - capability, minimumUnreservedResource); + amountToUnreserve); // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, + // continue)). If we failed to unreserve some resource, we can't continue. if (!containerUnreserved) { return Resources.none(); } @@ -1581,13 +1565,13 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring parent queue capacity limits when - // reservationsContinueLooking is set. - // If we're trying to reserve a container here, not container will be - // unreserved for reserving the new one. Check limits again before - // reserve the new container - if (!checkLimitsToReserve(clusterResource, - application, capability)) { + // we could possibly ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } return Resources.none(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ed6bb8..e515ad6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -395,9 +395,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Are we over maximum-capacity for this queue? // This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, - minimumAllocation, Resources.createResource(getMetrics() - .getReservedMB(), getMetrics().getReservedVirtualCores()))) { + boolean maxRes = super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, + minimumAllocation, Resources.createResource(getMetrics().getReservedMB(), + getMetrics().getReservedVirtualCores())); + if (!maxRes) { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index e8a8243..063f0cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -748,14 +748,14 @@ public void testFindNodeToUnreserve() throws Exception { // nothing reserved boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), - node_1, app_0, priorityMap, capability, capability); + node_1, app_0, priorityMap, capability); assertFalse(res); // reserved but scheduler doesn't know about that node. app_0.reserve(node_1, priorityMap, rmContainer, container); node_1.reserveResource(app_0, priorityMap, rmContainer); res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, - priorityMap, capability, capability); + priorityMap, capability); assertFalse(res); } @@ -858,11 +858,12 @@ public void testAssignToQueue() throws Exception { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); + ResourceLimits limits = new ResourceLimits(clusterResource); boolean res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then @@ -879,36 +880,43 @@ public void testAssignToQueue() throws Exception { assertEquals(3 * GB, node_1.getUsedResource().getMemory()); capability = Resources.createResource(5 * GB, 0); + Resource unreserveAmount = Resources.createResource(5 * GB); + limits = new ResourceLimits(clusterResource); + res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources - .createResource(5 * GB)); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, unreserveAmount); assertTrue(res); + // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to + // unreserve 2GB to get the total 5GB needed. + // also note vcore checks not enabled + assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve()); // tell to not check reservations + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); // should return false no matter what checkReservations is passed // in since feature is off + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); - + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources .createResource(5 * GB)); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); } public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, @@ -1056,22 +1064,32 @@ public void testAssignToUser() throws Exception { assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); - // set limit so subtrace reservations it can continue - Resource limit = Resources.createResource(12 * GB, 0); - boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, - true, null); + // not over the limit + Resource limit = Resources.createResource(14 * GB, 0); + ResourceLimits userResourceLimits = new ResourceLimits(clusterResource); + boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); assertTrue(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); - // tell it not to check for reservations and should fail as already over - // limit - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null); - assertFalse(res); + // set limit so it subtracts reservations and it can continue + limit = Resources.createResource(12 * GB, 0); + userResourceLimits = new ResourceLimits(clusterResource); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, + null, userResourceLimits); + assertTrue(res); + // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit + // we need to unreserve 1GB + // also note vcore checks not enabled + assertEquals(Resources.createResource(1 * GB, 4), + userResourceLimits.getAmountNeededUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); // should now return false since feature off - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null); + userResourceLimits = new ResourceLimits(clusterResource); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); assertFalse(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); } @Test