From ea32f9f15d340b5befc6488e39374c0c853a2eac Mon Sep 17 00:00:00 2001 From: Sunil G Date: Fri, 24 Mar 2017 22:09:31 +0530 Subject: [PATCH] YARN-2113 --- .../capacity/FifoIntraQueuePreemptionPlugin.java | 198 +++++--- .../capacity/IntraQueueCandidatesSelector.java | 138 +++++- .../IntraQueuePreemptionComputePlugin.java | 5 + .../monitor/capacity/MultiComparatorForApp.java | 48 ++ .../ProportionalCapacityPreemptionPolicy.java | 1 - .../monitor/capacity/TempAppPerPartition.java | 16 +- .../monitor/capacity/TempUserPerPartition.java | 83 ++++ .../scheduler/capacity/UsersManager.java | 9 + .../scheduler/policy/CompoundComparator.java | 3 - ...ionalCapacityPreemptionPolicyMockFramework.java | 76 ++- ...apacityPreemptionPolicyIntraQueueUserLimit.java | 508 +++++++++++++++++++++ 11 files changed, 991 insertions(+), 94 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/MultiComparatorForApp.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 5f1af1e..322cd8f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -18,11 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -32,8 +35,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPreemptionOrderComparatorByFifo; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPreemptionOrderComparatorByPriority; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -61,6 +67,19 @@ public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc, } @Override + public Collection getPreemptableApps(String queueName, + String partition) { + TempQueuePerPartition tq = 
context.getQueueByPartition(queueName, + partition); + + List apps = new ArrayList(); + for (TempAppPerPartition tmpApp : tq.getApps()) { + apps.add(tmpApp.app); + } + return apps; + } + + @Override public Map getResourceDemandFromAppsPerQueue( String queueName, String partition) { @@ -111,19 +130,22 @@ public void computeAppsIdealAllocation(Resource clusterResource, return; } + // This will hold a temp user data structure and will hold userlimit, + // idealAssigned, used etc to help calculation at each stage. + Map usersPerPartition = new LinkedHashMap<>(); + // 3. Create all tempApps for internal calculation and return a list from // high priority to low priority order. TAPriorityComparator taComparator = new TAPriorityComparator(); - PriorityQueue orderedByPriority = - createTempAppForResCalculation(tq.partition, apps, taComparator); + PriorityQueue orderedByPriority = createTempAppForResCalculation( + tq, apps, taComparator, clusterResource, usersPerPartition, perUserAMUsed); // 4. Calculate idealAssigned per app by checking based on queue's // unallocated resource.Also return apps arranged from lower priority to // higher priority. - TreeSet orderedApps = - calculateIdealAssignedResourcePerApp(clusterResource, - partitionBasedResource, tq, selectedCandidates, - queueReassignableResource, orderedByPriority, perUserAMUsed); + TreeSet orderedApps = calculateIdealAssignedResourcePerApp( + clusterResource, partitionBasedResource, tq, selectedCandidates, + queueReassignableResource, orderedByPriority, usersPerPartition); // 5. A configurable limit that could define an ideal allowable preemption // limit. Based on current queue's capacity,defined how much % could become @@ -154,7 +176,7 @@ public void computeAppsIdealAllocation(Resource clusterResource, // 8. There are chances that we may preempt for the demand from same // priority level, such cases are to be validated out. 
validateOutSameAppPriorityFromDemand(clusterResource, - (TreeSet) tq.getApps()); + (TreeSet) orderedApps); if (LOG.isDebugEnabled()) { LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); @@ -227,7 +249,7 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, * @param selectedCandidates Already Selected preemption candidates * @param queueReassignableResource Resource used in a queue * @param orderedByPriority List of running apps - * @param perUserAMUsed AM used resource + * @param usersPerPartition AM used resource * @return List of temp apps ordered from low to high priority */ private TreeSet calculateIdealAssignedResourcePerApp( @@ -236,64 +258,50 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, Map> selectedCandidates, Resource queueReassignableResource, PriorityQueue orderedByPriority, - Map perUserAMUsed) { - - Comparator reverseComp = Collections - .reverseOrder(new TAPriorityComparator()); - TreeSet orderedApps = new TreeSet<>(reverseComp); - - Map userIdealAssignedMapping = new HashMap<>(); + Map usersPerPartition) { + + // Incoming apps are to be ordered from lower to higher priority. However + // its possible that some low priority app may have demand (due to + // userlimit). + // Make sure that higher priority apps with 0 demand is pushed down to + // provide preemption for lower priority apps with demand. + // TAPreemptionOrderComparator is considering app's toBePreemptFromOther + // in addition to priority and appId. 
+ List> comparators = + new ArrayList>(); + comparators.add( + Collections.reverseOrder(new TAPreemptionOrderComparatorByPriority())); + comparators + .add(Collections.reverseOrder(new TAPreemptionOrderComparatorByFifo())); + MultiComparatorForApp multiComparator = new MultiComparatorForApp( + comparators); + + TreeSet orderedApps = new TreeSet<>(multiComparator); String partition = tq.partition; - Map preCalculatedUserLimit = - new HashMap(); - while (!orderedByPriority.isEmpty()) { // Remove app from the next highest remaining priority and process it to // calculate idealAssigned per app. TempAppPerPartition tmpApp = orderedByPriority.remove(); - orderedApps.add(tmpApp); // Once unallocated resource is 0, we can stop assigning ideal per app. if (Resources.lessThanOrEqual(rc, clusterResource, queueReassignableResource, Resources.none())) { + orderedApps.add(tmpApp); continue; } String userName = tmpApp.app.getUser(); - Resource userLimitResource = preCalculatedUserLimit.get(userName); - - // Verify whether we already calculated headroom for this user. - if (userLimitResource == null) { - userLimitResource = Resources.clone( - tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource, - partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); - - Resource amUsed = perUserAMUsed.get(userName); - if (null == amUsed) { - amUsed = Resources.createResource(0, 0); - } - - // Real AM used need not have to be considered for user-limit as well. 
- userLimitResource = Resources.subtract(userLimitResource, amUsed); - if (LOG.isDebugEnabled()) { - LOG.debug("Userlimit for user '" + userName + "' is :" - + userLimitResource + ", and amUsed is:" + amUsed); - } - - preCalculatedUserLimit.put(userName, userLimitResource); - } + TempUserPerPartition tmpUser = usersPerPartition.get(userName); + Resource userLimitResource = tmpUser.getUserLimit(); + Resource idealAssignedForUser = tmpUser.idealAssigned; - Resource idealAssignedForUser = userIdealAssignedMapping.get(userName); - - if (idealAssignedForUser == null) { - idealAssignedForUser = Resources.createResource(0, 0); - userIdealAssignedMapping.put(userName, idealAssignedForUser); - } + // update user-limit-met flag to each app. + tmpApp.setBelowUserLimit(tmpUser.isUserLimitReached() ? false : true); // Calculate total selected container resources from current app. - getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, - tmpApp, partition); + getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp, + partition); // For any app, used+pending will give its idealAssigned. However it will // be tightly linked to queue's unallocated quota. 
So lower priority apps @@ -304,12 +312,14 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, userLimitResource)) { - appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned, + Resource idealAssigned = Resources.min(rc, clusterResource, + appIdealAssigned, Resources.subtract(userLimitResource, idealAssignedForUser)); tmpApp.idealAssigned = Resources.clone(Resources.min(rc, - clusterResource, queueReassignableResource, appIdealAssigned)); + clusterResource, queueReassignableResource, idealAssigned)); Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned); } else { + orderedApps.add(tmpApp); continue; } @@ -322,6 +332,7 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected)); } + orderedApps.add(tmpApp); Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned); } @@ -351,11 +362,15 @@ private void getAlreadySelectedPreemptionCandidatesResource( } private PriorityQueue createTempAppForResCalculation( - String partition, Collection apps, - TAPriorityComparator taComparator) { + TempQueuePerPartition tq, Collection apps, + TAPriorityComparator taComparator, Resource clusterResource, + Map usersPerPartition, + Map perUserAMUsed) { PriorityQueue orderedByPriority = new PriorityQueue<>( 100, taComparator); + String partition = tq.partition; + // have an internal temp app structure to store intermediate data(priority) for (FiCaSchedulerApp app : apps) { @@ -387,19 +402,52 @@ private void getAlreadySelectedPreemptionCandidatesResource( tmpApp.idealAssigned = Resources.createResource(0, 0); orderedByPriority.add(tmpApp); + + // Add to users linked map to maintain user's order. 
+ String userName = app.getUser(); + if (!usersPerPartition.containsKey(userName)) { + ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) + .getResourceUsage(); + + TempUserPerPartition tmpUser = new TempUserPerPartition( + tq.leafQueue.getUser(userName), tq.queueName, + Resources.clone(userResourceUsage.getUsed(partition)), + Resources.clone(perUserAMUsed.get(userName)), + Resources.clone(userResourceUsage.getReserved(partition)), + Resources.none()); + + Resource userLimitResource = Resources.clone( + tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + + // Real AM used need not have to be considered for user-limit as well. + userLimitResource = Resources.subtract(userLimitResource, tmpUser.amUsed); + tmpUser.setUserLimit(userLimitResource); + tmpUser.setUserLimitReached(rc, clusterResource); + + if (LOG.isDebugEnabled()) { + LOG.debug("TempUser:" + tmpUser); + } + + tmpUser.idealAssigned = Resources.createResource(0, 0); + usersPerPartition.put(userName, tmpUser); + } } return orderedByPriority; } /* * Fifo+Priority based preemption policy need not have to preempt resources at - * same priority level. Such cases will be validated out. + * same priority level. Such cases will be validated out. But if the demand is + * from an app of different user, force to preempt resources even if apps are + * at same priority. 
*/ - public void validateOutSameAppPriorityFromDemand(Resource cluster, - TreeSet appsOrderedfromLowerPriority) { + public void validateOutSameAppPriorityFromDemand( + Resource cluster, + TreeSet orderedApps) { - TempAppPerPartition[] apps = appsOrderedfromLowerPriority - .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]); + TempAppPerPartition[] apps = orderedApps + .toArray(new TempAppPerPartition[orderedApps.size()]); if (apps.length <= 0) { return; } @@ -407,18 +455,34 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, int lPriority = 0; int hPriority = apps.length - 1; - while (lPriority < hPriority - && !apps[lPriority].equals(apps[hPriority]) - && apps[lPriority].getPriority() < apps[hPriority].getPriority()) { - Resource toPreemptFromOther = apps[hPriority] - .getToBePreemptFromOther(); + while (lPriority < hPriority && !apps[lPriority].equals(apps[hPriority])) { + + // Check whether app with demand needs resource from other user. + if (Resources.greaterThan(rc, cluster, + apps[hPriority].getToBePreemptFromOther(), Resources.none())) { + + // If apps are of same user, increment lPriority as current app at + // lPriority is under same user as of hPriority. + if ((apps[hPriority].getUser().equals(apps[lPriority].getUser())) + && (apps[lPriority].getPriority() == apps[hPriority] + .getPriority())) { + lPriority++; + continue; + } + } else { + // decrement hPriority as current hPriority app doesnt need resource. 
+ hPriority--; + continue; + } + + Resource toPreemptFromOther = apps[hPriority].getToBePreemptFromOther(); Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted(); Resource delta = Resources.subtract(apps[lPriority].toBePreempted, actuallyToPreempt); if (Resources.greaterThan(rc, cluster, delta, Resources.none())) { - Resource toPreempt = Resources.min(rc, cluster, - toPreemptFromOther, delta); + Resource toPreempt = Resources.min(rc, cluster, toPreemptFromOther, + delta); apps[hPriority].setToBePreemptFromOther( Resources.subtract(toPreemptFromOther, toPreempt)); @@ -426,8 +490,7 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, Resources.add(actuallyToPreempt, toPreempt)); } - if (Resources.lessThanOrEqual(rc, cluster, - apps[lPriority].toBePreempted, + if (Resources.lessThanOrEqual(rc, cluster, apps[lPriority].toBePreempted, apps[lPriority].getActuallyToBePreempted())) { lPriority++; continue; @@ -456,6 +519,7 @@ private Resource calculateUsedAMResourcesPerQueue(String partition, Resources.addTo(userAMResource, app.getAMResource(partition)); Resources.addTo(amUsed, app.getAMResource(partition)); } + return amUsed; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 2890414..db242eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -31,8 +31,8 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -51,17 +51,133 @@ Comparator { @Override - public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { - Priority p1 = Priority.newInstance(tq1.getPriority()); - Priority p2 = Priority.newInstance(tq2.getPriority()); + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Priority p1 = Priority.newInstance(ta1.getPriority()); + Priority p2 = Priority.newInstance(ta2.getPriority()); if (!p1.equals(p2)) { return p1.compareTo(p2); } - return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); } } + /** + * For preemption calculation, apart from priority and appId of an app, + * consider toBePreemptFromOther value also. This is to support user limit + * based preemption. In that case, an app with higher priority will be chosen + * as lower indexed app since another low priority app has some demand based + * on user-limit. + */ + @SuppressWarnings("serial") + static class TAPreemptionOrderComparatorByPriority + implements + Serializable, + Comparator { + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Priority p1 = Priority.newInstance(ta1.getPriority()); + Priority p2 = Priority.newInstance(ta2.getPriority()); + + int priorityComparedValue = p1.compareTo(p2); + + // Consider demand from each app. 
+ Resource demandApp1 = ta1.getToBePreemptFromOther(); + Resource demandApp2 = ta2.getToBePreemptFromOther(); + + if (!p1.equals(p2)) { + // If ta2 is of higher priority, priorityComparator will be non-zero. + // Flip the comparator order if ta2 has no demand, and ta1 has demand. + // Else case is the mirror scenario of same. + if (compareWithPreemptionDemand(priorityComparedValue, demandApp1, + demandApp2)) { + return demandApp2.compareTo(demandApp1); + } + + // If ta1 or ta2 is of higher priority and its already over its + // user-limit, we should de-prioritize whichever is higher. + // isBelowUserLimit() helps to cross verify whether app has demand, + // but its already above its limit. + if (compareWithUserLimitQuota(priorityComparedValue, ta1, ta2)) { + return ta2.isBelowUserLimit().compareTo(ta1.isBelowUserLimit()); + } + + return priorityComparedValue; + } + + return priorityComparedValue; + } + } + + @SuppressWarnings("serial") + static class TAPreemptionOrderComparatorByFifo + implements + Serializable, + Comparator { + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + + // Consider demand from each app. + Resource demandApp1 = ta1.getToBePreemptFromOther(); + Resource demandApp2 = ta2.getToBePreemptFromOther(); + + int fifoComparedValue = ta1.getApplicationId() + .compareTo(ta2.getApplicationId()); + + // If ta2 submitted earlier to ta1, fifoComparator will be non-zero. + // Flip the comparator order if ta2 has no demand, and ta1 has demand. + // Else case is the mirror scenario of same. + if (compareWithPreemptionDemand(fifoComparedValue, demandApp1, + demandApp2)) { + return demandApp2.compareTo(demandApp1); + } + + // If ta1 or ta2 is of higher priority and its already over its + // user-limit, we should de-prioritize whichever is higher. + // isBelowUserLimit() helps to cross verify whether app has demand, + // but its already above its limit. 
+ if (compareWithUserLimitQuota(fifoComparedValue, ta1, ta2)) { + return ta2.isBelowUserLimit().compareTo(ta1.isBelowUserLimit()); + } + + return fifoComparedValue; + } + } + + public static boolean compareWithPreemptionDemand(int comparedValue, + Resource demandApp1, Resource demandApp2) { + if (comparedValue > 0) { + if (demandApp2.equals(Resources.none()) + && !demandApp1.equals(Resources.none())) + return true; + } else if (comparedValue < 0) { + if (demandApp1.equals(Resources.none()) + && !demandApp2.equals(Resources.none())) + return true; + } + return false; + } + + public static boolean compareWithUserLimitQuota(int comparedValue, + TempAppPerPartition app1, TempAppPerPartition app2) { + if (comparedValue > 0) { + // app2 has high priority + if (!app2.isBelowUserLimit() && app1.isBelowUserLimit()) { + // flip + return true; + } + } else if (comparedValue < 0) { + // app1 has high priority + if (!app1.isBelowUserLimit() && app2.isBelowUserLimit()) { + // flip + return true; + } + } + return false; + } + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; final CapacitySchedulerPreemptionContext context; @@ -121,14 +237,18 @@ public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { Map resToObtainByPartition = fifoPreemptionComputePlugin .getResourceDemandFromAppsPerQueue(queueName, partition); + // Default preemption iterator considers only FIFO+priority. For + // userlimit preemption, its possible that some lower priority apps + // needs from high priority app of another user. Hence use apps + // ordered by userlimit starvation as well. + Collection apps = fifoPreemptionComputePlugin + .getPreemptableApps(queueName, partition); + // 6. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. 
try { leafQueue.getReadLock().lock(); - Iterator desc = leafQueue.getOrderingPolicy() - .getPreemptionIterator(); - while (desc.hasNext()) { - FiCaSchedulerApp app = desc.next(); + for (FiCaSchedulerApp app : apps) { preemptFromLeastStarvedApp(selectedCandidates, clusterResource, totalPreemptedResourceAllowed, resToObtainByPartition, leafQueue, app); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java index 93ebe65..4bc9c80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.Collection; import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; interface IntraQueuePreemptionComputePlugin { @@ -36,4 +38,7 @@ void computeAppsIdealAllocation(Resource clusterResource, Map> selectedCandidates, Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned, float maxAllowablePreemptLimit); + + Collection getPreemptableApps(String queueName, + String partition); } 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/MultiComparatorForApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/MultiComparatorForApp.java new file mode 100644 index 0000000..a6b00ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/MultiComparatorForApp.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; + +@SuppressWarnings("serial") +public class MultiComparatorForApp + implements + Serializable, + Comparator { + // Some policies will use multiple comparators joined together + + List> comparators; + + MultiComparatorForApp(List> comparators) { + this.comparators = comparators; + } + + @Override + public int compare(final TempAppPerPartition r1, + final TempAppPerPartition r2) { + for (Comparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) + return result; + } + return 0; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 3bf6994..43c320b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -243,7 +243,6 @@ public synchronized void editSchedule() { } } - @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( Map> selectedCandidates, long currentTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index fccd2a7..69a31a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -34,6 +34,7 @@ // Following fields are settled and used by candidate selection policies private final int priority; private final ApplicationId applicationId; + private Boolean isBelowUserLimit = true; FiCaSchedulerApp app; @@ -65,7 +66,8 @@ public String toString() { .append(idealAssigned).append(" PREEMPT_OTHER: ") .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") .append(toBePreempted).append(" ACTUAL_PREEMPT: ") - .append(getActuallyToBePreempted()).append("\n"); + .append(getActuallyToBePreempted()).append(" IS_USER_LIMIT: "). 
+ append(isBelowUserLimit()).append("\n"); return sb.toString(); } @@ -91,6 +93,10 @@ public ApplicationId getApplicationId() { return applicationId; } + public String getUser() { + return this.app.getUser(); + } + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resource cluster, Resource toBeDeduct, String partition) { if (Resources.greaterThan(resourceCalculator, cluster, @@ -98,4 +104,12 @@ public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } } + + public Boolean isBelowUserLimit() { + return isBelowUserLimit; + } + + public void setBelowUserLimit(boolean isBelowUserLimit) { + this.isBelowUserLimit = isBelowUserLimit; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java new file mode 100644 index 0000000..93962fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization for an application. + */ +public class TempUserPerPartition extends AbstractPreemptionEntity { + + private final User user; + private Resource userLimit; + private boolean isUserLimitReached = false; + + TempUserPerPartition(User user, String queueName, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + super(queueName, usedPerPartition, amUsedPerPartition, reserved, + pendingPerPartition); + this.user = user; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed()) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ") + .append(getUserLimit()).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" USED_WO_AMUSED: ") + .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ") + .append(toBePreempted).append(" ACTUAL_PREEMPT: ") + .append(getActuallyToBePreempted()).append("\n"); + + return sb.toString(); + } + + public String getUserName() { + 
return user.getUserName(); + } + + public Resource getUserLimit() { + return userLimit; + } + + public void setUserLimit(Resource userLimitResource) { + this.userLimit = userLimitResource; + } + + public boolean isUserLimitReached() { + return this.isUserLimitReached; + } + + public void setUserLimitReached(ResourceCalculator rc, + Resource clusterResource) { + if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(), + userLimit)) { + this.isUserLimitReached = true; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index c2134eb..151b43b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -253,6 +253,15 @@ public Resource getUserResourceLimit() { public void setUserResourceLimit(Resource userResourceLimit) { this.userResourceLimit = userResourceLimit; } + + public String getUserName() { + return this.userName; + } + + @VisibleForTesting + public void setResourceUsage(ResourceUsage resourceUsage) { + this.userResourceUsage = resourceUsage; + } } /* End of User class */ /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java index 3027ab7..e8b00f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java @@ -19,9 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; //Some policies will use multiple comparators joined together class CompoundComparator implements Comparator { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a9e97fd..06060c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -45,6 +45,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -96,6 +97,7 @@ Clock mClock = null; CapacitySchedulerConfiguration conf = null; CapacityScheduler cs = null; + @SuppressWarnings("rawtypes") EventHandler mDisp = null; ProportionalCapacityPreemptionPolicy policy = null; Resource clusterResource = null; @@ -247,6 +249,7 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { if (containerId == 1) { when(rmc.isAMContainer()).thenReturn(true); when(app.getAMResource(label)).thenReturn(res); + when(app.getAppAMNodePartitionName()).thenReturn(label); } if (reserved) { @@ -280,6 +283,12 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { containerId++; } + // If app has 0 container, and it has only pending, still make sure to + // update label. + if (repeat == 0) { + when(app.getAppAMNodePartitionName()).thenReturn(label); + } + // Some more app specific aggregated data can be better filled here. 
when(app.getPriority()).thenReturn(pri); when(app.getUser()).thenReturn(userName); @@ -315,6 +324,7 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { private void mockApplications(String appsConfig) { int id = 1; HashMap> userMap = new HashMap>(); + HashMap>> userResourceUsagePerLabel = new HashMap<>(); LeafQueue queue = null; for (String a : appsConfig.split(";")) { String[] strs = a.split("\t"); @@ -349,23 +359,63 @@ private void mockApplications(String appsConfig) { users = new HashSet(); userMap.put(queueName, users); } - users.add(app.getUser()); + + String label = app.getAppAMNodePartitionName(); + + // Get label to queue + HashMap> userResourceUsagePerQueue = userResourceUsagePerLabel + .get(label); + if (null == userResourceUsagePerQueue) { + userResourceUsagePerQueue = new HashMap<>(); + userResourceUsagePerLabel.put(label, userResourceUsagePerQueue); + } + + // Get queue to user based resource map + HashMap userResourceUsage = userResourceUsagePerQueue + .get(queueName); + if (null == userResourceUsage) { + userResourceUsage = new HashMap<>(); + userResourceUsagePerQueue.put(queueName, userResourceUsage); + } + + // Get user to its resource usage. + ResourceUsage usage = userResourceUsage.get(app.getUser()); + if (null == usage) { + usage = new ResourceUsage(); + userResourceUsage.put(app.getUser(), usage); + } + + usage.incAMUsed(app.getAMResource(label)); + usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label)); id++; } - for (String queueName : userMap.keySet()) { - queue = (LeafQueue) nameToCSQueues.get(queueName); - // Currently we have user-limit test support only for default label. 
- Resource totResoucePerPartition = partitionToResource.get(""); - Resource capacity = Resources.multiply(totResoucePerPartition, - queue.getQueueCapacities().getAbsoluteCapacity()); - HashSet users = userMap.get(queue.getQueueName()); - Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); - for (String userName : users) { - when(queue.getResourceLimitForAllUsers(eq(userName), - any(Resource.class), anyString(), any(SchedulingMode.class))) - .thenReturn(userLimit); + for (String label : userResourceUsagePerLabel.keySet()) { + for (String queueName : userMap.keySet()) { + queue = (LeafQueue) nameToCSQueues.get(queueName); + // Currently we have user-limit test support only for default label. + Resource totResoucePerPartition = partitionToResource.get(""); + Resource capacity = Resources.multiply(totResoucePerPartition, + queue.getQueueCapacities().getAbsoluteCapacity()); + HashSet users = userMap.get(queue.getQueueName()); + Resource userLimit = Resources.divideAndCeil(rc, capacity, + users.size()); + LOG.debug("Updating user-limit from mock: totResoucePerPartition=" + + totResoucePerPartition + ", capacity=" + capacity + + ", users.size()=" + users.size() + ", userlimit= " + userLimit + + ",label= " + label + ",queueName= " + queueName); + + HashMap userResourceUsage = userResourceUsagePerLabel + .get(label).get(queueName); + for (String userName : users) { + User user = new User(userName); + user.setResourceUsage(userResourceUsage.get(userName)); + when(queue.getUser(eq(userName))).thenReturn(user); + when(queue.getResourceLimitForAllUsers(eq(userName), + any(Resource.class), anyString(), any(SchedulingMode.class))) + .thenReturn(userLimit); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java new file mode 100644 index 0000000..d3c8d1a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java @@ -0,0 +1,508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test class for IntraQueuePreemption scenarios. 
+ */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsers() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource and its well under its user-limit. Hence preempt + // resources from app1. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithSingleUser() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Given single user, lower priority/late submitted apps has to + * wait. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource. Since app1,2 are from same user, there wont be + // any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 50. app2 from user2 has a demand + * of 30, and UL is 50. Since app1 is under UL, there should not be any + * preemption. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,30,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource, but app1 is still below its user-limit. + // Hence there should not be any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(2,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource and its well under its user-limit. Hence preempt + // resources from app1 even though its priority is more than app2. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWithMultipleApps() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(1,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 needs more resource and its well under its user-limit. Hence + // preempt resources from app3 (compare to app1, app3 has low priority). + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNoPreemptionOfUserLimitWithMultipleAppsAndSameUser() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,20,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user1);" + + "a\t" // app3 in a + + "(1,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // All 4 apps belong to the same user (user1), so preempting one of its + // apps for another gains nothing. No preemption should occur at all. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWitAppsOfDifferentPriority() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(3,1,n1,,30,false,30,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(4,1,n1,,30,false,0,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 needs more resource and its well under its user-limit. Hence + // preempt resources from app1 (compare to app3, app1 has low priority). + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitInTwoQueues() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify + * that intra-queue preemption could occur in two queues when user-limit + * irregularity is present in queue. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 90 60 0]);" + // root + "-a(=[60 100 55 30 0]);" + // a + "-b(=[40 100 35 30 0])" ; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(3,1,n1,,20,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(4,1,n1,,15,false,5,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2);" + + "b\t" // app5 in b + + "(3,1,n1,,25,false,10,user1);" + + "b\t" // app6 in b + + "(1,1,n1,,10,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 needs more resource and its well under its user-limit. Hence + // preempt resources from app1 (compare to app3, app1 has low priority). + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + } + + @Test + public void testIntraQueuePreemptionWithTwoRequestingUsers() + throws IOException { + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 20 0]);" + // root + "-a(=[100 100 100 20 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,60,false,10,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,40,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource and its well under its user-limit. Hence preempt + // resources from app1. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } +} -- 2.10.1 (Apple Git-78)