From 81b3b4c511bc4e25827b092ddaa5fd59cb070837 Mon Sep 17 00:00:00 2001 From: Sunil Date: Fri, 16 Sep 2016 23:52:09 +0530 Subject: [PATCH] YARN-2009 --- .../AbstractPreemptableResourceCalculator.java | 228 ++++++++++++++++++ .../CapacitySchedulerPreemptionContext.java | 14 ++ .../capacity/CapacitySchedulerPreemptionUtils.java | 33 ++- .../monitor/capacity/FifoCandidatesSelector.java | 41 ---- .../capacity/FifoIntraQueuePreemptionPolicy.java | 259 +++++++++++++++++++++ .../capacity/IntraQueueCandidatesSelector.java | 212 +++++++++++++++++ .../IntraQueuePreemptableResourceCalculator.java | 141 +++++++++++ .../IntraQueuePreemptionComputePlugin.java | 39 ++++ .../capacity/PreemptableResourceCalculator.java | 179 +------------- .../capacity/PreemptionCandidatesSelector.java | 49 ++++ .../ProportionalCapacityPreemptionPolicy.java | 61 ++++- .../monitor/capacity/TempAppPerPartition.java | 173 ++++++++++++++ .../monitor/capacity/TempQueuePerPartition.java | 180 ++++++++++---- .../capacity/CapacitySchedulerConfiguration.java | 24 ++ .../scheduler/capacity/LeafQueue.java | 48 ++++ .../scheduler/common/fica/FiCaSchedulerApp.java | 18 ++ ...ionalCapacityPreemptionPolicyMockFramework.java | 61 ++++- ...ortionalCapacityPreemptionPolicyIntraQueue.java | 99 ++++++++ 18 files changed, 1588 insertions(+), 271 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java new file mode 100644 index 0000000..1846d1f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.PriorityQueue; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector} + */ +public class AbstractPreemptableResourceCalculator { + private static final Log LOG = LogFactory + .getLog(AbstractPreemptableResourceCalculator.class); + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + private boolean isReservedPreemptionCandidatesSelector; + + static class TQComparator implements Comparator { + private ResourceCalculator rc; + private Resource clusterRes; + + TQComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { + if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { + return -1; + } + if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { + return 1; + } + return 0; + } + + // Calculates idealAssigned / guaranteed + // TempQueues with 0 guarantees are always considered the most over + // capacity and therefore considered last for resources. + private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { + double pctOver = Integer.MAX_VALUE; + if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(), + Resources.none())) { + pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, + q.getGuaranteed()); + } + return (pctOver); + } + } + + /** + * PreemptableResourceCalculator constructor + * + * @param preemptionContext + * @param isReservedPreemptionCandidatesSelector + * this will be set by different implementation of candidate + * selectors, please refer to TempQueuePerPartition#offer for + * details. + * @param priorityBasedPolicy + */ + public AbstractPreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { + context = preemptionContext; + rc = preemptionContext.getResourceCalculator(); + this.isReservedPreemptionCandidatesSelector = isReservedPreemptionCandidatesSelector; + } + + /** + * Given a set of queues compute the fix-point distribution of unassigned + * resources among them. As pending request of a queue are exhausted, the + * queue is removed from the set and remaining capacity redistributed among + * remaining queues. The distribution is weighted based on guaranteed + * capacity, unless asked to ignoreGuarantee, in which case resources are + * distributed uniformly. + */ + protected void computeFixpointAllocation(ResourceCalculator rc, + Resource tot_guarant, Collection qAlloc, + Resource unassigned, boolean ignoreGuarantee) { + // Prior to assigning the unused resources, process each queue as follows: + // If current > guaranteed, idealAssigned = guaranteed + untouchable extra + // Else idealAssigned = current; + // Subtract idealAssigned resources from unassigned. + // If the queue has all of its needs met (that is, if + // idealAssigned >= current + pending), remove it from consideration. + // Sort queues from most under-guaranteed to most over-guaranteed. + TQComparator tqComparator = new TQComparator(rc, tot_guarant); + PriorityQueue orderedByNeed = new PriorityQueue<>(10, + tqComparator); + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueuePerPartition q = i.next(); + Resource used = q.getUsed(); + + if (Resources.greaterThan(rc, tot_guarant, used, q.getGuaranteed())) { + q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra); + } else { + q.idealAssigned = Resources.clone(used); + } + Resources.subtractFrom(unassigned, q.idealAssigned); + // If idealAssigned < (allocated + used + pending), q needs more + // resources, so + // add it to the list of underserved queues, ordered by need. + Resource curPlusPend = Resources.add(q.getUsed(), q.pending); + if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { + orderedByNeed.add(q); + } + } + + // assign all cluster resources until no more demand, or no resources are + // left + while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, tot_guarant, + unassigned, Resources.none())) { + Resource wQassigned = Resource.newInstance(0, 0); + // we compute normalizedGuarantees capacity based on currently active + // queues + resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); + + // For each underserved queue (or set of queues if multiple are equally + // underserved), offer its share of the unassigned resources based on its + // normalized guarantee. After the offer, if the queue is not satisfied, + // place it back in the ordered list of queues, recalculating its place + // in the order of most under-guaranteed to most over-guaranteed. In this + // way, the most underserved queue(s) are always given resources first. + Collection underserved = getMostUnderservedQueues( + orderedByNeed, tqComparator); + for (Iterator i = underserved.iterator(); i + .hasNext();) { + TempQueuePerPartition sub = i.next(); + Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, + sub.normalizedGuarantee, Resource.newInstance(1, 1)); + Resource wQidle = sub.offer(wQavail, rc, tot_guarant, + isReservedPreemptionCandidatesSelector); + Resource wQdone = Resources.subtract(wQavail, wQidle); + + if (Resources.greaterThan(rc, tot_guarant, wQdone, Resources.none())) { + // The queue is still asking for more. Put it back in the priority + // queue, recalculating its order based on need. + orderedByNeed.add(sub); + } + Resources.addTo(wQassigned, wQdone); + } + Resources.subtractFrom(unassigned, wQassigned); + } + } + + /** + * Computes a normalizedGuaranteed capacity based on active queues + * + * @param rc + * resource calculator + * @param clusterResource + * the total amount of resources in the cluster + * @param queues + * the list of queues to consider + */ + private void resetCapacity(ResourceCalculator rc, Resource clusterResource, + Collection queues, boolean ignoreGuar) { + Resource activeCap = Resource.newInstance(0, 0); + + if (ignoreGuar) { + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = 1.0f / queues.size(); + } + } else { + for (TempQueuePerPartition q : queues) { + Resources.addTo(activeCap, q.getGuaranteed()); + } + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.getGuaranteed(), activeCap); + } + } + } + + // Take the most underserved TempQueue (the one on the head). Collect and + // return the list of all queues that have the same idealAssigned + // percentage of guaranteed. + private Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, + TQComparator tqComparator) { + ArrayList underserved = new ArrayList<>(); + while (!orderedByNeed.isEmpty()) { + TempQueuePerPartition q1 = orderedByNeed.remove(); + underserved.add(q1); + + // Add underserved queues in order for later uses + context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); + TempQueuePerPartition q2 = orderedByNeed.peek(); + // q1's pct of guaranteed won't be larger than q2's. If it's less, then + // return what has already been collected. Otherwise, q1's pct of + // guaranteed == that of q2, so add q2 to underserved list during the + // next pass. + if (q2 == null || tqComparator.compare(q1, q2) < 0) { + return underserved; + } + } + return underserved; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index c52127d..8fd7f0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.Set; interface CapacitySchedulerPreemptionContext { @@ -49,4 +51,16 @@ TempQueuePerPartition getQueueByPartition(String queueName, Set getLeafQueueNames(); Set getAllPartitions(); + + int getClusterMaxApplicationPriority(); + + Resource getPartitionResource(String partition); + + LinkedHashSet getUnderServedQueuesPerPartition(String partition); + + void addPartitionToUnderServedQueues(String queueName, String partition); + + float getMaxIgnoredOverCapacityForIntraQueue(); + + float getMaxAllowablePreemptLimitForIntraQueue(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index 42d8730..bddbf2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -22,10 +22,11 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -40,7 +41,8 @@ continue; } - // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 + // Only add resToObtainByPartition when actuallyToBePreempted resource >= + // 0 if (Resources.greaterThan(context.getResourceCalculator(), clusterResource, qT.getActuallyToBePreempted(), Resources.none())) { resToObtainByPartition.put(qT.partition, @@ -57,8 +59,8 @@ public static boolean isContainerAlreadySelected(RMContainer container, return false; } - Set containers = selectedCandidates.get( - container.getApplicationAttemptId()); + Set containers = selectedCandidates + .get(container.getApplicationAttemptId()); if (containers == null) { return false; } @@ -70,8 +72,8 @@ public static void deductPreemptableResourcesBasedSelectedCandidates( Map> selectedCandidates) { for (Set containers : selectedCandidates.values()) { for (RMContainer c : containers) { - SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode( - c.getAllocatedNode()); + SchedulerNode schedulerNode = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()); if (null == schedulerNode) { continue; } @@ -89,8 +91,27 @@ public static void deductPreemptableResourcesBasedSelectedCandidates( if (null != res) { tq.deductActuallyToBePreempted(context.getResourceCalculator(), tq.totalPartitionResource, res); + Collection tas = tq.getApps(); + if (null == tas || tas.isEmpty()) { + continue; + } + + deductPreemptableResourcePerApp(context, tq.totalPartitionResource, + tas, res, partition); } } } } + + private static void deductPreemptableResourcePerApp( + CapacitySchedulerPreemptionContext context, + Resource totalPartitionResource, Collection tas, + Resource res, String partition) { + for (TempAppPerPartition ta : tas) { + Resource demand = ta.pendingPerPartition.get(partition); + ta.deductActuallyToBePreempted(context.getResourceCalculator(), + totalPartitionResource, res, partition); + Resources.subtractFrom(demand, res); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 9df395d..59024dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -194,16 +194,6 @@ private void preemptAMContainers(Resource clusterResource, skippedAMContainerlist.clear(); } - private boolean preemptMapContains( - Map> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - /** * Return should we preempt rmContainer. If we should, deduct from * resourceToObtainByPartition @@ -331,35 +321,4 @@ private void preemptFrom(FiCaSchedulerApp app, clusterResource, selectedContainers, totalPreemptionAllowed); } } - - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List containers){ - Collections.sort(containers, new Comparator() { - @Override - public int compare(RMContainer a, RMContainer b) { - int schedKeyComp = b.getAllocatedSchedulerKey() - .compareTo(a.getAllocatedSchedulerKey()); - if (schedKeyComp != 0) { - return schedKeyComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); - } - - private void addToPreemptMap( - Map> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); - } - set.add(containerToPreempt); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPolicy.java new file mode 100644 index 0000000..2c9b1f2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPolicy.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueuePreemptableResourceCalculator.TAPriorityComparator; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueuePreemptableResourceCalculator.TAReverseComparator; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +public class FifoIntraQueuePreemptionPolicy + implements + IntraQueuePreemptionComputePlugin { + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + + public FifoIntraQueuePreemptionPolicy(ResourceCalculator rc, + CapacitySchedulerPreemptionContext preemptionContext) { + this.context = preemptionContext; + this.rc = rc; + } + + @Override + public Map getResourceDemandFromAppsPerQueue(LeafQueue leafQueue, + String partition) { + + Map resToObtainByPartition = new HashMap<>(); + TempQueuePerPartition tq = context + .getQueueByPartition(leafQueue.getQueueName(), partition); + + Collection appsOrderedByPriority = tq.getApps(); + Resource actualPreemptNeeded = null; + + for (TempAppPerPartition a1 : appsOrderedByPriority) { + for (Entry entry : a1.getPendingPerPartition() + .entrySet()) { + String label = entry.getKey(); + + // Updating pending resource per-partition level. + if ((actualPreemptNeeded = resToObtainByPartition.get(label)) == null) { + actualPreemptNeeded = Resources.createResource(0, 0); + resToObtainByPartition.put(label, actualPreemptNeeded); + } + + Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted()); + } + } + return resToObtainByPartition; + } + + @Override + public void computeAppsIdealAllocation(Resource clusterResource, + Resource partitionBasedResource, TempQueuePerPartition tq, + Map> selectedCandidates, + Resource totalPreemptedResourceAllowed, float maxAllowablePreemptLimit) { + + Collection apps = tq.leafQueue.getAllApplications(); + String partition = tq.partition; + + TAPriorityComparator taComparator = new TAPriorityComparator(); + PriorityQueue orderedByPriority = createTempAppForResourceCalculcation( + rc, apps, taComparator); + + // Apps ordered from highest to lower priority. + ArrayList orderedApps = getHighPriorityApps( + orderedByPriority, taComparator); + for (TempAppPerPartition tmpApp : orderedApps) { + // Once unallocated resource is 0, we can stop assigning ideal per app. + if (Resources.lessThanOrEqual(rc, clusterResource, tq.unAllocated, + Resources.none())) { + break; + } + + // Calculate total selected container size from current app. + Resource selected = getSelectedResourcePerApp(selectedCandidates, tmpApp); + + // For any app, used+pending will give its idealAssigned. However it will + // be tightly linked to queue's unallocated quota. So lower priority apps + // idealAssigned may fall to 0 if higher priority apps demand is more. + Resource initialIdealAssigned = Resources.add(tmpApp.getUsed(), + tmpApp.pending); + Resources.subtractFrom(initialIdealAssigned, selected); + + // Can skip apps which are already crossing user-limit. + // For this, Get the userlimit from scheduler and ensure that app is + // not crossing userlimit here. Such apps can be skipped. + Resource userHeadroom = tq.leafQueue.getUserLimitHeadRoomPerApp( + tmpApp.getFiCaSchedulerApp(), partitionBasedResource, partition); + if (Resources.lessThanOrEqual(rc, partitionBasedResource, userHeadroom, + Resources.none())) { + continue; + } + + tmpApp.idealAssigned = Resources.min(rc, clusterResource, tq.unAllocated, + initialIdealAssigned); + + // Also set how resource is to needed by this app in real from others. + Resource remainingUsed = Resources.subtract(tmpApp.getUsed(), selected); + tmpApp.setToBePreemptFromOther( + Resources.subtract(tmpApp.idealAssigned, remainingUsed)); + + Resources.subtractFrom(tq.unAllocated, tmpApp.idealAssigned); + } + + // A configurable limit that could define an ideal allowable preemption + // limit. + // Based on current queue's capacity, defined how much % could become + // preemptable. + Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(), + maxAllowablePreemptLimit); + Resources.subtractFrom(maxIntraQueuePreemptable, + tq.getActuallyToBePreempted()); + + // WE have two configurations here, one is intra queue limit and second one + // is per-round limit for any time preemption. Take a minimum of these two + Resource preemptionLimit = Resources.min(rc, clusterResource, + maxIntraQueuePreemptable, totalPreemptedResourceAllowed); + + // Re-sort the collection to get apps from lower to high priority to + // calculate preemptable resource per app. + Collections.sort(orderedApps, new TAReverseComparator()); + + for (TempAppPerPartition tmpApp : orderedApps) { + if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit, + Resources.none())) { + break; + } + + Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(), + tmpApp.idealAssigned); + Resources.subtractFrom(preemtableFromApp, tmpApp.selected); + + // Calculate toBePreempted from apps as follows: + // app.preemptable = min(max(app.used - app.selected - app.ideal, 0), + // intra_q_preemptable) + tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources + .max(rc, clusterResource, preemtableFromApp, Resources.none()), + preemptionLimit); + + preemptionLimit = Resources.subtract(preemptionLimit, + tmpApp.toBePreempted); + } + + // Save all apps (l to h) to temp queue for further reference + tq.addAllApps(orderedApps); + + // There are chances that we may preempt for the demand from same + // priority level, such cases are to be validated out. + tq.validateOutSameAppPriorityFromDemand(rc, clusterResource, + (ArrayList) tq.getApps()); + } + + // Take the most underserved TempQueue (the one on the head). Collect and + // return the list of all queues that have the same idealAssigned + // percentage of guaranteed. + private ArrayList getHighPriorityApps( + PriorityQueue orderedByNeed, + TAPriorityComparator taComparator) { + ArrayList underserved = new ArrayList<>(); + while (!orderedByNeed.isEmpty()) { + TempAppPerPartition a1 = orderedByNeed.remove(); + underserved.add(a1); + TempAppPerPartition a2 = orderedByNeed.peek(); + // q1's pct of guaranteed won't be larger than q2's. If it's less, then + // return what has already been collected. Otherwise, q1's pct of + // guaranteed == that of q2, so add q2 to underserved list during the + // next pass. + if (a2 == null || taComparator.compare(a1, a2) > 0) { + return underserved; + } + } + + return underserved; + } + + private Resource getSelectedResourcePerApp( + Map> selectedCandidates, + TempAppPerPartition tmpApp) { + tmpApp.selected = Resources.createResource(0, 0); + Set containers = selectedCandidates + .get(tmpApp.app.getApplicationAttemptId()); + + if (containers == null) { + return tmpApp.selected; + } + for (RMContainer cont : containers) { + Resources.addTo(tmpApp.selected, cont.getAllocatedResource()); + } + + return tmpApp.selected; + } + + private PriorityQueue createTempAppForResourceCalculcation( + ResourceCalculator rc, Collection apps, + TAPriorityComparator taComparator) { + PriorityQueue orderedByPriority = new PriorityQueue<>(100, + taComparator); + + // have an internal temp app structure to store intermediate data (priority) + for (FiCaSchedulerApp app : apps) { + + Set partitions = app.getAppAttemptResourceUsage() + .getNodePartitionsSet(); + HashMap usedPerPartition = new HashMap(); + for (String partition : partitions) { + usedPerPartition.put(partition, + app.getAppAttemptResourceUsage().getUsed(partition)); + } + + // Create TempAppPerQueue for further calculation. + TempAppPerPartition tmpApp = new TempAppPerPartition(app.getQueueName(), + app.getApplicationId(), usedPerPartition, app.getCurrentReservation(), + app.getTotalPendingRequests(), + (HashMap) app.getTotalPendingRequestsPerPartition(), + app.getPriority().getPriority(), app); + + // Set ideal allocation of app as 0. + tmpApp.idealAssigned = Resources.createResource(0, 0); + + // Skip an app which doesn't have any outstanding resource requests + if (Resources.lessThanOrEqual(rc, Resources.none(), + app.getTotalPendingRequests(), Resources.none())) { + continue; + } + orderedByPriority.add(tmpApp); + } + return orderedByPriority; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java new file mode 100644 index 0000000..c21c64c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; + +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { + private static final Log LOG = LogFactory + .getLog(IntraQueueCandidatesSelector.class); + + private IntraQueuePreemptableResourceCalculator intraQPreemptableAmountCalculator; + IntraQueuePreemptionComputePlugin priorityBasedPolicy = null; + + IntraQueueCandidatesSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + priorityBasedPolicy = new FifoIntraQueuePreemptionPolicy(rc, preemptionContext); + intraQPreemptableAmountCalculator = new IntraQueuePreemptableResourceCalculator( + preemptionContext, true, priorityBasedPolicy); + } + + @Override + public Map> selectCandidates( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed) { + + // 1. Calculate the abnormality within each queue one by one. + intraQPreemptableAmountCalculator.computeIntraQueuePreemptionDemand( + clusterResource, totalPreemptedResourceAllowed, selectedCandidates); + + // 2. Previous selectors (with higher priority) could have already + // selected containers. We need to deduct pre-emptable resources + // based on already selected candidates. + CapacitySchedulerPreemptionUtils + .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext, + selectedCandidates); + + // 3. Loop through all partitions to calculate demand + for (String partition : preemptionContext.getAllPartitions()) { + LinkedHashSet queueNames = preemptionContext + .getUnderServedQueuesPerPartition(partition); + + // Error check to handle non-mapped labels to queue. + if (null == queueNames) { + continue; + } + + // 4. Iterate from most under-served queue in order. + for (String queueName : queueNames) { + LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).leafQueue; + + // skip if not a leafqueue + if(null == leafQueue) { + continue; + } + + // 5. Calculate the resource to obtain per partition + Map resToObtainByPartition = priorityBasedPolicy + .getResourceDemandFromAppsPerQueue(leafQueue, partition); + + synchronized (leafQueue) { + Iterator desc = leafQueue.getOrderingPolicy() + .getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp app = desc.next(); + preemptFromLeastStarvedApp(selectedCandidates, clusterResource, + totalPreemptedResourceAllowed, resToObtainByPartition, + leafQueue, app); + } + } + } + } + + return selectedCandidates; + } + + private void preemptFromLeastStarvedApp( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed, + Map resToObtainByPartition, LeafQueue leafQueue, FiCaSchedulerApp app) { + + // ToDo: Reuse reservation selector here. + + List liveContainers = new ArrayList<>( + app.getLiveContainers()); + sortContainers(liveContainers); + + for (RMContainer c : liveContainers) { + + // if there are no demand, return. + if (resToObtainByPartition.isEmpty()) { + return; + } + + String nodePartition = getPartitionByNodeId(c.getAllocatedNode()); + Resource toObtainByPartition = resToObtainByPartition.get(nodePartition); + + // When we have no more resource need to obtain, remove from map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resToObtainByPartition.remove(nodePartition); + } + + // skip preselected containers. + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + continue; + } + + // Skip already marked to killable containers + if (null != preemptionContext.getKillableContainers() && preemptionContext + .getKillableContainers().contains(c.getContainerId())) { + continue; + } + + // Skip AM Container from preemption for now. + // ToDo: Need to decide whether intra preemption need to take AM containers + // or not. + if (c.isAMContainer()) { + continue; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(toObtainByPartition, c, + clusterResource, selectedCandidates, totalPreemptedResourceAllowed); + } + } + + private String getPartitionByNodeId(NodeId nodeId) { + return preemptionContext.getScheduler().getSchedulerNode(nodeId) + .getPartition(); + } + + + /** + * Return should we preempt rmContainer. If we should, deduct from + * resourceToObtainByPartition + */ + private boolean tryPreemptContainerAndDeductResToObtain( + Resource actualPreemptNeeded, RMContainer rmContainer, + Resource clusterResource, + Map> preemptMap, + Resource totalPreemptionAllowed) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + Resource toObtainByPartition = actualPreemptNeeded; + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none()) + && Resources.fitsIn(rc, clusterResource, + rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + Resources.subtractFrom(totalPreemptionAllowed, + rmContainer.getAllocatedResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug(this.getClass().getName() + " Marked container=" + + rmContainer.getContainerId() + " queue=" + + rmContainer.getQueueName() + " to be preemption candidates"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java new file mode 100644 index 0000000..593dec9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector} + */ +public class IntraQueuePreemptableResourceCalculator + extends + AbstractPreemptableResourceCalculator { + + protected IntraQueuePreemptionComputePlugin priorityBasedPolicy; + + static class TAReverseComparator implements Comparator { + + @Override + public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { + Priority p1 = Priority.newInstance(tq1.getPriority()); + Priority p2 = Priority.newInstance(tq2.getPriority()); + + if (!p1.equals(p2)) { + return p2.compareTo(p1); + } + return tq2.getApplicationId().compareTo(tq1.getApplicationId()); + } + } + + static class TAPriorityComparator implements Comparator { + + @Override + public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { + Priority p1 = Priority.newInstance(tq1.getPriority()); + Priority p2 = Priority.newInstance(tq2.getPriority()); + + if (!p1.equals(p2)) { + return p1.compareTo(p2); + } + return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + } + } + /** + * PreemptableResourceCalculator constructor + * + * @param preemptionContext + * @param isReservedPreemptionCandidatesSelector + * this will be set by different implementation of candidate + * selectors, please refer to TempQueuePerPartition#offer for + * details. + * @param priorityBasedPolicy + */ + public IntraQueuePreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector, + IntraQueuePreemptionComputePlugin priorityBasedPolicy) { + super(preemptionContext, isReservedPreemptionCandidatesSelector); + this.priorityBasedPolicy = priorityBasedPolicy; + } + + public void computeIntraQueuePreemptionDemand(Resource clusterResource, + Resource totalPreemptedResourceAllowed, + Map> selectedCandidates) { + + for (String partition : context.getAllPartitions()) { + LinkedHashSet queueNames = context + .getUnderServedQueuesPerPartition(partition); + + if (null == queueNames) { + continue; + } + + // Its better to get partition based resource limit earlier before + // starting calculation + Resource partitionBasedResource = context.getPartitionResource(partition); + + for (String queueName : queueNames) { + TempQueuePerPartition tq = context.getQueueByPartition(queueName, + partition); + LeafQueue leafQueue = tq.leafQueue; + + // skipp if its parent queue + if (null == leafQueue) { + continue; + } + tq.setUnAllocated( + Resources.subtract(tq.getGuaranteed(), tq.getActuallyToBePreempted())); + + Collection apps = leafQueue.getAllApplications(); + + if (apps.size() == 1) { + // We do not need preemption for a single app + continue; + } + + // Check queue's used capacity. Make sure that the used capacity is + // above certain limit to consider for intra queue preemption. + if (leafQueue.getUsedCapacity() < context + .getMaxIgnoredOverCapacityForIntraQueue()) { + continue; + } + + // compute the allocation of all apps based on queue's unallocated + // capacity + priorityBasedPolicy.computeAppsIdealAllocation(clusterResource, + partitionBasedResource, tq, selectedCandidates, + totalPreemptedResourceAllowed, + context.getMaxAllowablePreemptLimitForIntraQueue()); + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java new file mode 100644 index 0000000..2316ea3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; + + +interface IntraQueuePreemptionComputePlugin { + + Map getResourceDemandFromAppsPerQueue(LeafQueue leafQueue, + String partition); + + void computeAppsIdealAllocation(Resource clusterResource, + Resource partitionBasedResource, TempQueuePerPartition tq, + Map> selectedCandidates, + Resource totalPreemptedResourceAllowed, float maxAllowablePreemptLimit); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index d1d2485..ae0776d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.PriorityQueue; import java.util.Set; /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector} */ -public class PreemptableResourceCalculator { +public class PreemptableResourceCalculator + extends + AbstractPreemptableResourceCalculator { private static final Log LOG = LogFactory.getLog(PreemptableResourceCalculator.class); - private final CapacitySchedulerPreemptionContext context; - private final ResourceCalculator rc; private boolean isReservedPreemptionCandidatesSelector; - static class TQComparator implements Comparator { - private ResourceCalculator rc; - private Resource clusterRes; - - TQComparator(ResourceCalculator rc, Resource clusterRes) { - this.rc = rc; - this.clusterRes = clusterRes; - } - - @Override - public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { - if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { - return -1; - } - if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { - return 1; - } - return 0; - } - - // Calculates idealAssigned / guaranteed - // TempQueues with 0 guarantees are always considered the most over - // capacity and therefore considered last for resources. - private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { - double pctOver = Integer.MAX_VALUE; - if (q != null && Resources.greaterThan(rc, clusterRes, - q.getGuaranteed(), - Resources.none())) { - pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, - q.getGuaranteed()); - } - return (pctOver); - } - } - /** * PreemptableResourceCalculator constructor * @@ -93,136 +54,7 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, boolean isReservedPreemptionCandidatesSelector) { - context = preemptionContext; - rc = preemptionContext.getResourceCalculator(); - this.isReservedPreemptionCandidatesSelector = - isReservedPreemptionCandidatesSelector; - } - - /** - * Computes a normalizedGuaranteed capacity based on active queues - * @param rc resource calculator - * @param clusterResource the total amount of resources in the cluster - * @param queues the list of queues to consider - */ - private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection queues, boolean ignoreGuar) { - Resource activeCap = Resource.newInstance(0, 0); - - if (ignoreGuar) { - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = 1.0f / queues.size(); - } - } else { - for (TempQueuePerPartition q : queues) { - Resources.addTo(activeCap, q.getGuaranteed()); - } - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.getGuaranteed(), activeCap); - } - } - } - - // Take the most underserved TempQueue (the one on the head). Collect and - // return the list of all queues that have the same idealAssigned - // percentage of guaranteed. - protected Collection getMostUnderservedQueues( - PriorityQueue orderedByNeed, - TQComparator tqComparator) { - ArrayList underserved = new ArrayList<>(); - while (!orderedByNeed.isEmpty()) { - TempQueuePerPartition q1 = orderedByNeed.remove(); - underserved.add(q1); - TempQueuePerPartition q2 = orderedByNeed.peek(); - // q1's pct of guaranteed won't be larger than q2's. If it's less, then - // return what has already been collected. Otherwise, q1's pct of - // guaranteed == that of q2, so add q2 to underserved list during the - // next pass. - if (q2 == null || tqComparator.compare(q1,q2) < 0) { - return underserved; - } - } - return underserved; - } - - - /** - * Given a set of queues compute the fix-point distribution of unassigned - * resources among them. As pending request of a queue are exhausted, the - * queue is removed from the set and remaining capacity redistributed among - * remaining queues. The distribution is weighted based on guaranteed - * capacity, unless asked to ignoreGuarantee, in which case resources are - * distributed uniformly. - */ - private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection qAlloc, - Resource unassigned, boolean ignoreGuarantee) { - // Prior to assigning the unused resources, process each queue as follows: - // If current > guaranteed, idealAssigned = guaranteed + untouchable extra - // Else idealAssigned = current; - // Subtract idealAssigned resources from unassigned. - // If the queue has all of its needs met (that is, if - // idealAssigned >= current + pending), remove it from consideration. - // Sort queues from most under-guaranteed to most over-guaranteed. - TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = new PriorityQueue<>(10, - tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { - TempQueuePerPartition q = i.next(); - Resource used = q.getUsed(); - - if (Resources.greaterThan(rc, tot_guarant, used, - q.getGuaranteed())) { - q.idealAssigned = Resources.add( - q.getGuaranteed(), q.untouchableExtra); - } else { - q.idealAssigned = Resources.clone(used); - } - Resources.subtractFrom(unassigned, q.idealAssigned); - // If idealAssigned < (allocated + used + pending), q needs more resources, so - // add it to the list of underserved queues, ordered by need. - Resource curPlusPend = Resources.add(q.getUsed(), q.pending); - if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { - orderedByNeed.add(q); - } - } - - //assign all cluster resources until no more demand, or no resources are left - while (!orderedByNeed.isEmpty() - && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); - // we compute normalizedGuarantees capacity based on currently active - // queues - resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); - - // For each underserved queue (or set of queues if multiple are equally - // underserved), offer its share of the unassigned resources based on its - // normalized guarantee. After the offer, if the queue is not satisfied, - // place it back in the ordered list of queues, recalculating its place - // in the order of most under-guaranteed to most over-guaranteed. In this - // way, the most underserved queue(s) are always given resources first. - Collection underserved = - getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator i = underserved.iterator(); i - .hasNext();) { - TempQueuePerPartition sub = i.next(); - Resource wQavail = Resources.multiplyAndNormalizeUp(rc, - unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant, - isReservedPreemptionCandidatesSelector); - Resource wQdone = Resources.subtract(wQavail, wQidle); - - if (Resources.greaterThan(rc, tot_guarant, - wQdone, Resources.none())) { - // The queue is still asking for more. Put it back in the priority - // queue, recalculating its order based on need. - orderedByNeed.add(sub); - } - Resources.addTo(wQassigned, wQdone); - } - Resources.subtractFrom(unassigned, wQassigned); - } + super(preemptionContext, isReservedPreemptionCandidatesSelector); } /** @@ -321,13 +153,12 @@ private void recursivelyComputeIdealAssignment( computeIdealResourceDistribution(rc, root.getChildren(), totalPreemptionAllowed, root.idealAssigned); // compute recursively for lower levels and build list of leafs - for(TempQueuePerPartition t : root.getChildren()) { + for (TempQueuePerPartition t : root.getChildren()) { recursivelyComputeIdealAssignment(t, totalPreemptionAllowed); } } } - private void calculateResToObtainByPartitionForLeafQueues( Set leafQueueNames, Resource clusterResource) { // Loop all leaf queues diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index dd33d8f..1d86787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -23,6 +23,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import com.google.common.annotations.VisibleForTesting; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -49,4 +55,47 @@ public abstract Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed); + + /** + * Compare by reversed priority order first, and then reversed containerId + * order + * + * @param containers + */ + // ToDo: reuse form FiFoCandidatesSelector + @VisibleForTesting + static void sortContainers(List containers) { + Collections.sort(containers, new Comparator() { + @Override + public int compare(RMContainer a, RMContainer b) { + int schedKeyComp = b.getAllocatedSchedulerKey() + .compareTo(a.getAllocatedSchedulerKey()); + if (schedKeyComp != 0) { + return schedKeyComp; + } + return b.getContainerId().compareTo(a.getContainerId()); + } + }); + } + + static protected void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + + static protected boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 36383502..a35eee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -52,6 +52,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,6 +92,9 @@ private boolean observeOnly; private boolean lazyPreempionEnabled; + private float maxAllowablePreemptLimitForIntraQueue; + private float maxIgnoredOverCapacityForIntraQueue; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -102,6 +106,8 @@ new HashMap<>(); private Map> queueToPartitions = new HashMap<>(); + private Map> partitionToUnderServedQueues = + new HashMap>(); private List candidatesSelectionPolicies = new ArrayList<>(); private Set allPartitions; @@ -171,6 +177,14 @@ public void init(Configuration config, RMContext context, CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); + maxAllowablePreemptLimitForIntraQueue = csConfig.getFloat( + CapacitySchedulerConfiguration.MAX_ALLOWABLE_PREEMPTION_LIMIT_FOR_INTRA_QUEUE, + CapacitySchedulerConfiguration.DEFAULT_MAX_ALLOWABLE_PREEMPTION_LIMIT_FOR_INTRA_QUEUE); + + maxIgnoredOverCapacityForIntraQueue = csConfig.getFloat( + CapacitySchedulerConfiguration.MAX_IGNORED_OVER_CAPACITY_FOR_INTRA_QUEUE, + CapacitySchedulerConfiguration.DEFAULT_MAX_IGNORED_OVER_CAPACITY_FOR_INTRA_QUEUE); + rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); @@ -186,8 +200,16 @@ public void init(Configuration config, RMContext context, // initialize candidates preemption selection policies candidatesSelectionPolicies.add( new FifoCandidatesSelector(this)); + + // Do we need to specially consider intra queue + boolean selectIntraQueuePreemptCandidatesByPriority = csConfig.getBoolean( + CapacitySchedulerConfiguration.SELECT_CANDIDATES_FOR_INTRAQUEUE_POLICIES, + CapacitySchedulerConfiguration.DEFAULT_SELECT_CANDIDATES_FOR_INTRAQUEUE_POLICIES); + if (selectIntraQueuePreemptCandidatesByPriority) { + candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); + } } - + @Override public ResourceCalculator getResourceCalculator() { return rc; @@ -234,6 +256,7 @@ private void preemptOrkillSelectedContainerAfterWait( // not have to raise another event. continue; } + //otherwise just send preemption events rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, @@ -542,4 +565,40 @@ public double getNaturalTerminationFactor() { Map> getQueuePartitions() { return queueToPartitions; } + + @Override + public int getClusterMaxApplicationPriority() { + return scheduler.getMaxClusterLevelAppPriority().getPriority(); + } + + @Override + public float getMaxAllowablePreemptLimitForIntraQueue() { + return maxAllowablePreemptLimitForIntraQueue; + } + + @Override + public float getMaxIgnoredOverCapacityForIntraQueue() { + return maxIgnoredOverCapacityForIntraQueue; + } + + @Override + public Resource getPartitionResource(String partition) { + return Resources.clone(nlm.getResourceByLabel(partition, + Resources.clone(scheduler.getClusterResource()))); + } + + public LinkedHashSet getUnderServedQueuesPerPartition(String partition) { + return partitionToUnderServedQueues.get(partition); + } + + public void addPartitionToUnderServedQueues(String queueName, + String partition) { + LinkedHashSet underServedQueues; + if (null == (underServedQueues = partitionToUnderServedQueues + .get(partition))) { + underServedQueues = new LinkedHashSet(); + partitionToUnderServedQueues.put(partition, underServedQueues); + } + underServedQueues.add(queueName); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java new file mode 100644 index 0000000..e52ba27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization for an application. + */ +public class TempAppPerPartition { + // Following fields are copied from scheduler + final String queueName; + final Resource pending; + final HashMap pendingPerPartition; + private final HashMap usedPerPartition; + private final Resource reserved; + + // Following fields are settled and used by candidate selection policies + Resource idealAssigned; + Resource toBePreempted; + Resource selected; + private Resource actuallyToBePreempted; + private Resource toBePreemptFromOther; + + private final int priority; + private final ApplicationId applicationId; + LeafQueue leafQueue; + FiCaSchedulerApp app; + + TempAppPerPartition(String queueName, ApplicationId applicationId, + HashMap usedPerPartition, Resource reserved, + Resource pending, HashMap pendingPerPartition, + int priority, FiCaSchedulerApp app) { + this.queueName = queueName; + this.usedPerPartition = usedPerPartition; + this.pending = pending; + this.pendingPerPartition = pendingPerPartition; + + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.toBePreemptFromOther = Resource.newInstance(0, 0); + this.selected = Resource.newInstance(0, 0); + + this.reserved = reserved; + this.priority = priority; + this.applicationId = applicationId; + this.app = app; + } + + public void setLeafQueue(LeafQueue l) { + this.leafQueue = l; + } + + public Resource getUsed() { + return usedPerPartition.get(""); + } + + public Resource getUsed(String partition) { + return usedPerPartition.get(partition); + } + + public HashMap getPendingPerPartition() { + return pendingPerPartition; + } + + public FiCaSchedulerApp getFiCaSchedulerApp() { + return app; + } + + public Resource getGuaranteed() { + return Resources.none(); + } + + public void updateDemand(Resource demand, String partition) { + Resource perPartitionDemand = pendingPerPartition.get(partition); + + if (perPartitionDemand == null) { + // Should not happen. + return; + } + + Resources.subtractFrom(perPartitionDemand, demand); + } + + public void assignPreemption(Resource killable) { + Resources.addTo(toBePreempted, killable); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + getApplicationId()).append(" CUR: ").append(getUsed()) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" IDEAL_ASSIGNED: ").append(idealAssigned) + .append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted).append("\n"); + + return sb.toString(); + } + + public Resource getActuallyToBePreempted() { + return actuallyToBePreempted; + } + + public void setActuallyToBePreempted(Resource res) { + this.actuallyToBePreempted = res; + } + + public Resource getToBePreemptFromOther() { + return toBePreemptFromOther; + } + + public void setToBePreemptFromOther(Resource res) { + this.toBePreemptFromOther = res; + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ").append(getUsed().getMemorySize()) + .append(", ").append(getUsed().getVirtualCores()).append(", ") + .append(pending.getMemorySize()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(getGuaranteed().getMemorySize()).append(", ") + .append(getGuaranteed().getVirtualCores()).append(", ") + .append(idealAssigned.getMemorySize()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemorySize()).append(", ") + .append(toBePreempted.getVirtualCores()).append(", ") + .append(actuallyToBePreempted.getMemorySize()).append(", ") + .append(actuallyToBePreempted.getVirtualCores()); + } + + public int getPriority() { + return priority; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, + Resource toBeDeduct, Resource cluster, String partition) { + Resource pending = pendingPerPartition.get(partition); + if (Resources.greaterThan(resourceCalculator, cluster, pending, + toBeDeduct)) { + Resources.subtractFrom(pending, toBeDeduct); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 04ed135..61e05a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -19,12 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueuePreemptableResourceCalculator.TAReverseComparator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.TreeSet; /** * Temporary data-structure tracking resource availability, pending resource @@ -48,13 +55,19 @@ Resource toBePreempted; Resource untouchableExtra; Resource preemptableExtra; + Resource unAllocated; private Resource actuallyToBePreempted; + Resource selectedContainers; double normalizedGuarantee; + boolean intraQueuePreemptionCalculationDone; final ArrayList children; + private Collection apps; + private HashMap pendingPerPriorityLevel; LeafQueue leafQueue; boolean preemptionDisabled; + private int maxPriority = 0; TempQueuePerPartition(String queueName, Resource current, boolean preemptionDisabled, String partition, Resource killable, @@ -77,6 +90,7 @@ this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; this.children = new ArrayList<>(); + this.apps = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; @@ -86,6 +100,8 @@ this.absMaxCapacity = absMaxCapacity; this.totalPartitionResource = totalPartitionResource; this.reserved = reserved; + this.intraQueuePreemptionCalculationDone = false; + this.pendingPerPriorityLevel = new HashMap(); } public void setLeafQueue(LeafQueue l) { @@ -95,7 +111,9 @@ public void setLeafQueue(LeafQueue l) { /** * When adding a child we also aggregate its pending resource needs. - * @param q the child queue to add to this queue + * + * @param q + * the child queue to add to this queue */ public void addChild(TempQueuePerPartition q) { assert leafQueue == null; @@ -103,7 +121,7 @@ public void addChild(TempQueuePerPartition q) { Resources.addTo(pending, q.pending); } - public ArrayList getChildren(){ + public ArrayList getChildren() { return children; } @@ -122,28 +140,30 @@ Resource offer(Resource avail, ResourceCalculator rc, Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + // remain = avail - min(avail, (max - assigned), (current + pending - + // assigned)) Resource accepted = Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, - Resources - /* - * When we're using FifoPreemptionSelector - * (considerReservedResource = false). - * - * We should deduct reserved resource to avoid excessive preemption: - * - * For example, if an under-utilized queue has used = reserved = 20. - * Preemption policy will try to preempt 20 containers - * (which is not satisfied) from different hosts. - * - * In FifoPreemptionSelector, there's no guarantee that preempted - * resource can be used by pending request, so policy will preempt - * resources repeatly. - */ - .subtract(Resources.add( - (considersReservedResource ? getUsed() : - getUsedDeductReservd()), - pending), idealAssigned))); + absMaxCapIdealAssignedDelta, + Resources.min(rc, clusterResource, avail, Resources + /* + * When we're using FifoPreemptionSelector (considerReservedResource + * = false). + * + * We should deduct reserved resource to avoid excessive preemption: + * + * For example, if an under-utilized queue has used = reserved = 20. + * Preemption policy will try to preempt 20 containers (which is not + * satisfied) from different hosts. + * + * In FifoPreemptionSelector, there's no guarantee that preempted + * resource can be used by pending request, so policy will preempt + * resources repeatly. + */ + .subtract( + Resources.add((considersReservedResource + ? getUsed() + : getUsedDeductReservd()), pending), + idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; @@ -162,8 +182,7 @@ public void updatePreemptableExtras(ResourceCalculator rc) { untouchableExtra = Resources.none(); preemptableExtra = Resources.none(); - Resource extra = Resources.subtract(getUsed(), - getGuaranteed()); + Resource extra = Resources.subtract(getUsed(), getGuaranteed()); if (Resources.lessThan(rc, totalPartitionResource, extra, Resources.none())) { extra = Resources.none(); @@ -197,26 +216,21 @@ public void updatePreemptableExtras(ResourceCalculator rc) { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(" NAME: " + queueName) - .append(" CUR: ").append(current) - .append(" PEN: ").append(pending) - .append(" RESERVED: ").append(reserved) - .append(" GAR: ").append(getGuaranteed()) - .append(" NORM: ").append(normalizedGuarantee) - .append(" IDEAL_ASSIGNED: ").append(idealAssigned) - .append(" IDEAL_PREEMPT: ").append(toBePreempted) + sb.append(" NAME: " + queueName).append(" CUR: ").append(current) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" GAR: ").append(getGuaranteed()).append(" NORM: ") + .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted) .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) .append(" UNTOUCHABLE: ").append(untouchableExtra) - .append(" PREEMPTABLE: ").append(preemptableExtra) - .append("\n"); + .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n"); return sb.toString(); } public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - Resource usedDeductKillable = Resources.subtract( - getUsed(), killable); + Resource usedDeductKillable = Resources.subtract(getUsed(), killable); Resource totalResource = Resources.add(getUsed(), pending); // The minimum resource that we need to keep for a queue is: @@ -224,7 +238,8 @@ public void assignPreemption(float scalingFactor, ResourceCalculator rc, // // Doing this because when we calculate ideal allocation doesn't consider // reserved resource, ideal-allocation calculated could be less than - // guaranteed and total. We should avoid preempt from a queue if it is already + // guaranteed and total. We should avoid preempt from a queue if it is + // already // <= its guaranteed resource. Resource minimumQueueResource = Resources.max(rc, clusterResource, Resources.min(rc, clusterResource, totalResource, getGuaranteed()), @@ -233,7 +248,8 @@ public void assignPreemption(float scalingFactor, ResourceCalculator rc, if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, minimumQueueResource)) { toBePreempted = Resources.multiply( - Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor); + Resources.subtract(usedDeductKillable, minimumQueueResource), + scalingFactor); } else { toBePreempted = Resources.none(); } @@ -247,6 +263,14 @@ public void setActuallyToBePreempted(Resource res) { this.actuallyToBePreempted = res; } + public Resource getUnAllocated() { + return unAllocated; + } + + public void setUnAllocated(Resource res) { + this.unAllocated = res; + } + public void deductActuallyToBePreempted(ResourceCalculator rc, Resource cluster, Resource toBeDeduct) { if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) { @@ -257,9 +281,8 @@ public void deductActuallyToBePreempted(ResourceCalculator rc, } void appendLogString(StringBuilder sb) { - sb.append(queueName).append(", ") - .append(current.getMemorySize()).append(", ") - .append(current.getVirtualCores()).append(", ") + sb.append(queueName).append(", ").append(current.getMemorySize()) + .append(", ").append(current.getVirtualCores()).append(", ") .append(pending.getMemorySize()).append(", ") .append(pending.getVirtualCores()).append(", ") .append(getGuaranteed().getMemorySize()).append(", ") @@ -267,9 +290,80 @@ void appendLogString(StringBuilder sb) { .append(idealAssigned.getMemorySize()).append(", ") .append(idealAssigned.getVirtualCores()).append(", ") .append(toBePreempted.getMemorySize()).append(", ") - .append(toBePreempted.getVirtualCores() ).append(", ") + .append(toBePreempted.getVirtualCores()).append(", ") .append(actuallyToBePreempted.getMemorySize()).append(", ") .append(actuallyToBePreempted.getVirtualCores()); } + public void addAllApps(Collection orderedApps) { + this.apps = orderedApps; + } + + public Collection getApps() { + return apps; + } + + public void validateOutSameAppPriorityFromDemand(ResourceCalculator rc, + Resource clusterResource, + ArrayList appsOrderedfromLowerPriority) { + + ListIterator fromHigh = appsOrderedfromLowerPriority + .listIterator(appsOrderedfromLowerPriority.size()); + Iterator fromLow = appsOrderedfromLowerPriority + .iterator(); + + TempAppPerPartition highPriorityApp = null; + TempAppPerPartition lowPriorityApp = null; + + if (fromHigh.hasPrevious()) { + highPriorityApp = fromHigh.previous(); + } + + if (fromLow.hasNext()) { + lowPriorityApp = fromLow.next(); + } + + while (highPriorityApp != null && lowPriorityApp != null) { + if (!lowPriorityApp.equals(highPriorityApp) + && lowPriorityApp.getPriority() < highPriorityApp.getPriority()) { + Resource toPreemptFromOther = highPriorityApp.getToBePreemptFromOther(); + Resource actuallyToPreempt = lowPriorityApp.getActuallyToBePreempted(); + Resource delta = Resources.subtract(lowPriorityApp.toBePreempted, + actuallyToPreempt); + + if (Resources.greaterThan(rc, clusterResource, delta, + Resources.none())) { + Resource toPreempt = Resources.min(rc, clusterResource, + toPreemptFromOther, delta); + + highPriorityApp.setToBePreemptFromOther( + Resources.subtract(toPreemptFromOther, toPreempt)); + lowPriorityApp.setActuallyToBePreempted( + Resources.add(actuallyToPreempt, toPreempt)); + + } + + if (Resources.lessThanOrEqual(rc, clusterResource, + lowPriorityApp.toBePreempted, + lowPriorityApp.getActuallyToBePreempted())) { + lowPriorityApp = null; + if (fromLow.hasNext()) { + lowPriorityApp = fromLow.next(); + } + continue; + } + + if (Resources.equals(highPriorityApp.getToBePreemptFromOther(), + Resources.none())) { + highPriorityApp = null; + if (fromHigh.hasPrevious()) { + highPriorityApp = fromHigh.previous(); + } + continue; + } + } else { + break; + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..d1f1f44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1077,4 +1077,28 @@ public boolean getLazyPreemptionEnabled() { PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = false; + + /** + * For intra-queue preemption, priority or user-limit based selector can help + * to preempt containers + */ + public static final String SELECT_CANDIDATES_FOR_INTRAQUEUE_POLICIES = + PREEMPTION_CONFIG_PREFIX + + "select_based_on_intra_queue_policies"; + public static final boolean DEFAULT_SELECT_CANDIDATES_FOR_INTRAQUEUE_POLICIES = + true; // Change to false later. + + /** + * For intra-queue preemption, consider those queues which are above used cap limit + */ + public static final String MAX_IGNORED_OVER_CAPACITY_FOR_INTRA_QUEUE = + PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity_for_intra_queue"; + public static final float DEFAULT_MAX_IGNORED_OVER_CAPACITY_FOR_INTRA_QUEUE = 0.5f; + + /** + * For intra-queue preemption, allowable maximum-preemptable limit per queue. + */ + public static final String MAX_ALLOWABLE_PREEMPTION_LIMIT_FOR_INTRA_QUEUE = + PREEMPTION_CONFIG_PREFIX + "max_allowable_preempt_limit_for_intra_queue"; + public static final float DEFAULT_MAX_ALLOWABLE_PREEMPTION_LIMIT_FOR_INTRA_QUEUE = 0.5f; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1ca69be..4d6bcea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -133,6 +133,9 @@ private Map> ignorePartitionExclusivityRMContainers = new HashMap<>(); + private Map preCalculatedUserLimit = + new HashMap(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -468,6 +471,10 @@ public synchronized void reinitialize( setupQueueConfigs(clusterResource); + // It's better to clear pre calculated user limits as partition based + // resource is updated in the cluster + preCalculatedUserLimit.clear(); + // queue metrics are updated, more resource may be available // activate the pending applications if possible activateApplications(); @@ -1610,6 +1617,10 @@ public synchronized void updateClusterResource(Resource clusterResource, // activate the pending applications if possible activateApplications(); + // It's better to clear pre calculated user limits as partition based + // resource is updated in the cluster + preCalculatedUserLimit.clear(); + // Update application properties for (FiCaSchedulerApp application : orderingPolicy.getSchedulableEntities()) { @@ -1839,6 +1850,17 @@ public void recoverContainer(Resource clusterResource, .getSchedulableEntities()); } + /** + * Obtain (read-only) collection of all applications. + */ + public Collection getAllApplications() { + Collection apps = new TreeSet( + pendingOrderingPolicy.getSchedulableEntities()); + apps.addAll(orderingPolicy.getSchedulableEntities()); + + return Collections.unmodifiableCollection(apps); + } + // Consider the headroom for each user in the queue. // Total pending for the queue = // sum(for each user(min((user's headroom), sum(user's pending requests)))) @@ -1870,6 +1892,32 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit( return pendingConsideringUserLimit; } + public synchronized Resource getUserLimitHeadRoomPerApp(FiCaSchedulerApp app, + Resource resources, String partition) { + + // Check user resource limit + String userName = app.getUser(); + User user = getUser(userName); + Resource userLimit = preCalculatedUserLimit.get(userName); + + // Verify whether we already calculated headroom for this user. + if (userLimit == null) { + userLimit = computeUserLimit(app, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + + preCalculatedUserLimit.put(app.getUser(), userLimit); + } + + Resource headroomForUser = Resources.subtract( + computeUserLimit(app, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + user.getUsed(partition)); + headroomForUser = Resources.componentwiseMax(headroomForUser, + Resources.none()); + + return headroomForUser; + } + @Override public synchronized void collectSchedulerApplications( Collection apps) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9c84a23..ab83e93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -326,6 +327,23 @@ public synchronized Resource getTotalPendingRequests() { return ret; } + public synchronized Map getTotalPendingRequestsPerPartition() { + + Map ret = new HashMap(); + Resource res = null; + for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) { + ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*"); + if ((res = ret.get(rr.getNodeLabelExpression())) == null) { + res = Resources.createResource(0, 0); + ret.put(rr.getNodeLabelExpression(), res); + } + + Resources.addTo(res, + Resources.multiply(rr.getCapability(), rr.getNumContainers())); + } + return ret; + } + public synchronized void markContainerForPreemption(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 3d3f1ea..c69a4a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -70,6 +70,7 @@ import java.util.TreeSet; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; @@ -166,7 +167,7 @@ public void buildEnv(String labelsConfig, String nodesConfig, private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, String queueName, List reservedContainers, - List liveContainers) { + List liveContainers, Resource used, Resource pending, Priority pri) { int containerId = 1; int start = containersConfig.indexOf("=") + 1; int end = -1; @@ -192,19 +193,23 @@ private void mockContainers(String containersConfig, ApplicationAttemptId attemp // now we found start/end, get container values String[] values = containersConfig.substring(start + 1, end).split(","); - if (values.length != 6) { + if (values.length < 6 || values.length > 7) { throw new IllegalArgumentException("Format to define container is:" + "(priority,resource,host,expression,repeat,reserved)"); } - Priority pri = Priority.newInstance(Integer.valueOf(values[0])); + pri.setPriority(Integer.valueOf(values[0])); Resource res = parseResourceFromString(values[1]); NodeId host = NodeId.newInstance(values[2], 1); String exp = values[3]; int repeat = Integer.valueOf(values[4]); boolean reserved = Boolean.valueOf(values[5]); + if(values.length == 7) { + Resources.addTo(pending, parseResourceFromString(values[6])); + } for (int i = 0; i < repeat; i++) { Container c = mock(Container.class); + Resources.addTo(used, res); when(c.getResource()).thenReturn(res); when(c.getPriority()).thenReturn(pri); SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c); @@ -286,16 +291,33 @@ private void mockApplications(String appsConfig) { List reservedContainers = new ArrayList(); ApplicationId appId = ApplicationId.newInstance(0L, id); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + Resource used = Resources.createResource(0, 0); + Resource pending = Resources.createResource(0, 0); + Priority pri = Priority.newInstance(0); mockContainers(strs[1], appAttemptId, queueName, reservedContainers, - liveContainers); + liveContainers, used, pending, pri); + LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId + + ",used" + used + ", pending:" + pending + ",priority=" + pri); FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); when(app.getLiveContainers()).thenReturn(liveContainers); when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationId()).thenReturn(appId); - when(app.getPriority()).thenReturn(Priority.newInstance(0)); + when(app.getPriority()).thenReturn(pri); + when(app.getTotalPendingRequests()).thenReturn(pending); + when(app.getCurrentConsumption()).thenReturn(used); + + Map pendingForDefaultPartition = new HashMap(); + // Add for default partition for now. + pendingForDefaultPartition.put("", pending); + when(app.getTotalPendingRequestsPerPartition()).thenReturn(pendingForDefaultPartition); + + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setUsed(used); + when(app.getAppAttemptResourceUsage()).thenReturn(ru); // add to LeafQueue LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); @@ -436,10 +458,18 @@ private ParentQueue mockQueueHierarchy(String queueExprs) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { - return a1.getApplicationId().compareTo(a2.getApplicationId()); + if (a1.getPriority() != null + && !a1.getPriority().equals(a2.getPriority())) { + return a1.getPriority().compareTo(a2.getPriority()); + } + + int res = a1.getApplicationId() + .compareTo(a2.getApplicationId()); + return res; } }); when(leafQueue.getApplications()).thenReturn(apps); + when(leafQueue.getAllApplications()).thenReturn(apps); OrderingPolicy so = mock(OrderingPolicy.class); when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -459,6 +489,17 @@ public Object answer(InvocationOnMock invocation) { if (queue.getQueueName().equals(ROOT)) { rootQueue = (ParentQueue) queue; } + + // Added for test + if (queue instanceof LeafQueue) { + LeafQueue leafQueue = (LeafQueue) queue; + Resource pending = leafQueue.getQueueResourceUsage().getPending(); + when(leafQueue.getTotalPendingResourcesConsideringUserLimit( + isA(Resource.class), isA(String.class))).thenReturn(pending); + // Added for test + when(leafQueue.getUserLimitHeadRoomPerApp(any(FiCaSchedulerApp.class), + any(Resource.class), anyString())).thenReturn(pending); + } } return rootQueue; } @@ -518,10 +559,15 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, float absUsed = Resources.divide(rc, totResoucePerPartition, parseResourceFromString(values[2].trim()), totResoucePerPartition) + epsilon; + float used = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[2].trim()), parseResourceFromString(values[0].trim())) + + epsilon; Resource pending = parseResourceFromString(values[3].trim()); qc.setAbsoluteCapacity(partitionName, absGuaranteed); qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteUsedCapacity(partitionName, absUsed); + qc.setUsedCapacity(used); + when(queue.getUsedCapacity()).thenReturn(used); ru.setPending(partitionName, pending); if (!isParent(queueExprArray, idx)) { LeafQueue lq = (LeafQueue) queue; @@ -536,6 +582,9 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, reserved = parseResourceFromString(values[4].trim()); ru.setReserved(partitionName, reserved); } + + + LOG.debug("Setup queue=" + queueName + " partition=" + partitionName + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + ",abs_used" + absUsed + ",pending_resource=" + pending diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java new file mode 100644 index 0000000..1dd344a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyIntraQueue + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemption() throws IOException { + /** + * The simplest test of reserved container, Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * so B needs to preempt 9 containers from A at n1 instead of randomly + * preempt from n1 and n2. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 120 0]);" + //root + "-a(=[11 100 10 50 0]);" + // a + "-b(=[40 100 40 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[29 100 20 0 0])"; // d + + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved, pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(4,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(5,1,n1,,1,false,20);" + // app5 b + "b\t" // app4 in b + + "(6,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + + "d\t" // app1 in a + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority. Hence take 20 containers + // from them + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } +} -- 2.7.4 (Apple Git-66)