diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3cd85ae..6944965 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -81,6 +81,11 @@ // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // etc. QueueCapacities queueCapacities; + + // statics for LimitsInfo so we don't have to create new objects every time + static final LimitsInfo LIMIT_CONTINUE_NEED_UNRESERVE = new LimitsInfo(true, true); + static final LimitsInfo LIMIT_CONTINUE_NO_UNRESERVE = new LimitsInfo(true, false); + static final LimitsInfo LIMIT_NO_CONTINUE_NO_UNRESERVE = new LimitsInfo(false, false); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -430,7 +435,7 @@ private Resource getCurrentLimitResource(String nodeLabel, return queueMaxResource; } - synchronized boolean canAssignToThisQueue(Resource clusterResource, + synchronized LimitsInfo canAssignToThisQueue(Resource clusterResource, Set nodeLabels, ResourceLimits currentResourceLimits, Resource nowRequired, Resource resourceCouldBeUnreserved) { // Get label of this queue can access, it's (nodeLabel AND queueLabel) @@ -453,61 +458,61 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, Resource currentLimitResource = getCurrentLimitResource(label, clusterResource, currentResourceLimits); - 
// if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && label.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(newTotalResource, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); - } - return true; - } - } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. 
if (Resources.greaterThan(resourceCalculator, clusterResource, newTotalResource, currentLimitResource)) { - return false; - } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(label) - + ")"); + // if reservation continuous looking enabled, check to see if we could + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && label.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + return new LimitsInfo(true, true, Resources.subtract(newTotalResource, + currentLimitResource)); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + 
"Check assign to queue, label=" + label + + " usedResources: " + queueUsage.getUsed(label) + + " clusterResources: " + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(label) + + ")"); + } + return LIMIT_NO_CONTINUE_NO_UNRESERVE; } - return true; + + return LIMIT_CONTINUE_NO_UNRESERVE; } // Actually, this will not happen, since labelCanAccess will be always // non-empty - return false; + return LIMIT_NO_CONTINUE_NO_UNRESERVE; } @Override @@ -533,4 +538,38 @@ public void decPendingResource(String nodeLabel, Resource resourceToDec) { parent.decPendingResource(nodeLabel, resourceToDec); } } + + /* + * Class used to indicate if a limit check failed/succeeded, if it succeeded whether it + * requires another container to be unreserved first, and the amount it needs to unreserve in + * order to allocate a container. 
+ */ + static class LimitsInfo { + private boolean shouldContinue; + private boolean needsToUnreserve; + private Resource amountNeededUnreserve; + + public LimitsInfo(boolean shouldContinue, boolean needsToUnreserve) { + this(shouldContinue, needsToUnreserve, Resources.none()); + } + + public LimitsInfo(boolean shouldContinue, boolean needsToUnreserve, + Resource amountNeededUnreserve) { + this.shouldContinue = shouldContinue; + this.amountNeededUnreserve = amountNeededUnreserve; + this.needsToUnreserve = needsToUnreserve; + } + + public boolean shouldContinue() { + return shouldContinue; + } + + public boolean needsToUnreserve() { + return needsToUnreserve; + } + + public Resource getAmountNeededUnreserve() { + return amountNeededUnreserve; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c86c0ff..e518228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1072,14 +1072,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { - Container container = excessReservation.getContainer(); - queue.completedContainer( - clusterResource, assignment.getApplication(), node, - excessReservation, - SchedulerUtils.createAbnormalContainerStatus( - 
container.getId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null, true); + Container container = excessReservation.getContainer(); + queue.completedContainer( + clusterResource, assignment.getApplication(), node, + excessReservation, + SchedulerUtils.createAbnormalContainerStatus( + container.getId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, true); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 282d407..1b0bfb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -122,7 +122,7 @@ new QueueResourceLimitsInfo(); private volatile ResourceLimits currentResourceLimits = null; - + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -847,19 +847,27 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // before all higher priority ones are serviced. 
Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - required, requestedNodeLabels); - + required, requestedNodeLabels); + // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), - this.currentResourceLimits, required, application.getCurrentReservation())) { + LimitsInfo maxRes = super.canAssignToThisQueue(clusterResource, node.getLabels(), + this.currentResourceLimits, required, application.getCurrentReservation()); + if (!maxRes.shouldContinue()) { return NULL_ASSIGNMENT; } // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, true, requestedNodeLabels)) { + LimitsInfo userLimitRes = canAssignToUser(clusterResource, application.getUser(), + userLimit, application, requestedNodeLabels); + if (!userLimitRes.shouldContinue()) { break; } + LimitsInfo limitInfo = new LimitsInfo( + maxRes.shouldContinue() | userLimitRes.shouldContinue(), + maxRes.needsToUnreserve() | userLimitRes.needsToUnreserve(), + Resources.max(resourceCalculator, clusterResource, + maxRes.getAmountNeededUnreserve(), + userLimitRes.getAmountNeededUnreserve())); // Inform the application it is about to get a scheduling opportunity application.addSchedulingOpportunity(priority); @@ -867,7 +875,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, limitInfo); // Did the application skip this node? if (assignment.getSkipped()) { @@ -928,7 +936,7 @@ private synchronized CSAssignment assignReservedContainer( // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, LIMIT_NO_CONTINUE_NO_UNRESERVE); // Doesn't matter... 
since it's already charged for at time of reservation // "re-reservation" is *free* @@ -1107,9 +1115,9 @@ private Resource computeUserLimit(FiCaSchedulerApp application, } @Private - protected synchronized boolean canAssignToUser(Resource clusterResource, + protected synchronized LimitsInfo canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations, Set requestLabels) { + Set requestLabels) { User user = getUser(userName); String label = CommonNodeLabelsManager.NO_LABEL; @@ -1125,13 +1133,13 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations + if (this.reservationsContinueLooking && label.equals(CommonNodeLabelsManager.NO_LABEL)) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { + Resources.subtract(user.getUsed(), application.getCurrentReservation()), + limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() @@ -1139,7 +1147,10 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } - return true; + Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(label), limit); + // we can only acquire a new container if we unreserve first since we ignored the + // user limit + return new LimitsInfo(true, true, amountNeededToUnreserve); } } if (LOG.isDebugEnabled()) { @@ -1147,9 +1158,10 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, + " will exceed limit - " + " consumed: " + user.getUsed() + " limit: " + limit); } - return false; + return 
LIMIT_NO_CONTINUE_NO_UNRESERVE; + } - return true; + return LIMIT_CONTINUE_NO_UNRESERVE; } boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, @@ -1185,7 +1197,7 @@ boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, LimitsInfo limitInfo) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1198,7 +1210,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1226,7 +1238,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1254,7 +1266,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1265,20 +1277,11 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } - - private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { - // First we need to get minimum resource we need unreserve - // minimum-resource-need-unreserve = used + asked - limit - Resource 
minimumUnreservedResource = - Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), - currentResourceLimits.getLimit()); - return minimumUnreservedResource; - } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource askedResource, Resource minimumUnreservedResource) { + Resource minimumUnreservedResource) { // need to unreserve some other container first NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, minimumUnreservedResource, @@ -1299,7 +1302,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource, LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application and place it on node: " - + node.getNodeID() + " needing: " + askedResource); + + node.getNodeID() + " needing: " + minimumUnreservedResource); } // headroom @@ -1318,45 +1321,16 @@ protected boolean findNodeToUnreserve(Resource clusterResource, return true; } - @Private - protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability) { - // we can't reserve if we got here based on the limit - // checks assuming we could unreserve!!! 
- Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability, null); - - // Check queue max-capacity limit, - // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, null, - this.currentResourceLimits, capability, Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit queue limit"); - } - return false; - } - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, false, null)) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit user limit"); - } - return false; - } - return true; - } - - private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + LimitsInfo limitInfo) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1365,12 +1339,13 @@ private Resource assignNodeLocalContainers(Resource clusterResource, private Resource assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + LimitsInfo limitInfo) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - 
allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1379,12 +1354,13 @@ private Resource assignRackLocalContainers(Resource clusterResource, private Resource assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + LimitsInfo limitInfo) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer); + allocatedContainer, limitInfo); } return Resources.none(); @@ -1476,12 +1452,14 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer) { + MutableObject createdContainer, LimitsInfo limitInfo) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); + + " request=" + request + " type=" + type + + " needToUnreserve= " + limitInfo.needsToUnreserve() + + " amountToUnreserve= " + limitInfo.getAmountNeededUnreserve()); } // check if the resource request can access the label @@ -1521,7 +1499,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod LOG.warn("Couldn't get container for allocation!"); return Resources.none(); } - + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( application, priority, capability); @@ -1536,20 +1514,23 @@ private 
Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod unreserve(application, priority, node, rmContainer); } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue and its parents' resource limits + // some containers to meet this queue, its parents', or the users' resource limits. // TODO, need change here when we want to support continuous reservation // looking for labeled partitions. - Resource minimumUnreservedResource = - getMinimumResourceNeedUnreserved(capability); - if (!shouldAllocOrReserveNewContainer - || Resources.greaterThan(resourceCalculator, clusterResource, - minimumUnreservedResource, Resources.none())) { + if (!shouldAllocOrReserveNewContainer || limitInfo.needsToUnreserve()) { + // If we shouldn't allocate/reserve new container then we should unreserve one the same + // size we are asking for since the limitInfo.getAmountNeededUnreserve could be zero. If + // the limit was hit then use the amount we need to unreserve to be under the limit. + Resource amountToUnreserve = capability; + if (limitInfo.needsToUnreserve()) { + amountToUnreserve = limitInfo.getAmountNeededUnreserve(); + } boolean containerUnreserved = findNodeToUnreserve(clusterResource, node, application, priority, - capability, minimumUnreservedResource); + amountToUnreserve); // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, + // continue)). If we failed to unreserve some resource, we can't continue. 
if (!containerUnreserved) { return Resources.none(); } @@ -1581,13 +1562,13 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring parent queue capacity limits when - // reservationsContinueLooking is set. - // If we're trying to reserve a container here, not container will be - // unreserved for reserving the new one. Check limits again before - // reserve the new container - if (!checkLimitsToReserve(clusterResource, - application, capability)) { + // we could possibly be ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (limitInfo.needsToUnreserve()) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } return Resources.none(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ed6bb8..49907d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -395,9 +395,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Are we over maximum-capacity for this queue? 
// This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, - minimumAllocation, Resources.createResource(getMetrics() - .getReservedMB(), getMetrics().getReservedVirtualCores()))) { + LimitsInfo maxRes = super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, + minimumAllocation, Resources.createResource(getMetrics().getReservedMB(), + getMetrics().getReservedVirtualCores())); + if (!maxRes.shouldContinue()) { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index e8a8243..b242291 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -748,14 +748,14 @@ public void testFindNodeToUnreserve() throws Exception { // nothing reserved boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), - node_1, app_0, priorityMap, capability, capability); + node_1, app_0, priorityMap, capability); assertFalse(res); // reserved but scheduler doesn't know about that node. 
app_0.reserve(node_1, priorityMap, rmContainer, container); node_1.reserveResource(app_0, priorityMap, rmContainer); res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, - priorityMap, capability, capability); + priorityMap, capability); assertFalse(res); } @@ -858,11 +858,12 @@ public void testAssignToQueue() throws Exception { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); - boolean res = + AbstractCSQueue.LimitsInfo res = a.canAssignToThisQueue(clusterResource, CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( clusterResource), capability, Resources.none()); - assertFalse(res); + assertFalse(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then @@ -884,14 +885,16 @@ public void testAssignToQueue() throws Exception { CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( clusterResource), capability, Resources .createResource(5 * GB)); - assertTrue(res); + assertTrue(res.shouldContinue()); + assertTrue(res.needsToUnreserve()); // tell to not check reservations res = a.canAssignToThisQueue(clusterResource, CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( clusterResource), capability, Resources.none()); - assertFalse(res); + assertFalse(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); @@ -901,14 +904,16 @@ public void testAssignToQueue() throws Exception { a.canAssignToThisQueue(clusterResource, CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( clusterResource), capability, Resources.none()); - assertFalse(res); + assertFalse(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); res = a.canAssignToThisQueue(clusterResource, CommonNodeLabelsManager.EMPTY_STRING_SET, new 
ResourceLimits( clusterResource), capability, Resources .createResource(5 * GB)); - assertFalse(res); + assertFalse(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); } public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, @@ -1056,22 +1061,25 @@ public void testAssignToUser() throws Exception { assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); - // set limit so subtrace reservations it can continue - Resource limit = Resources.createResource(12 * GB, 0); - boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, - true, null); - assertTrue(res); + // not over the limit + Resource limit = Resources.createResource(14 * GB, 0); + AbstractCSQueue.LimitsInfo res = a.canAssignToUser(clusterResource, user_0, limit, app_0, null); + assertTrue(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); - // tell it not to check for reservations and should fail as already over - // limit - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null); - assertFalse(res); + // set limit so it subtracts reservations and it can continue + limit = Resources.createResource(12 * GB, 0); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, + null); + assertTrue(res.shouldContinue()); + assertTrue(res.needsToUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); // should now return false since feature off - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null); - assertFalse(res); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, null); + assertFalse(res.shouldContinue()); + assertFalse(res.needsToUnreserve()); } @Test