From 3e546625c304bdd335790d1fb656bf25013be44c Mon Sep 17 00:00:00 2001 From: Sunil Date: Tue, 6 Sep 2016 20:40:02 +0530 Subject: [PATCH] YARN-2009 --- .../AbstractPreemptableResourceCalculator.java | 209 ++++++ .../CapacitySchedulerPreemptionContext.java | 7 + .../monitor/capacity/FifoCandidatesSelector.java | 41 -- .../capacity/IntraQueueCandidatesSelector.java | 200 ++++++ .../IntraQueuePreemptableResourceCalculator.java | 303 +++++++++ .../capacity/PreemptableResourceCalculator.java | 178 ++---- .../capacity/PreemptionCandidatesSelector.java | 51 ++ .../ProportionalCapacityPreemptionPolicy.java | 32 +- .../monitor/capacity/TempAppPerQueue.java | 157 +++++ .../monitor/capacity/TempQueuePerPartition.java | 4 + .../capacity/CapacitySchedulerConfiguration.java | 20 + .../scheduler/capacity/LeafQueue.java | 26 + .../scheduler/common/fica/FiCaSchedulerApp.java | 18 + ...ionalCapacityPreemptionPolicyForIntraQueue.java | 708 +++++++++++++++++++++ 14 files changed, 1773 insertions(+), 181 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerQueue.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForIntraQueue.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java new file mode 100644 index 0000000..f67f629 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.PreemptableResourceCalculator.TQComparator; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector} + */ +public class AbstractPreemptableResourceCalculator { + private static final Log LOG = + LogFactory.getLog(AbstractPreemptableResourceCalculator.class); + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + protected boolean isReservedPreemptionCandidatesSelector; + + /** + * PreemptableResourceCalculator constructor + * + * @param preemptionContext + * @param isReservedPreemptionCandidatesSelector this will be set by + * different implementation of candidate selectors, please refer to + * TempQueuePerPartition#offer for details. + */ + public AbstractPreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { + context = preemptionContext; + rc = preemptionContext.getResourceCalculator(); + this.isReservedPreemptionCandidatesSelector = + isReservedPreemptionCandidatesSelector; + } + + /** + * Given a set of queues compute the fix-point distribution of unassigned + * resources among them. As pending request of a queue are exhausted, the + * queue is removed from the set and remaining capacity redistributed among + * remaining queues. The distribution is weighted based on guaranteed + * capacity, unless asked to ignoreGuarantee, in which case resources are + * distributed uniformly. + */ + protected void computeFixpointAllocation(ResourceCalculator rc, + Resource tot_guarant, Collection qAlloc, + Resource unassigned, boolean ignoreGuarantee, + Collection underServedQueues) { + // Prior to assigning the unused resources, process each queue as follows: + // If current > guaranteed, idealAssigned = guaranteed + untouchable extra + // Else idealAssigned = current; + // Subtract idealAssigned resources from unassigned. + // If the queue has all of its needs met (that is, if + // idealAssigned >= current + pending), remove it from consideration. + // Sort queues from most under-guaranteed to most over-guaranteed. 
+ TQComparator tqComparator = new TQComparator(rc, tot_guarant); + PriorityQueue orderedByNeed = new PriorityQueue<>(10, + tqComparator); + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueuePerPartition q = i.next(); + Resource used = q.getUsed(); + + if (Resources.greaterThan(rc, tot_guarant, used, + q.getGuaranteed())) { + q.idealAssigned = Resources.add( + q.getGuaranteed(), q.untouchableExtra); + } else { + q.idealAssigned = Resources.clone(used); + } + Resources.subtractFrom(unassigned, q.idealAssigned); + // If idealAssigned < (allocated + used + pending), q needs more resources, so + // add it to the list of underserved queues, ordered by need. + Resource curPlusPend = Resources.add(q.getUsed(), q.pending); + if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { + orderedByNeed.add(q); + } + } + + // As per the first round of under-served calculation, get all queues which + // are under-served. Both zero-guaranteed and guaranteed queues at one tree + // level hiearchy will be added to this list. + for (TempQueuePerPartition q : orderedByNeed) { + underServedQueues.add(q); + } + + //assign all cluster resources until no more demand, or no resources are left + while (!orderedByNeed.isEmpty() + && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) { + Resource wQassigned = Resource.newInstance(0, 0); + // we compute normalizedGuarantees capacity based on currently active + // queues + resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); + + // For each underserved queue (or set of queues if multiple are equally + // underserved), offer its share of the unassigned resources based on its + // normalized guarantee. After the offer, if the queue is not satisfied, + // place it back in the ordered list of queues, recalculating its place + // in the order of most under-guaranteed to most over-guaranteed. In this + // way, the most underserved queue(s) are always given resources first. + Collection underserved = + getMostUnderservedQueues(orderedByNeed, tqComparator); + for (Iterator i = underserved.iterator(); i + .hasNext();) { + TempQueuePerPartition sub = i.next(); + Resource wQavail = Resources.multiplyAndNormalizeUp(rc, + unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); + Resource wQidle = sub.offer(wQavail, rc, tot_guarant, + isReservedPreemptionCandidatesSelector); + Resource wQdone = Resources.subtract(wQavail, wQidle); + + if (Resources.greaterThan(rc, tot_guarant, + wQdone, Resources.none())) { + // The queue is still asking for more. Put it back in the priority + // queue, recalculating its order based on need. 
+ orderedByNeed.add(sub); + } + Resources.addTo(wQassigned, wQdone); + } + Resources.subtractFrom(unassigned, wQassigned); + } + } + + /** + * Computes a normalizedGuaranteed capacity based on active queues + * @param rc resource calculator + * @param clusterResource the total amount of resources in the cluster + * @param queues the list of queues to consider + */ + protected void resetCapacity(ResourceCalculator rc, Resource clusterResource, + Collection queues, boolean ignoreGuar) { + Resource activeCap = Resource.newInstance(0, 0); + + if (ignoreGuar) { + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = 1.0f / queues.size(); + } + } else { + for (TempQueuePerPartition q : queues) { + Resources.addTo(activeCap, q.getGuaranteed()); + } + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.getGuaranteed(), activeCap); + } + } + } + + // Take the most underserved TempQueue (the one on the head). Collect and + // return the list of all queues that have the same idealAssigned + // percentage of guaranteed. + protected Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, + TQComparator tqComparator) { + ArrayList underserved = new ArrayList<>(); + while (!orderedByNeed.isEmpty()) { + TempQueuePerPartition q1 = orderedByNeed.remove(); + underserved.add(q1); + TempQueuePerPartition q2 = orderedByNeed.peek(); + // q1's pct of guaranteed won't be larger than q2's. If it's less, then + // return what has already been collected. Otherwise, q1's pct of + // guaranteed == that of q2, so add q2 to underserved list during the + // next pass. + if (q2 == null || tqComparator.compare(q1,q2) < 0) { + return underserved; + } + } + return underserved; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index c52127d..f700df1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -49,4 +50,10 @@ TempQueuePerPartition getQueueByPartition(String queueName, Set getLeafQueueNames(); Set getAllPartitions(); + + int getClusterMaxApplicationPriority(); + + float getMaxLimitForPreemptableAppsIntraQueue(); + + Resource getPartitionResource(String partition); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 9df395d..59024dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -194,16 +194,6 @@ private void preemptAMContainers(Resource clusterResource, skippedAMContainerlist.clear(); } - private boolean preemptMapContains( - Map> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - /** * Return should we preempt rmContainer. If we should, deduct from * resourceToObtainByPartition @@ -331,35 +321,4 @@ private void preemptFrom(FiCaSchedulerApp app, clusterResource, selectedContainers, totalPreemptionAllowed); } } - - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List containers){ - Collections.sort(containers, new Comparator() { - @Override - public int compare(RMContainer a, RMContainer b) { - int schedKeyComp = b.getAllocatedSchedulerKey() - .compareTo(a.getAllocatedSchedulerKey()); - if (schedKeyComp != 0) { - return schedKeyComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); - } - - private void addToPreemptMap( - Map> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); - } - set.add(containerToPreempt); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java new file mode 100644 index 0000000..6fe2b99 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.HashMap; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { + private static final Log LOG = LogFactory + .getLog(IntraQueueCandidatesSelector.class); + + private IntraQueuePreemptableResourceCalculator intraQPreemptableAmountCalculator; + + IntraQueueCandidatesSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + intraQPreemptableAmountCalculator = new IntraQueuePreemptableResourceCalculator( + preemptionContext, true); + } + + @Override + public Map> selectCandidates( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed) { + + Map> resToObtainByPartitionPerQueue = + new HashMap>(); + + // 1. Calculate the abnormality within each queue one by one. + // Get back Partition -> under-served queues mappings. + Map> partitionToUnderServedQueues = intraQPreemptableAmountCalculator + .computeIntraQueuePreemptionDemand(clusterResource, + totalPreemptedResourceAllowed, resToObtainByPartitionPerQueue); + + // 2. Loop all partitions + for (String partition : preemptionContext.getAllPartitions()) { + ArrayList queues = partitionToUnderServedQueues.get(partition); + + // 3. Iterate from most under-served queue in order. + for (LeafQueue leafQueue : queues) { + HashMap resToObtainByPartition = resToObtainByPartitionPerQueue + .get(leafQueue.getQueueName()); + + // This should not happen + if (resToObtainByPartition == null) { + LOG.warn("Could not find resToObtainByPartition for queue:" + + leafQueue.getQueueName()); + continue; + } + + // 4. if more resources are to be freed, go through all live containers + // in reverse priority and reverse allocation order and mark them for + // preemption + Iterator desc = leafQueue.getOrderingPolicy() + .getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp app = desc.next(); + preemptFromLeastStarvedApp(selectedCandidates, clusterResource, + totalPreemptedResourceAllowed, resToObtainByPartition, leafQueue, + app); + } + } + } + + return selectedCandidates; + } + + private void preemptFromLeastStarvedApp( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed, + Map resToObtainByPartition, LeafQueue leafQueue, FiCaSchedulerApp app) { + + // ToDo: + // 1. Reuse reservation selector here. 
+ + List liveContainers = new ArrayList<>( + app.getLiveContainers()); + sortContainers(liveContainers); + + for (RMContainer c : liveContainers) { + + // if there are no demand, return. + if (resToObtainByPartition.isEmpty()) { + return; + } + + String nodePartition = getPartitionByNodeId(c.getAllocatedNode()); + Resource toObtainByPartition = resToObtainByPartition.get(nodePartition); + + // When we have no more resource need to obtain, remove from map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resToObtainByPartition.remove(nodePartition); + } + + // skip preselected containers. + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + Resources.subtractFrom(toObtainByPartition, c.getAllocatedResource()); + continue; + } + + // Skip already marked to killable containers + if (null != preemptionContext.getKillableContainers() && preemptionContext + .getKillableContainers().contains(c.getContainerId())) { + continue; + } + + // Skip AM Container from preemption for now. + // ToDo: Need to decide whether intra preemption need to take AM containers + // or not. + if (c.isAMContainer()) { + continue; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(toObtainByPartition, c, + clusterResource, selectedCandidates, totalPreemptedResourceAllowed); + } + } + + private String getPartitionByNodeId(NodeId nodeId) { + return preemptionContext.getScheduler().getSchedulerNode(nodeId) + .getPartition(); + } + + + /** + * Return should we preempt rmContainer. If we should, deduct from + * resourceToObtainByPartition + */ + private boolean tryPreemptContainerAndDeductResToObtain( + Resource actualPreemptNeeded, RMContainer rmContainer, + Resource clusterResource, + Map> preemptMap, + Resource totalPreemptionAllowed) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + Resource toObtainByPartition = actualPreemptNeeded; + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none()) + && Resources.fitsIn(rc, clusterResource, + rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + Resources.subtractFrom(totalPreemptionAllowed, + rmContainer.getAllocatedResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug(this.getClass().getName() + " Marked container=" + + rmContainer.getContainerId() + " queue=" + + rmContainer.getQueueName() + " to be preemption candidates"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java new file mode 100644 index 0000000..f4ead34 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptableResourceCalculator.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeSet; +import java.util.Map.Entry; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector} + */ +public class IntraQueuePreemptableResourceCalculator + extends + AbstractPreemptableResourceCalculator { + private static final Log LOG = LogFactory + .getLog(IntraQueuePreemptableResourceCalculator.class); + + static class TQComparator implements Comparator { + private ResourceCalculator rc; + private Resource clusterRes; + + TQComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { + if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { + return -1; + } + if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { + return 1; + } + return 0; + } + + // Calculates idealAssigned / guaranteed + // TempQueues with 0 guarantees are always considered the most over + // capacity and therefore considered last for resources. 
+ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { + double pctOver = Integer.MAX_VALUE; + if (q != null && Resources.greaterThan(rc, clusterRes, + q.getGuaranteed(), + Resources.none())) { + pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, + q.getGuaranteed()); + } + return (pctOver); + } + } + + static class TAComparator implements Comparator { + TAComparator(ResourceCalculator rc) { + } + + @Override + public int compare(TempAppPerQueue tq1, TempAppPerQueue tq2) { + if (tq1.getPriority() < tq2.getPriority()) { + return 1; + } + if (tq1.getPriority() > tq2.getPriority()) { + return -1; + } + return 0; + } + } + + /** + * PreemptableResourceCalculator constructor + * + * @param preemptionContext + * @param isReservedPreemptionCandidatesSelector + * this will be set by different implementation of candidate + * selectors, please refer to TempQueuePerPartition#offer for + * details. + */ + public IntraQueuePreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { + super(preemptionContext, isReservedPreemptionCandidatesSelector); + } + + public Map> computeIntraQueuePreemptionDemand( + Resource clusterResource, Resource totalPreemptedResourceAllowed, + Map> resToObtainByPartitionPerQueue) { + Map> queuesPerPartition = + new HashMap>(); + + // 1. By using existing pre-calculated interqueue queue mapping, create a + // working map of partition->tmp queue. + Map> tmpQueuesPerPartition = + getPartitionToUnderServedQueueMapping(clusterResource); + + // 2. Iterate over the partitions again to calculate pending resource of apps + // per partition + for (Entry> entry : tmpQueuesPerPartition + .entrySet()) { + ArrayList queueList = null; + if((queueList = queuesPerPartition.get(entry.getKey())) == null){ + queueList = new ArrayList(); + queuesPerPartition.put(entry.getKey(), queueList); + } + + // per-partition level queues in most under-served order + PriorityQueue tmpQueues = entry.getValue(); + while (!tmpQueues.isEmpty()) { + TempQueuePerPartition tq = tmpQueues.remove(); + LeafQueue leafQueue = tq.leafQueue; + queueList.add(leafQueue); + + // After visiting a queue to get high priority apps, we also ensure that + // all partition based pending resource also has calculated per app + // level inside getResourceDemandFromAppsPerQueue. + // Hence we need not have to consider this queue from other partitions. + if (tq.intraQueuePreemptionCalculationDone == true) { + continue; + } + + // 3. Get all running and pending apps of the queue + Collection apps = leafQueue.getAllApplications(); + + if (apps.size() == 1) { + // We do not need preemption for a single app + continue; + } + + // 4. Get apps from a queue with priority comparator + PriorityQueue appsOrderedByPriority = getStarvingAppsRatioPerQueue( + leafQueue, apps); + + // This map will store per-partition level resource demand (per queue) + HashMap resToObtainByPartition = null; + if ((resToObtainByPartition = resToObtainByPartitionPerQueue + .get(leafQueue.getQueueName())) == null) { + resToObtainByPartition = new HashMap(); + resToObtainByPartitionPerQueue.put(leafQueue.getQueueName(), + resToObtainByPartition); + } + + // 5. Calculate demand from high priority apps per partition level. + getResourceDemandFromAppsPerQueue(leafQueue, appsOrderedByPriority, + resToObtainByPartition, + context.getMaxLimitForPreemptableAppsIntraQueue()); + + // Consider this queue as already visited. 
+ tq.intraQueuePreemptionCalculationDone = true; + } + } + + return queuesPerPartition; + } + + private Map> getPartitionToUnderServedQueueMapping( + Resource clusterResource) { + Map> tmpQueuesPerPartition = + new HashMap>(); + + // 1. Use the same inter-queue design to find under-served queues per partition + TQComparator tqComparator = new TQComparator(rc, clusterResource); + + for (String queueName : context.getLeafQueueNames()) { + for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) { + String partition = qT.partition; + PriorityQueue tmpQueues = null; + if ((tmpQueues = tmpQueuesPerPartition.get(partition)) == null) { + tmpQueues = new PriorityQueue<>(10, + tqComparator); + tmpQueuesPerPartition.put(partition, tmpQueues); + } + + // Recalculate idealAssigned per queue again as old data has been + // updated/reset after the inter-queue preemption round. + Resource used = qT.getUsed(); + if (Resources.greaterThan(rc, clusterResource, used, + qT.getGuaranteed())) { + qT.idealAssigned = Resources.add( + qT.getGuaranteed(), qT.untouchableExtra); + } else { + qT.idealAssigned = Resources.clone(used); + } + + Resource curPlusPend = Resources.add(qT.getUsed(), qT.pending); + if (Resources.lessThan(rc, clusterResource, qT.idealAssigned, curPlusPend)) { + tmpQueues.add(qT); + } + } + } + return tmpQueuesPerPartition; + } + + public PriorityQueue getStarvingAppsRatioPerQueue( + LeafQueue q, Collection apps) { + TAComparator taComparator = new TAComparator(rc); + PriorityQueue orderedByPriority = new PriorityQueue<>(10, + taComparator); + + // Use an internal temp app structure to store intermediate data (priority) + for (FiCaSchedulerApp app : apps) { + + // Create TempAppPerQueue for further calculation. + TempAppPerQueue tmpApp = new TempAppPerQueue(app.getQueueName(), + app.getApplicationId(), app.getCurrentConsumption(), + Resources.createResource(0, 0), null, app.getCurrentReservation(), + app.getTotalPendingRequests(), + (HashMap) app.getTotalPendingRequestsPerPartition(), + app.getPriority().getPriority(), app); + + // Skip an app which doesn't have any outstanding resource requests + if (Resources.lessThanOrEqual(rc, Resources.none(), + app.getTotalPendingRequests(), Resources.none())) { + continue; + } + orderedByPriority.add(tmpApp); + } + + return orderedByPriority; + } + + private void getResourceDemandFromAppsPerQueue(LeafQueue leafQueue, + PriorityQueue perQueueApps, + Map resToObtainByPartition, + float maxPreemptableAppsLimit) { + Resource actualPreemptNeeded = null; + + int maxPreemptableApps = (int) (maxPreemptableAppsLimit + * perQueueApps.size()); + int preemptableAppsCounter = 0; + + while (!perQueueApps.isEmpty()) { + TempAppPerQueue a1 = perQueueApps.remove(); + + // Identify how much pending resource there is and add it to actualPreemptNeeded + if (preemptableAppsCounter < maxPreemptableApps) { + // For each app, add pending resource request per partition level to + // resToObtainByPartition. + for (Entry entry : a1.getPendingPerPartition() + .entrySet()) { + String partition = entry.getKey(); + + // Skip apps which are already crossing their user-limit. + // For this, get the user-limit from the scheduler and ensure that the + // app is not crossing it here. 
+ Resource userHeadroom = leafQueue.getUserLimitHeadRoomPerApp( + a1.getFiCaSchedulerApp(), context.getPartitionResource(partition), + partition); + if (Resources.lessThanOrEqual(rc, + context.getPartitionResource(partition), userHeadroom, + Resources.none())) { + continue; + } + + // Updating pending resource per-partition level. + if ((actualPreemptNeeded = resToObtainByPartition + .get(partition)) == null) { + actualPreemptNeeded = Resources.createResource(0, 0); + resToObtainByPartition.put(partition, actualPreemptNeeded); + } + Resources.addTo(actualPreemptNeeded, + a1.getPendingPerPartition().get(partition)); + LOG.debug("resToObtainPerPartition: " + actualPreemptNeeded + ", partition:" + partition); + } + } + preemptableAppsCounter++; + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index d1d2485..007daa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -20,18 +20,27 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -39,12 +48,12 @@ * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector} */ -public class PreemptableResourceCalculator { +public class PreemptableResourceCalculator + extends + AbstractPreemptableResourceCalculator { private static final Log LOG = LogFactory.getLog(PreemptableResourceCalculator.class); - private final CapacitySchedulerPreemptionContext context; - private final ResourceCalculator rc; private boolean isReservedPreemptionCandidatesSelector; static class TQComparator implements Comparator { @@ -82,6 +91,25 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { } } + static class TAComparator implements Comparator { + private ResourceCalculator rc; + + 
TAComparator(ResourceCalculator rc) { + this.rc = rc; + } + + @Override + public int compare(TempAppPerQueue tq1, TempAppPerQueue tq2) { + if (tq1.getPriority() < tq2.getPriority()) { + return 1; + } + if (tq1.getPriority() > tq2.getPriority()) { + return -1; + } + return 0; + } + } + /** * PreemptableResourceCalculator constructor * @@ -93,136 +121,7 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, boolean isReservedPreemptionCandidatesSelector) { - context = preemptionContext; - rc = preemptionContext.getResourceCalculator(); - this.isReservedPreemptionCandidatesSelector = - isReservedPreemptionCandidatesSelector; - } - - /** - * Computes a normalizedGuaranteed capacity based on active queues - * @param rc resource calculator - * @param clusterResource the total amount of resources in the cluster - * @param queues the list of queues to consider - */ - private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection queues, boolean ignoreGuar) { - Resource activeCap = Resource.newInstance(0, 0); - - if (ignoreGuar) { - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = 1.0f / queues.size(); - } - } else { - for (TempQueuePerPartition q : queues) { - Resources.addTo(activeCap, q.getGuaranteed()); - } - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.getGuaranteed(), activeCap); - } - } - } - - // Take the most underserved TempQueue (the one on the head). Collect and - // return the list of all queues that have the same idealAssigned - // percentage of guaranteed. - protected Collection getMostUnderservedQueues( - PriorityQueue orderedByNeed, - TQComparator tqComparator) { - ArrayList underserved = new ArrayList<>(); - while (!orderedByNeed.isEmpty()) { - TempQueuePerPartition q1 = orderedByNeed.remove(); - underserved.add(q1); - TempQueuePerPartition q2 = orderedByNeed.peek(); - // q1's pct of guaranteed won't be larger than q2's. If it's less, then - // return what has already been collected. Otherwise, q1's pct of - // guaranteed == that of q2, so add q2 to underserved list during the - // next pass. - if (q2 == null || tqComparator.compare(q1,q2) < 0) { - return underserved; - } - } - return underserved; - } - - - /** - * Given a set of queues compute the fix-point distribution of unassigned - * resources among them. As pending request of a queue are exhausted, the - * queue is removed from the set and remaining capacity redistributed among - * remaining queues. The distribution is weighted based on guaranteed - * capacity, unless asked to ignoreGuarantee, in which case resources are - * distributed uniformly. - */ - private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection qAlloc, - Resource unassigned, boolean ignoreGuarantee) { - // Prior to assigning the unused resources, process each queue as follows: - // If current > guaranteed, idealAssigned = guaranteed + untouchable extra - // Else idealAssigned = current; - // Subtract idealAssigned resources from unassigned. - // If the queue has all of its needs met (that is, if - // idealAssigned >= current + pending), remove it from consideration. - // Sort queues from most under-guaranteed to most over-guaranteed. 
- TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = new PriorityQueue<>(10, - tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { - TempQueuePerPartition q = i.next(); - Resource used = q.getUsed(); - - if (Resources.greaterThan(rc, tot_guarant, used, - q.getGuaranteed())) { - q.idealAssigned = Resources.add( - q.getGuaranteed(), q.untouchableExtra); - } else { - q.idealAssigned = Resources.clone(used); - } - Resources.subtractFrom(unassigned, q.idealAssigned); - // If idealAssigned < (allocated + used + pending), q needs more resources, so - // add it to the list of underserved queues, ordered by need. - Resource curPlusPend = Resources.add(q.getUsed(), q.pending); - if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { - orderedByNeed.add(q); - } - } - - //assign all cluster resources until no more demand, or no resources are left - while (!orderedByNeed.isEmpty() - && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); - // we compute normalizedGuarantees capacity based on currently active - // queues - resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); - - // For each underserved queue (or set of queues if multiple are equally - // underserved), offer its share of the unassigned resources based on its - // normalized guarantee. After the offer, if the queue is not satisfied, - // place it back in the ordered list of queues, recalculating its place - // in the order of most under-guaranteed to most over-guaranteed. In this - // way, the most underserved queue(s) are always given resources first. - Collection underserved = - getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator i = underserved.iterator(); i - .hasNext();) { - TempQueuePerPartition sub = i.next(); - Resource wQavail = Resources.multiplyAndNormalizeUp(rc, - unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant, - isReservedPreemptionCandidatesSelector); - Resource wQdone = Resources.subtract(wQavail, wQidle); - - if (Resources.greaterThan(rc, tot_guarant, - wQdone, Resources.none())) { - // The queue is still asking for more. Put it back in the priority - // queue, recalculating its order based on need. 
- orderedByNeed.add(sub); - } - Resources.addTo(wQassigned, wQdone); - } - Resources.subtractFrom(unassigned, wQassigned); - } + super(preemptionContext, isReservedPreemptionCandidatesSelector); } /** @@ -240,7 +139,7 @@ private void computeFixpointAllocation(ResourceCalculator rc, */ private void computeIdealResourceDistribution(ResourceCalculator rc, List queues, Resource totalPreemptionAllowed, - Resource tot_guarant) { + Resource tot_guarant, List underServedQueues) { // qAlloc tracks currently active queues (will decrease progressively as // demand is met) @@ -264,14 +163,14 @@ private void computeIdealResourceDistribution(ResourceCalculator rc, // first compute the allocation as a fixpoint based on guaranteed capacity computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned, - false); + false, underServedQueues); // if any capacity is left unassigned, distributed among zero-guarantee // queues uniformly (i.e., not based on guaranteed capacity, as this is zero) if (!zeroGuarQueues.isEmpty() && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) { computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned, - true); + true, underServedQueues); } // based on ideal assignment computed above and current assignment we derive @@ -317,17 +216,18 @@ private void recursivelyComputeIdealAssignment( TempQueuePerPartition root, Resource totalPreemptionAllowed) { if (root.getChildren() != null && root.getChildren().size() > 0) { + List underServedQueues = + new ArrayList(); // compute ideal distribution at this level computeIdealResourceDistribution(rc, root.getChildren(), - totalPreemptionAllowed, root.idealAssigned); + totalPreemptionAllowed, root.idealAssigned, underServedQueues); // compute recursively for lower levels and build list of leafs - for(TempQueuePerPartition t : root.getChildren()) { + for (TempQueuePerPartition t : root.getChildren()) { recursivelyComputeIdealAssignment(t, totalPreemptionAllowed); } } } - private void calculateResToObtainByPartitionForLeafQueues( Set leafQueueNames, Resource clusterResource) { // Loop all leaf queues diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index dd33d8f..36b4961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -23,6 +23,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import com.google.common.annotations.VisibleForTesting; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -49,4 +55,49 @@ public abstract Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed); + + /** + * Compare by reversed priority order first, and 
then reversed containerId + * order + * + * @param containers + */ + // TODO: reuse from FifoCandidatesSelector + @VisibleForTesting + static void sortContainers(List containers) { + Collections.sort(containers, new Comparator() { + @Override + public int compare(RMContainer a, RMContainer b) { + int schedKeyComp = b.getAllocatedSchedulerKey() + .compareTo(a.getAllocatedSchedulerKey()); + if (schedKeyComp != 0) { + return schedKeyComp; + } + return b.getContainerId().compareTo(a.getContainerId()); + } + }); + } + + // TODO: reuse from FifoCandidatesSelector + protected void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + + // TODO: reuse from FifoCandidatesSelector + protected boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 36383502..586038d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -91,6 +91,8 @@ private boolean observeOnly; private boolean lazyPreempionEnabled; + private float maxLimitForPreemptableAppsIntraQueue; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -171,6 +173,10 @@ public void init(Configuration config, RMContext context, CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); + maxLimitForPreemptableAppsIntraQueue = csConfig.getFloat( + CapacitySchedulerConfiguration.INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT, + CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT); + rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); @@ -186,8 +192,16 @@ public void init(Configuration config, RMContext context, // initialize candidates preemption selection policies candidatesSelectionPolicies.add( new FifoCandidatesSelector(this)); + + // Should intra-queue preemption candidates be selected based on app priority? 
+ boolean selectIntraQueuePreemptCandidatesByPriority = csConfig.getBoolean( + CapacitySchedulerConfiguration.SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION, + CapacitySchedulerConfiguration.DEFAULT_SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION); + if (selectIntraQueuePreemptCandidatesByPriority) { + candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); + } } - + @Override public ResourceCalculator getResourceCalculator() { return rc; @@ -542,4 +556,20 @@ public double getNaturalTerminationFactor() { Map> getQueuePartitions() { return queueToPartitions; } + + @Override + public int getClusterMaxApplicationPriority() { + return scheduler.getMaxClusterLevelAppPriority().getPriority(); + } + + @Override + public float getMaxLimitForPreemptableAppsIntraQueue() { + return maxLimitForPreemptableAppsIntraQueue; + } + + @Override + public Resource getPartitionResource(String partition) { + return Resources.clone(nlm.getResourceByLabel(partition, + Resources.clone(scheduler.getClusterResource()))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerQueue.java new file mode 100644 index 0000000..45080f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerQueue.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization for an application. 
+ */ +public class TempAppPerQueue { + // Following fields are copied from scheduler + final String queueName; + final Resource pending; + final HashMap pendingPerPartition; + + private final Resource current; + private final Resource killable; + private final Resource reserved; + + // Following fields are settled and used by candidate selection policies + Resource idealAssigned; + Resource toBePreempted; + Resource untouchableExtra; + Resource preemptableExtra; + private Resource actuallyToBePreempted; + + private final int priority; + private final ApplicationId applicationId; + LeafQueue leafQueue; + FiCaSchedulerApp app; + + TempAppPerQueue(String queueName, ApplicationId applicationId, + Resource current, Resource killable, Resource totalPartitionResource, + Resource reserved, Resource pending, + HashMap pendingPerPartition, int priority, FiCaSchedulerApp app) { + this.queueName = queueName; + this.current = current; + this.pending = pending; + this.pendingPerPartition = pendingPerPartition; + + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.untouchableExtra = Resource.newInstance(0, 0); + this.preemptableExtra = Resource.newInstance(0, 0); + + this.killable = killable; + this.reserved = reserved; + this.priority = priority; + this.applicationId = applicationId; + this.app = app; + } + + public void setLeafQueue(LeafQueue l) { + this.leafQueue = l; + } + + public Resource getUsed() { + return current; + } + + public Resource getUsedDeductReservd() { + return Resources.subtract(current, reserved); + } + + public HashMap getPendingPerPartition() { + return pendingPerPartition; + } + + public FiCaSchedulerApp getFiCaSchedulerApp() { + return app; + } + + public Resource getGuaranteed() { + return Resources.none(); + } + public void updatePreemptableExtras(ResourceCalculator rc) { + // Reset untouchableExtra and preemptableExtra + untouchableExtra = Resources.none(); + preemptableExtra = Resources.none(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + queueName).append(" CUR: ").append(current) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" IDEAL_ASSIGNED: ").append(idealAssigned) + .append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) + .append(" UNTOUCHABLE: ").append(untouchableExtra) + .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n"); + + return sb.toString(); + } + + + public Resource getActuallyToBePreempted() { + return actuallyToBePreempted; + } + + public void setActuallyToBePreempted(Resource res) { + this.actuallyToBePreempted = res; + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ").append(current.getMemorySize()) + .append(", ").append(current.getVirtualCores()).append(", ") + .append(pending.getMemorySize()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(getGuaranteed().getMemorySize()).append(", ") + .append(getGuaranteed().getVirtualCores()).append(", ") + .append(idealAssigned.getMemorySize()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemorySize()).append(", ") + .append(toBePreempted.getVirtualCores()).append(", ") + .append(actuallyToBePreempted.getMemorySize()).append(", ") + .append(actuallyToBePreempted.getVirtualCores()); + } + + public int getPriority() { + 
return priority; + } + + public ApplicationId getApplicationId() { + return applicationId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 04ed135..6ab4ce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -51,8 +51,10 @@ private Resource actuallyToBePreempted; double normalizedGuarantee; + boolean intraQueuePreemptionCalculationDone; final ArrayList children; + private ArrayList apps; LeafQueue leafQueue; boolean preemptionDisabled; @@ -77,6 +79,7 @@ this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; this.children = new ArrayList<>(); + this.apps = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; @@ -86,6 +89,7 @@ this.absMaxCapacity = absMaxCapacity; this.totalPartitionResource = totalPartitionResource; this.reserved = reserved; + this.intraQueuePreemptionCalculationDone = false; } public void setLeafQueue(LeafQueue l) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..60b0bb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1077,4 +1077,24 @@ public boolean getLazyPreemptionEnabled() { PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = false; + + /** + * For intra-queue preemption, priority based selector can help to preempt + * containers of lowest priority apps to find resources for high priority + * apps. + */ + public static final String SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION = + PREEMPTION_CONFIG_PREFIX + + "select_based_on_priority_of_applications"; + public static final boolean DEFAULT_SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION = + true; // Change to false later. + + /** + * For intra-queue preemption, we should not try to consider all the demand from + * high priority apps in one shot. Certain percentage of active apps can only + * be considered per round. 
+   */
+  public static final String INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT =
+      PREEMPTION_CONFIG_PREFIX + "intraqueue_max_preemptable_app_limit";
+  public static final float DEFAULT_INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT = 0.3f;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9aae909..c36545c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1633,6 +1633,17 @@ public void recoverContainer(Resource clusterResource,
         .getSchedulableEntities());
   }
 
+  /**
+   * Obtain a (read-only) collection of all pending and running applications.
+   */
+  public Collection<FiCaSchedulerApp> getAllApplications() {
+    Collection<FiCaSchedulerApp> apps = new ArrayList<>(
+        pendingOrderingPolicy.getSchedulableEntities());
+    apps.addAll(orderingPolicy.getSchedulableEntities());
+
+    return Collections.unmodifiableCollection(apps);
+  }
+
   // Consider the headroom for each user in the queue.
   // Total pending for the queue =
   //   sum(for each user(min((user's headroom), sum(user's pending requests))))
@@ -1664,6 +1675,21 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
     return pendingConsideringUserLimit;
   }
 
+  public synchronized Resource getUserLimitHeadRoomPerApp(FiCaSchedulerApp app,
+      Resource resources, String partition) {
+
+    String userName = app.getUser();
+    User user = getUser(userName);
+    Resource headroom = Resources.subtract(
+        computeUserLimit(app, resources, user, partition,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+        user.getUsed(partition));
+    // Make sure headroom is not negative.
+    headroom = Resources.componentwiseMax(headroom, Resources.none());
+
+    return headroom;
+  }
+
   @Override
   public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 67d93a4..3b16ff7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -314,6 +315,23 @@ public synchronized Resource getTotalPendingRequests() {
     return ret;
   }
 
+  public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
+
+    Map<String, Resource> ret = new HashMap<String, Resource>();
+    Resource res = null;
+    for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) {
+      ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*");
+      if ((res = ret.get(rr.getNodeLabelExpression())) == null) {
+        res = Resources.createResource(0, 0);
+        ret.put(rr.getNodeLabelExpression(), res);
+      }
+
+      Resources.addTo(res,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+    return ret;
+  }
+
   public synchronized void markContainerForPreemption(ContainerId cont) {
     // ignore already completed containers
     if (liveContainers.containsKey(cont)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForIntraQueue.java
new file mode 100644
index 0000000..58b3bed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForIntraQueue.java
@@ -0,0 +1,708 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.TreeSet; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import 
static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestProportionalCapacityPreemptionPolicyForIntraQueue { + + static final long TS = 3141592653L; + + int appAlloc = 0; + boolean setAMContainer = false; + boolean setLabeledContainer = false; + float setAMResourcePercent = 0.0f; + Random rand = null; + Clock mClock = null; + CapacitySchedulerConfiguration conf = null; + CapacityScheduler mCS = null; + RMContext rmContext = null; + RMNodeLabelsManager lm = null; + EventHandler mDisp = null; + ResourceCalculator rc = new DefaultResourceCalculator(); + Resource clusterResources = null; + final ApplicationAttemptId appA = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 0), 0); + final ApplicationAttemptId appB = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 1), 0); + final ApplicationAttemptId appC = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 2), 0); + final ApplicationAttemptId appD = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 3), 0); + final ApplicationAttemptId appE = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 4), 0); + final ApplicationAttemptId appF = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 5), 0); + final ApplicationAttemptId appG = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 6), 0); + final ArgumentCaptor evtCaptor = ArgumentCaptor + .forClass(ContainerPreemptEvent.class); + + public enum priority { + AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2); + int value; + + priority(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + }; + + @Rule + public TestName name = new TestName(); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + conf = new CapacitySchedulerConfiguration(new Configuration(false)); + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + 3000); + // report "ideal" preempt + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + 1.0f); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + 1.0f); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + // FairScheduler doesn't support this test, + // Set CapacityScheduler as the scheduler for this test. 
+ conf.set("yarn.resourcemanager.scheduler.class", + CapacityScheduler.class.getName()); + + mClock = mock(Clock.class); + mCS = mock(CapacityScheduler.class); + when(mCS.getResourceCalculator()).thenReturn(rc); + lm = mock(RMNodeLabelsManager.class); + try { + when(lm.isExclusiveNodeLabel(anyString())).thenReturn(true); + } catch (IOException e) { + // do nothing + } + when(mCS.getConfiguration()).thenReturn(conf); + rmContext = mock(RMContext.class); + when(mCS.getRMContext()).thenReturn(rmContext); + when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager()); + when(rmContext.getNodeLabelManager()).thenReturn(lm); + mDisp = mock(EventHandler.class); + Dispatcher disp = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(disp); + when(disp.getEventHandler()).thenReturn(mDisp); + rand = new Random(); + long seed = rand.nextLong(); + System.out.println(name.getMethodName() + " SEED: " + seed); + rand.setSeed(seed); + appAlloc = 0; + } + + @Test + public void testIntraQueueProportionalPreemption() { + int[][] qData = new int[][]{ + // / A B C D + {100, 10, 40, 20, 30}, // abs + {100, 100, 100, 100, 100}, // maxCap + {100, 10, 40, 10, 20}, // used + { 50, 20, 10, 60, 0}, // pending + { 0, 0, 0, 0, 0}, // reserved + { 7, 2, 4, 1, 1}, // apps + { -1, 1, 1, 1, 1}, // req granularity + { 4, 0, 0, 0, 0}, // subqueues + }; + + String[][] appData = new String[][]{ + // Queue used pending reserved gran priority + {"B", "40", "20", "0", "1", "4"}, // 2(C) + {"B", "0", "10", "0", "1", "4"}, // 3 (D) + {"B", "0", "20", "0", "1", "5"}, // 4 (E) + {"B", "0", "10", "0", "1", "6"}, // 5 (F) + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, appData); + policy.editSchedule(); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + @Test + public void testContainerOrdering() { + + List containers = new ArrayList(); + + ApplicationAttemptId appAttId = ApplicationAttemptId + .newInstance(ApplicationId.newInstance(TS, 10), 0); + + // create a set of containers + RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3); + RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3); + RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2); + RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2); + RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1); + + // insert them in non-sorted order + containers.add(rm3); + containers.add(rm2); + containers.add(rm1); + containers.add(rm5); + containers.add(rm4); + + // sort them + FifoCandidatesSelector.sortContainers(containers); + + // verify the "priority"-first, "reverse container-id"-second + // ordering is enforced correctly + assert containers.get(0).equals(rm1); + assert containers.get(1).equals(rm2); + assert containers.get(2).equals(rm3); + assert containers.get(3).equals(rm4); + assert containers.get(4).equals(rm5); + } + static class IsPreemptionRequestFor + extends + ArgumentMatcher { + private final ApplicationAttemptId appAttId; + private final SchedulerEventType type; + IsPreemptionRequestFor(ApplicationAttemptId appAttId) { + this(appAttId, MARK_CONTAINER_FOR_PREEMPTION); + } + IsPreemptionRequestFor(ApplicationAttemptId appAttId, + SchedulerEventType type) { + this.appAttId = appAttId; + this.type = type; + } + @Override + public boolean matches(Object o) { + return appAttId.equals(((ContainerPreemptEvent) o).getAppId()) + && type.equals(((ContainerPreemptEvent) o).getType()); + } + @Override + public String 
toString() { + return appAttId.toString(); + } + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] appData) { + ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( + rmContext, mCS, mClock); + clusterResources = Resource + .newInstance(leafAbsCapacities(qData[0], qData[7]), 0); + ParentQueue mRoot = buildMockRootQueue(rand, appData, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + setResourceAndNodeDetails(); + return policy; + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData, String[][] appData, + boolean useDominantResourceCalculator) { + if (useDominantResourceCalculator) { + when(mCS.getResourceCalculator()) + .thenReturn(new DominantResourceCalculator()); + } + ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( + rmContext, mCS, mClock); + clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]), + qData[2]); + ParentQueue mRoot = buildMockRootQueue(rand, appData, resData, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + setResourceAndNodeDetails(); + return policy; + } + + private void setResourceAndNodeDetails() { + when(mCS.getClusterResource()).thenReturn(clusterResources); + when(lm.getResourceByLabel(anyString(), any(Resource.class))) + .thenReturn(clusterResources); + + SchedulerNode mNode = mock(SchedulerNode.class); + when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL); + when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode); + } + + ParentQueue buildMockRootQueue(Random r, String[][] appData, + int[]... queueData) { + Resource[] abs = generateResourceList(queueData[0]); + Resource[] used = generateResourceList(queueData[2]); + Resource[] pending = generateResourceList(queueData[3]); + Resource[] reserved = generateResourceList(queueData[4]); + Resource[] gran = generateResourceList(queueData[6]); + int[] maxCap = queueData[1]; + int[] apps = queueData[5]; + int[] queues = queueData[7]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues, + appData); + } + + ParentQueue buildMockRootQueue(Random r, String[][] appData, + String[][] resData, int[]... 
queueData) { + Resource[] abs = parseResourceDetails(resData[0]); + Resource[] used = parseResourceDetails(resData[1]); + Resource[] pending = parseResourceDetails(resData[2]); + Resource[] reserved = parseResourceDetails(resData[3]); + Resource[] gran = parseResourceDetails(resData[4]); + int[] maxCap = queueData[0]; + int[] apps = queueData[1]; + int[] queues = queueData[2]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues, + appData); + } + + Resource[] parseResourceDetails(String[] resData) { + List resourceList = new ArrayList(); + for (int i = 0; i < resData.length; i++) { + String[] resource = resData[i].split(":"); + if (resource.length == 1) { + resourceList + .add(Resource.newInstance(Integer.parseInt(resource[0]), 0)); + } else { + resourceList.add(Resource.newInstance(Integer.parseInt(resource[0]), + Integer.parseInt(resource[1]))); + } + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + Resource[] generateResourceList(int[] qData) { + List resourceList = new ArrayList(); + for (int i = 0; i < qData.length; i++) { + resourceList.add(Resource.newInstance(qData[i], 0)); + } + return resourceList.toArray(new Resource[resourceList.size()]); + } + + ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used, + Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran, + int[] queues, String[][] appData) { + ResourceCalculator rc = mCS.getResourceCalculator(); + Resource tot = leafAbsCapacities(abs, queues); + Deque pqs = new LinkedList(); + ParentQueue root = mockParentQueue(null, queues[0], pqs); + ResourceUsage resUsage = new ResourceUsage(); + resUsage.setUsed(used[0]); + resUsage.setReserved(reserved[0]); + when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT); + when(root.getAbsoluteUsedCapacity()) + .thenReturn(Resources.divide(rc, tot, used[0], tot)); + when(root.getAbsoluteCapacity()) + .thenReturn(Resources.divide(rc, tot, abs[0], tot)); + when(root.getAbsoluteMaximumCapacity()) + .thenReturn(maxCap[0] / (float) tot.getMemorySize()); + when(root.getQueueResourceUsage()).thenReturn(resUsage); + QueueCapacities rootQc = new QueueCapacities(true); + rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot)); + rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot)); + rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemorySize()); + when(root.getQueueCapacities()).thenReturn(rootQc); + when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT); + boolean preemptionDisabled = mockPreemptionStatus("root"); + when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled); + + HashMap> perQAppStore = new HashMap<>(); + perQAppStore = parseAppDataPerQueue(appData); + for (int i = 1; i < queues.length; ++i) { + final CSQueue q; + final ParentQueue p = pqs.removeLast(); + final String queueName = "queue" + ((char) ('A' + i - 1)); + List appsPerQueue = perQAppStore.get(queueName); + if (queues[i] > 0) { + q = mockParentQueue(p, queues[i], pqs); + ResourceUsage resUsagePerQueue = new ResourceUsage(); + resUsagePerQueue.setUsed(used[i]); + resUsagePerQueue.setReserved(reserved[i]); + when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue); + } else { + System.out.println("queue Name:"+ queueName); + q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran, + appsPerQueue); + } + when(q.getParent()).thenReturn(p); + when(q.getQueueName()).thenReturn(queueName); + when(q.getAbsoluteUsedCapacity()) + 
.thenReturn(Resources.divide(rc, tot, used[i], tot)); + when(q.getAbsoluteCapacity()) + .thenReturn(Resources.divide(rc, tot, abs[i], tot)); + when(q.getAbsoluteMaximumCapacity()) + .thenReturn(maxCap[i] / (float) tot.getMemorySize()); + + // We need to make these fields to QueueCapacities + QueueCapacities qc = new QueueCapacities(false); + qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot)); + qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot)); + qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemorySize()); + when(q.getQueueCapacities()).thenReturn(qc); + + String parentPathName = p.getQueuePath(); + parentPathName = (parentPathName == null) ? "root" : parentPathName; + String queuePathName = (parentPathName + "." + queueName).replace("/", + "root"); + when(q.getQueuePath()).thenReturn(queuePathName); + preemptionDisabled = mockPreemptionStatus(queuePathName); + when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled); + } + assert 0 == pqs.size(); + return root; + } + + private HashMap> parseAppDataPerQueue( + String[][] appData) { + // // Queue used pending reserved gran priority + // { "B", "40", "20", "0", "1", "4" }, // 1 + // { "B", "0", "10", "0", "1", "4" }, // 2 + // { "B", "0", "20", "0", "1", "5" }, // 3 + // { "B", "0", "10", "0", "1", "6" }, // 4 + + HashMap> queueToApp = new HashMap>(); + for (int i = 0; i < appData.length; i++) { + processEachAppData(appData[i], queueToApp); + } + + return queueToApp; + } + + private void processEachAppData(String[] perAppData, + HashMap> queueToApp) { + String queue = "queue" + perAppData[0]; + List apps = null; + + if ((apps = queueToApp.get(queue)) == null) { + apps = new ArrayList(); + queueToApp.put(queue, apps); + System.out.println("add queue : '" + queue + "' to queueToApp"); + } + + Integer[] app = new Integer[perAppData.length - 1]; + for (int i = 0; i < perAppData.length - 1; ++i) { + app[i] = Integer.parseInt(perAppData[i + 1]); + System.out.println("per app data:" + app[i]); + } + System.out.println("per app data done!"); + + apps.add(app); + } + + // Determine if any of the elements in the queupath have preemption disabled. + // Also must handle the case where preemption disabled property is explicitly + // set to something other than the default. Assumes system-wide preemption + // property is true. 
+ private boolean mockPreemptionStatus(String queuePathName) { + boolean preemptionDisabled = false; + StringTokenizer tokenizer = new StringTokenizer(queuePathName, "."); + String qName = ""; + while (tokenizer.hasMoreTokens()) { + qName += tokenizer.nextToken(); + preemptionDisabled = conf.getPreemptionDisabled(qName, + preemptionDisabled); + qName += "."; + } + return preemptionDisabled; + } + + ParentQueue mockParentQueue(ParentQueue p, int subqueues, + Deque pqs) { + ParentQueue pq = mock(ParentQueue.class); + List cqs = new ArrayList(); + when(pq.getChildQueues()).thenReturn(cqs); + for (int i = 0; i < subqueues; ++i) { + pqs.add(pq); + } + if (p != null) { + p.getChildQueues().add(pq); + } + return pq; + } + + @SuppressWarnings("rawtypes") + LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, + Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, + Resource[] gran, List appsPerQueue) { + LeafQueue lq = mock(LeafQueue.class); + ResourceCalculator rc = mCS.getResourceCalculator(); + List appAttemptIdList = new ArrayList(); + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class))).thenReturn(pending[i]); + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setPending(pending[i]); + ru.setUsed(used[i]); + ru.setReserved(reserved[i]); + when(lq.getQueueResourceUsage()).thenReturn(ru); + // Added for test + when(lq.getUserLimitHeadRoomPerApp(any(FiCaSchedulerApp.class), + any(Resource.class), anyString())).thenReturn(pending[i]); + + // consider moving where CapacityScheduler::comparator accessible + final NavigableSet qApps = new TreeSet( + new Comparator() { + @Override + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (a1.getPriority() != null + && !a1.getPriority().equals(a2.getPriority())) { + return a1.getPriority().compareTo(a2.getPriority()); + } + + int res = a1.getApplicationAttemptId() + .compareTo(a2.getApplicationAttemptId()); + return res; + + } + }); + // applications are added in global L->R order in queues + if (appsPerQueue == null) { + if (apps[i] != 0) { + Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]); + Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]); + Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]); + for (int a = 0; a < apps[i]; ++a) { + FiCaSchedulerApp mockFiCaApp = mockApp(i, appAlloc, aUsed, aPending, + aReserve, gran[i], 1); + qApps.add(mockFiCaApp); + ++appAlloc; + appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId()); + System.out.println("attempt:" + mockFiCaApp.getApplicationAttemptId()); + } + when(mCS.getAppsInQueue("queue" + (char) ('A' + i - 1))) + .thenReturn(appAttemptIdList); + } + } else { + for (Integer[] app : appsPerQueue) { + // Queue used pending reserved gran priority + assertTrue(app.length == 5); + Resource aUsed = Resource.newInstance(app[0], 0); + Resource aPending = Resource.newInstance(app[1], 0); + Resource aReserve = Resource.newInstance(app[2], 0); + Resource aGrans = Resource.newInstance(app[3], 0); + int aPriority = app[4]; + + FiCaSchedulerApp mockFiCaApp = mockApp(i, appAlloc, aUsed, aPending, + aReserve, aGrans, aPriority); + qApps.add(mockFiCaApp); + ++appAlloc; + appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId()); + System.out.println("attempt new:" + mockFiCaApp.getApplicationAttemptId()); + } + } + + when(mCS.getAppsInQueue("queue" + (char) ('A' + i - 1))) + .thenReturn(appAttemptIdList); + 
when(lq.getApplications()).thenReturn(qApps); + when(lq.getAllApplications()).thenReturn(qApps); + @SuppressWarnings("unchecked") + OrderingPolicy so = mock(OrderingPolicy.class); + when(so.getPreemptionIterator()).thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return qApps.descendingIterator(); + } + }); + when(lq.getOrderingPolicy()).thenReturn(so); + if (setAMResourcePercent != 0.0f) { + when(lq.getMaxAMResourcePerQueuePercent()) + .thenReturn(setAMResourcePercent); + } + p.getChildQueues().add(lq); + return lq; + } + + FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending, + Resource reserved, Resource gran, int appPriority) { + FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + ResourceCalculator rc = mCS.getResourceCalculator(); + + ApplicationId appId = ApplicationId.newInstance(TS, id); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); + when(app.getApplicationId()).thenReturn(appId); + when(app.getApplicationAttemptId()).thenReturn(appAttId); + when(app.getPriority()).thenReturn( + org.apache.hadoop.yarn.api.records.Priority.newInstance(appPriority)); + when(app.getTotalPendingRequests()).thenReturn(pending); + + Map pendingForDefaultPartition = new HashMap(); + // Add for default partition for now. + pendingForDefaultPartition.put("", pending); + when(app.getTotalPendingRequestsPerPartition()).thenReturn(pendingForDefaultPartition); + + int cAlloc = 0; + Resource unit = gran; + List cReserved = new ArrayList(); + Resource resIter = Resource.newInstance(0, 0); + for (; Resources.lessThan(rc, clusterResources, resIter, + reserved); Resources.addTo(resIter, gran)) { + cReserved.add( + mockContainer(appAttId, cAlloc, unit, priority.CONTAINER.getValue())); + ++cAlloc; + } + when(app.getReservedContainers()).thenReturn(cReserved); + + List cLive = new ArrayList(); + Resource usedIter = Resource.newInstance(0, 0); + int i = 0; + for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources + .addTo(usedIter, gran)) { + if (setAMContainer && i == 0) { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.AMCONTAINER.getValue())); + } else if (setLabeledContainer && i == 1) { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.LABELEDCONTAINER.getValue())); + Resources.addTo(used, Resource.newInstance(1, 1)); + } else { + cLive.add(mockContainer(appAttId, cAlloc, unit, + priority.CONTAINER.getValue())); + } + ++cAlloc; + ++i; + } + when(app.getLiveContainers()).thenReturn(cLive); + return app; + } + + RMContainer mockContainer(ApplicationAttemptId appAttId, int id, Resource r, + int cpriority) { + ContainerId cId = ContainerId.newContainerId(appAttId, id); + Container c = mock(Container.class); + when(c.getResource()).thenReturn(r); + when(c.getPriority()).thenReturn(Priority.create(cpriority)); + SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c); + RMContainer mC = mock(RMContainer.class); + when(mC.getContainerId()).thenReturn(cId); + when(mC.getAllocatedSchedulerKey()).thenReturn(sk); + when(mC.getContainer()).thenReturn(c); + when(mC.getApplicationAttemptId()).thenReturn(appAttId); + when(mC.getAllocatedResource()).thenReturn(r); + if (priority.AMCONTAINER.getValue() == cpriority) { + when(mC.isAMContainer()).thenReturn(true); + } + if (priority.LABELEDCONTAINER.getValue() == cpriority) { + when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0)); + } + return mC; + } + + static int leafAbsCapacities(int[] abs, int[] subqueues) { + int ret = 0; + 
for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + ret += abs[i]; + } + } + return ret; + } + + static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) { + Resource ret = Resource.newInstance(0, 0); + for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + Resources.addTo(ret, abs[i]); + } + } + return ret; + } + + void printString(CSQueue nq, String indent) { + if (nq instanceof ParentQueue) { + System.out.println(indent + nq.getQueueName() + " cur:" + + nq.getAbsoluteUsedCapacity() + " guar:" + nq.getAbsoluteCapacity()); + for (CSQueue q : ((ParentQueue) nq).getChildQueues()) { + printString(q, indent + " "); + } + } else { + System.out.println(indent + nq.getQueueName() + " pen:" + + ((LeafQueue) nq).getTotalPendingResourcesConsideringUserLimit( + isA(Resource.class), isA(String.class)) + + " cur:" + nq.getAbsoluteUsedCapacity() + " guar:" + + nq.getAbsoluteCapacity()); + for (FiCaSchedulerApp a : ((LeafQueue) nq).getApplications()) { + System.out.println(indent + " " + a.getApplicationId()); + } + } + } + +} -- 2.7.4 (Apple Git-66)
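
Appendix (not part of the patch): a minimal sketch of how the two intra-queue preemption knobs added to CapacitySchedulerConfiguration in this patch could be read. It uses only the constants declared above and the standard Configuration.getBoolean/getFloat accessors; the class and method names here are invented for illustration.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class IntraQueuePreemptionConfigSketch {
  // Hypothetical helper: reads the two knobs introduced by this patch.
  public static void readKnobs(CapacitySchedulerConfiguration csConfig) {
    // Whether the priority-based intra-queue candidates selector is enabled.
    boolean prioritySelectionEnabled = csConfig.getBoolean(
        CapacitySchedulerConfiguration.SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION,
        CapacitySchedulerConfiguration.DEFAULT_SELECT_CANDIDATES_FOR_INTRAQUEUE_PREEMPTION);

    // Fraction of a queue's active applications whose demand is considered
    // in a single preemption round (0.3f by default in this patch).
    float maxPreemptableAppsPerRound = csConfig.getFloat(
        CapacitySchedulerConfiguration.INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT,
        CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_MAXIMUM_PREEMPTABLE_APPS_LIMIT);

    System.out.println("priority selector enabled = " + prioritySelectionEnabled
        + ", app limit per round = " + maxPreemptableAppsPerRound);
  }
}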
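
Appendix (not part of the patch): the per-app headroom added to LeafQueue above is "user limit minus the user's current usage, clamped at zero in every resource dimension". A small self-contained sketch of that clamp using the same Resources utility calls the patch relies on; the concrete numbers are made up.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class HeadroomClampSketch {
  public static void main(String[] args) {
    // Made-up numbers: the computed user limit and the user's current usage.
    Resource userLimit = Resource.newInstance(8192, 8);  // 8 GB, 8 vcores
    Resource userUsed = Resource.newInstance(10240, 4);  // 10 GB, 4 vcores

    // The raw difference may go negative in one dimension (memory here).
    Resource headroom = Resources.subtract(userLimit, userUsed);

    // Same clamp as getUserLimitHeadRoomPerApp: never report negative headroom.
    headroom = Resources.componentwiseMax(headroom, Resources.none());

    System.out.println("headroom = " + headroom); // memory 0, 4 vcores
  }
}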
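
Appendix (not part of the patch): the new test drives mocked queue and application state from small tables. A short illustrative sketch of how one appData row from testIntraQueueProportionalPreemption is interpreted by the test's helpers (processEachAppData/mockLeafQueue); the class here is invented for illustration.

public class AppDataRowSketch {
  public static void main(String[] args) {
    // One row from testIntraQueueProportionalPreemption:
    //              Queue  used  pending  reserved  gran  priority
    String[] row = {"B",   "40", "20",    "0",      "1",  "4"};

    String queue = "queue" + row[0];              // -> "queueB"
    int used = Integer.parseInt(row[1]);          // resources currently held
    int pending = Integer.parseInt(row[2]);       // outstanding demand
    int reserved = Integer.parseInt(row[3]);      // reserved resources
    int granularity = Integer.parseInt(row[4]);   // container size used when mocking
    int priority = Integer.parseInt(row[5]);      // application priority

    System.out.println(queue + ": used=" + used + " pending=" + pending
        + " reserved=" + reserved + " gran=" + granularity
        + " priority=" + priority);
  }
}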