diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 865b0b4..c2a0402 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -233,6 +233,12 @@ public static float getAbsoluteMaxAvailCapacity( return queue.getAbsoluteMaximumCapacity(); } + //We should not hold a lock on a queue and its parent concurrently - it + //can lead to deadlocks when calls which walk down the tree occur + //concurrently (getQueueInfo...) + //this method should not be called from within a scope synchronized on + //the passed queue... + //Get my parent's max avail, needed to determine my own float parentMaxAvail = getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, parent); @@ -243,15 +249,29 @@ public static float getAbsoluteMaxAvailCapacity( if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { return 0.0f; } + + Resource queueUsedResources; + float queueMaxCap; + synchronized (queue) { + queueUsedResources = queue.getUsedResources(); + queueMaxCap = queue.getMaximumCapacity(); + } + + Resource parentUsedResources; + synchronized (parent) { + parentUsedResources = parent.getUsedResources(); + } + + //sibling used is parent used - my used... float siblingUsedCapacity = Resources.ratio( resourceCalculator, - Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), + Resources.subtract(parentUsedResources, queueUsedResources), parentResource); //my max avail is the lesser of my max capacity and what is unused from my parent //by my siblings (if they are beyond their base capacity) float maxAvail = Math.min( - queue.getMaximumCapacity(), + queueMaxCap, 1.0f - siblingUsedCapacity); //and, mutiply by parent to get absolute (cluster relative) value float absoluteMaxAvail = maxAvail * parentMaxAvail; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 38d4712..b5dec63 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -117,6 +117,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private float absoluteMaxAvailCapacity; + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -150,8 +152,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + setAbsoluteMaxAvailCapacity(queueCapacities.getAbsoluteMaximumCapacity()); + updateHeadroomInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -256,6 +258,15 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) public String getQueuePath() { return getParent().getQueuePath() + "." + getQueueName(); } + + private synchronized void setAbsoluteMaxAvailCapacity( + float absoluteMaxAvailCapacity) { + this.absoluteMaxAvailCapacity = absoluteMaxAvailCapacity; + } + + private synchronized float getAbsoluteMaxAvailCapacity() { + return absoluteMaxAvailCapacity; + } /** * Used only by tests. @@ -732,8 +743,16 @@ private synchronized FiCaSchedulerApp getApplication( } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { + float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, this); + setAbsoluteMaxAvailCapacity(absoluteMaxAvailCapacity); + return assignContainersInner(clusterResource, node, needToUnreserve); + } + + private synchronized CSAssignment assignContainersInner( + Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -1012,9 +1031,9 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, return canAssign; } - private Resource updateHeadroomInfo(Resource clusterResource, - float absoluteMaxAvailCapacity) { + private Resource updateHeadroomInfo(Resource clusterResource) { + float absoluteMaxAvailCapacity = getAbsoluteMaxAvailCapacity(); Resource queueMaxCap = Resources.multiplyAndNormalizeDown( resourceCalculator, @@ -1042,15 +1061,9 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource userLimit = computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - - //Max avail capacity needs to take into account usage by ancestor-siblings - //which are greater than their base capacity, so we are interested in "max avail" - //capacity - float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, this); Resource queueMaxCap = - updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); + updateHeadroomInfo(clusterResource); Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); @@ -1760,8 +1773,8 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + setAbsoluteMaxAvailCapacity(queueCapacities.getAbsoluteMaximumCapacity()); + updateHeadroomInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics(