diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 38d4712..d124d2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -117,6 +117,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private volatile float absoluteMaxAvailCapacity; + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -150,8 +152,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + absoluteMaxAvailCapacity = queueCapacities.getAbsoluteMaximumCapacity(); + updateHeadroomInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -732,8 +734,15 @@ private synchronized FiCaSchedulerApp getApplication( } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { + absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, this); + return assignContainersInternal(clusterResource, node, needToUnreserve); + } + + private synchronized CSAssignment assignContainersInternal( + Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -1012,8 +1021,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, return canAssign; } - private Resource updateHeadroomInfo(Resource clusterResource, - float absoluteMaxAvailCapacity) { + private Resource updateHeadroomInfo(Resource clusterResource) { Resource queueMaxCap = Resources.multiplyAndNormalizeDown( @@ -1042,15 +1050,9 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource userLimit = computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - - //Max avail capacity needs to take into account usage by ancestor-siblings - //which are greater than their base capacity, so we are interested in "max avail" - //capacity - float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, this); Resource queueMaxCap = - updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); + updateHeadroomInfo(clusterResource); Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); @@ -1760,8 +1762,8 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + absoluteMaxAvailCapacity = queueCapacities.getAbsoluteMaximumCapacity(); + updateHeadroomInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics(