Reviewer-amended patch for ProportionalCapacityPreemptionPolicy.java.

What the patch does:
* Splits the ideal-allocation fixpoint in two passes: queues with a non-zero
  guaranteed capacity are served first (weighted by guarantee); whatever is
  left is then spread uniformly over the zero-guarantee queues.
* Carries each queue's absolute maximum capacity into TempQueue and caps the
  amount a queue accepts in offer() by (maxCapacity - idealAssigned).
* Adds the queue name to TempQueue.toString().

Review notes:
* Line structure and generic type arguments were restored from a collapsed
  copy of this patch. Indentation of context/removed lines is reconstructed;
  confirm with "git apply --check" against the target tree before applying.
* Added lines were amended during review (Javadoc @param entries, comment
  wording, reuse of the queueName local, StringBuilder chaining, redundant
  float casts), so the result no longer matches the post-image blob id on the
  index line. Hunk headers were recomputed for the extra comment lines.
* Set/HashSet are used by the added code but only Collection is imported by
  this patch; presumably the target file already imports them -- verify.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 7ea73d9..684c82b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -293,34 +294,31 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
 
-    //assign all cluster resources until no more demand, or no resources are left
-    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
-          unassigned, Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
 
-      // we compute normalizedGuarantees capacity based on currently active
-      // queues
-      resetCapacity(rc, unassigned, qAlloc);
-
-      // offer for each queue their capacity first and in following invocations
-      // their share of over-capacity
-      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
-        TempQueue sub = i.next();
-        Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-        // if the queue returned a value > 0 it means it is fully satisfied
-        // and it is removed from the list of active queues qAlloc
-        if (!Resources.greaterThan(rc, tot_guarant,
-              wQdone, Resources.none())) {
-          i.remove();
-        }
-        Resources.addTo(wQassigned, wQdone);
+    for (TempQueue q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
       }
-      Resources.subtractFrom(unassigned, wQassigned);
     }
 
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distribute it among zero-guarantee
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+
     // based on ideal assignment computed above and current assignment we derive
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +351,52 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     }
 
   }
+
+  /**
+   * Given a set of queues compute the fix-point distribution of unassigned
+   * resources among them. As the pending requests of a queue are exhausted, the
+   * queue is removed from the set and remaining capacity redistributed among
+   * remaining queues. The distribution is weighted based on guaranteed
+   * capacity, unless asked to ignoreGuarantee, in which case resources are
+   * distributed uniformly.
+   *
+   * @param rc resource calculator used for all resource comparisons
+   * @param tot_guarant total capacity of this set of queues
+   * @param qAlloc queues still competing; a queue is removed once it accepts
+   *          nothing from an offer
+   * @param unassigned resources left to hand out; reduced by what is assigned
+   * @param ignoreGuarantee if true, split uniformly instead of by guarantee
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
+      boolean ignoreGuarantee) {
+    // assign all cluster resources until no more demand, or no resources are left
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+          unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+
+      // offer for each queue their capacity first and in following invocations
+      // their share of over-capacity
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        // if the queue accepted nothing (wQdone == 0) it is fully satisfied
+        // and it is removed from the list of active queues qAlloc
+        if (!Resources.greaterThan(rc, tot_guarant,
+              wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
 
   /**
    * Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +405,23 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
    * @param queues the list of queues to consider
+   * @param ignoreGuar if true, give every queue an equal share instead of
+   *          weighting by guaranteed capacity
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      List<TempQueue> queues) {
+      Collection<TempQueue> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
-    for (TempQueue q : queues) {
-      Resources.addTo(activeCap, q.guaranteed);
-    }
-    for (TempQueue q : queues) {
-      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-          q.guaranteed, activeCap);
+
+    if (ignoreGuar) {
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = 1.0f / queues.size();
+      }
+    } else {
+      for (TempQueue q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
     }
   }
 
@@ -515,18 +568,25 @@ public String getPolicyName() {
   private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
     TempQueue ret;
     synchronized (root) {
-    float absUsed = root.getAbsoluteUsedCapacity();
+      String queueName = root.getQueueName();
+      float absUsed = root.getAbsoluteUsedCapacity();
+      float absCap = root.getAbsoluteCapacity();
+      float absMaxCap = root.getAbsoluteMaximumCapacity();
+
       Resource current = Resources.multiply(clusterResources, absUsed);
-      Resource guaranteed =
-        Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+      Resource guaranteed = Resources.multiply(clusterResources, absCap);
+      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(queueName, current, pending, guaranteed,
+            maxCapacity);
+
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(queueName, current, pending, guaranteed,
+            maxCapacity);
         for (CSQueue c : root.getChildQueues()) {
           ret.addChild(cloneQueues(c, clusterResources));
         }
@@ -563,6 +623,7 @@ public int compare(TempQueue o1, TempQueue o2) {
     final Resource current;
     final Resource pending;
     final Resource guaranteed;
+    final Resource maxCapacity;
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
@@ -573,11 +634,12 @@ public int compare(TempQueue o1, TempQueue o2) {
     LeafQueue leafQueue;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed) {
+        Resource guaranteed, Resource maxCapacity) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
       this.guaranteed = guaranteed;
+      this.maxCapacity = maxCapacity;
       this.idealAssigned = Resource.newInstance(0, 0);
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +676,13 @@ public void addChildren(ArrayList<TempQueue> queues) {
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
         Resource clusterResource) {
-      // remain = avail - min(avail, current + pending - assigned)
-      Resource accepted = Resources.min(rc, clusterResource,
-          avail,
-          Resources.subtract(
-              Resources.add(current, pending),
-              idealAssigned));
+      // remain = avail - min(avail, (max - assigned),
+      //                      (current + pending - assigned))
+      Resource accepted =
+          Resources.min(rc, clusterResource,
+              Resources.subtract(maxCapacity, idealAssigned),
+              Resources.min(rc, clusterResource, avail, Resources.subtract(
+                  Resources.add(current, pending), idealAssigned)));
       Resource remain = Resources.subtract(avail, accepted);
       Resources.addTo(idealAssigned, accepted);
       return remain;
@@ -628,13 +691,15 @@ Resource offer(Resource avail, ResourceCalculator rc,
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
-      sb.append("CUR: ").append(current)
+      sb.append(" NAME: ").append(queueName)
+        .append(" CUR: ").append(current)
         .append(" PEN: ").append(pending)
         .append(" GAR: ").append(guaranteed)
         .append(" NORM: ").append(normalizedGuarantee)
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append("\n");
 
       return sb.toString();
     }