From 1532e4addef180d5cec0f41e81e94bfb781bef63 Mon Sep 17 00:00:00 2001 From: ananyo Date: Fri, 5 Feb 2021 21:24:29 +0530 Subject: [PATCH] YARN-10559: Fair sharing intra-queue preemption support in Capacity Scheduler --- .../capacity/AbstractPreemptionEntity.java | 16 + .../FifoIntraQueuePreemptionPlugin.java | 278 +++++++++- .../IntraQueueCandidatesSelector.java | 69 ++- .../IntraQueuePreemptionComputePlugin.java | 3 + .../monitor/capacity/TempAppPerPartition.java | 9 +- .../capacity/TempQueuePerPartition.java | 8 + .../capacity/TempUserPerPartition.java | 45 +- .../scheduler/capacity/LeafQueue.java | 40 ++ ...tProportionalCapacityPreemptionPolicy.java | 3 + ...reemptionPolicyIntraQueueFairOrdering.java | 504 +++++++++++++++++- .../mockframework/MockApplications.java | 1 + .../mockframework/MockQueueHierarchy.java | 2 + ...CapacityPreemptionPolicyMockFramework.java | 2 +- 13 files changed, 934 insertions(+), 46 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java index cb4d7af769d..02288f25a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -42,6 +42,7 @@ Resource selected; private Resource actuallyToBePreempted; private Resource toBePreemptFromOther; + private Resource fairShare; AbstractPreemptionEntity(String queueName, Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved, @@ -99,4 +100,19 @@ public void setToBePreemptFromOther(Resource toBePreemptFromOther) { this.toBePreemptFromOther = toBePreemptFromOther; } + /** + * Getter method to return fair share for an application in a given queue. + * @return resource Fair Share Resource object + */ + public Resource getFairShare() { + return fairShare; + } + + /** + * Setter method to update fair share for an application in a given queue. 
+ * @param fairShare Fair Share Resource object + */ + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 7c3abb49254..c3dc5abd537 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -141,6 +142,10 @@ public void computeAppsIdealAllocation(Resource clusterResource, PriorityQueue orderedByPriority = createTempAppForResCalculation( tq, apps, clusterResource, perUserAMUsed); + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + setFairShareForApps(tq, Resources.add(queueReassignableResource, amUsed)); + } + // 4. Calculate idealAssigned per app by checking based on queue's // unallocated resource.Also return apps arranged from lower priority to // higher priority. @@ -169,16 +174,24 @@ public void computeAppsIdealAllocation(Resource clusterResource, // 7. From lowest priority app onwards, calculate toBePreempted resource // based on demand. calculateToBePreemptedResourcePerApp(clusterResource, orderedApps, - Resources.clone(preemptionLimit)); + Resources.clone(preemptionLimit), + tq.leafQueue.getOrderingPolicy()); // Save all apps (low to high) to temp queue for further reference tq.addAllApps(orderedApps); // 8. There are chances that we may preempt for the demand from same // priority level, such cases are to be validated out. - validateOutSameAppPriorityFromDemand(clusterResource, - (TreeSet) orderedApps, tq.getUsersPerPartition(), - context.getIntraQueuePreemptionOrderPolicy()); + + // if fairOrderingPolicy is being used, calculate toBePreempted + // based on FS per app. + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + calcActuallyToBePreemptedBasedOnFS(clusterResource, orderedApps); + } else { + validateOutSameAppPriorityFromDemand(clusterResource, + (TreeSet) orderedApps, tq.getUsersPerPartition(), + context.getIntraQueuePreemptionOrderPolicy()); + } if (LOG.isDebugEnabled()) { LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); @@ -188,8 +201,60 @@ public void computeAppsIdealAllocation(Resource clusterResource, } } + /** + * For Fairness calculation, we treat UserLimit as an upper bound + * to the amount of resources allocated to a user. + * Following is the calculation used: + * + * foreach user: + * fairSharePerApp = total Queue Cap / no: of apps of that user + * idealFairSharePerAppwithUL = UL / no: of apps of that user + * + * if(UserLimitPercent < 100) + * fairSharePerApp = idealFairSharePerAppwithUL + * + * Using above formula, + * we ensure all the apps of a user get equal resources. 
+ * If any user would be hitting their userlimit, + * then we try and ensure all apps for that user + * have fairness within their UserLimit. + * @param tq tempQueuePerPartition being considered + */ + private void setFairShareForApps(TempQueuePerPartition tq, + Resource queueReassignableResource) { + for(TempUserPerPartition tmpUser : tq.getUsersPerPartition().values()){ + Resource fairSharePerApp; + Resource fairShareForCurrUser; + int numOfAppsInUser = tmpUser.getApps().size(); + Resource userLimitWithAmUsed = Resources.add( + tmpUser.getUserLimit(), tmpUser.getAMUsed()); + + fairShareForCurrUser = tq.leafQueue.getUserLimit() == 100 ? + queueReassignableResource : userLimitWithAmUsed; + tmpUser.setFairShare(fairShareForCurrUser); + + fairSharePerApp = Resources.divideAndCeil( + rc, tmpUser.getFairShare(), numOfAppsInUser); + + fairSharePerApp = + Resources.componentwiseMax(fairSharePerApp, Resources.none()); + + for(TempAppPerPartition tmpApp : tmpUser.getApps()) { + tmpApp.setFairShare(fairSharePerApp); + + LOG.debug("For app: " + tmpApp.getApplicationId() + + " from user: " + tmpUser.getUserName() + + ", queueResources: " + queueReassignableResource + + ", UserLimit: " + userLimitWithAmUsed + + ", Num_of_apps for user: " + numOfAppsInUser + + ". Calculated FairShare per app is: " + tmpApp.getFairShare()); + } + } + } + private void calculateToBePreemptedResourcePerApp(Resource clusterResource, - TreeSet orderedApps, Resource preemptionLimit) { + TreeSet orderedApps, Resource preemptionLimit, + OrderingPolicy orderingPolicy) { for (TempAppPerPartition tmpApp : orderedApps) { if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit, @@ -202,7 +267,13 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(), tmpApp.idealAssigned); Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected); - Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed()); + + // We already consider AM used in the FS calculated. + // So, in case of Fair Ordering Policy, + // we can skip it when calculating preemptable from app. + if(! (orderingPolicy instanceof FairOrderingPolicy)) { + Resources.subtractFromNonNegative(preemtableFromApp,tmpApp.getAMUsed()); + } if (context.getIntraQueuePreemptionOrderPolicy() .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { @@ -213,9 +284,15 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // Calculate toBePreempted from apps as follows: // app.preemptable = min(max(app.used - app.selected - app.ideal, 0), // intra_q_preemptable) - tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources - .max(rc, clusterResource, preemtableFromApp, Resources.none()), - Resources.clone(preemptionLimit)); + if(Resources.fitsIn(rc, + tmpApp.getFiCaSchedulerApp().getCSLeafQueue().getMinimumAllocation(), + preemtableFromApp)) { + tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources + .max(rc, clusterResource, preemtableFromApp, Resources.none()), + Resources.clone(preemptionLimit)); + } else { + tmpApp.toBePreempted = Resources.createResource(0, 0); + } preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit, tmpApp.toBePreempted); @@ -287,9 +364,12 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, orderedApps.add(tmpApp); // Once unallocated resource is 0, we can stop assigning ideal per app. 
- if (Resources.lessThanOrEqual(rc, clusterResource, + // However, for FairOrderingPolicy, we want to iterate over all apps + // because fairness is calculated across all apps. + if (!(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) && + (Resources.lessThanOrEqual(rc, clusterResource, queueReassignableResource, Resources.none()) || rc - .isAnyMajorResourceZeroOrNegative(queueReassignableResource)) { + .isAnyMajorResourceZeroOrNegative(queueReassignableResource))) { continue; } @@ -307,9 +387,23 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // idealAssigned may fall to 0 if higher priority apps demand is more. Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(), tmpApp.getPending()); + + // In case of fair ordering based preemption, + // consider fairShare of each app in a given queue + // as a max-cap. + if (queueOrderingPolicy instanceof FairOrderingPolicy) { + appIdealAssigned = Resources.min(rc, clusterResource, + tmpApp.getFairShare(), appIdealAssigned); + appIdealAssigned = Resources.componentwiseMax( + appIdealAssigned, Resources.none()); + } + Resources.subtractFrom(appIdealAssigned, tmpApp.selected); - if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + tmpApp.idealAssigned = appIdealAssigned; + Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned); + } else if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, userLimitResource)) { Resource idealAssigned = Resources.min(rc, clusterResource, appIdealAssigned, @@ -450,6 +544,9 @@ private void getAlreadySelectedPreemptionCandidatesResource( } tmpApp.setTempUserPerPartition(tmpUser); orderedByPriority.add(tmpApp); + tq.getUsersPerPartition() + .get(tmpUser.getUserName()) + .addApp(tmpApp.getApplicationId(), tmpApp); } return orderedByPriority; @@ -573,6 +670,114 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, } } + /** + * For each starved app, iterate over all the overfed apps. + * Mark as many resources as possible from overfed apps + * to satisfy the starvation of the starved app. + * starvedResources = + * ToBePreemptFromOther - starvation_fulfilled_by_other_overfed_apps + * overfedResources = + * ToBePreempted - resources_already_marked_for_preemption + * @param clusterResource total resources present in the cluster + * @param orderedApps TreeSet of apps ordered by the excess used resources + */ + private void calcActuallyToBePreemptedBasedOnFS(Resource clusterResource, + TreeSet orderedApps) { + TempAppPerPartition[] apps = orderedApps.toArray( + new TempAppPerPartition[orderedApps.size()]); + if (apps.length <= 1) { + return; + } + + for(int starvedAppInd = apps.length-1; starvedAppInd>=0; starvedAppInd--) { + TempAppPerPartition starvedApp = apps[starvedAppInd]; + + // 1. Check if starved app can get resources from other overfed apps. + // Stay within the user's fair share. + markResourcesToBePreemptedForApp(starvedApp, Arrays.asList(apps), + clusterResource, true); + + // 2. Check if starved app is still starving. + Resource preemptForStarved = starvedApp.getToBePreemptFromOther(); + if (Resources.lessThanOrEqual( + rc, clusterResource, preemptForStarved, Resources.none())) { + continue; + } + + // 3. Check overfed apps from starved app's user with relaxed UL check. 
+ markResourcesToBePreemptedForApp(starvedApp, + starvedApp.getTempUserPerPartition().getApps(), + clusterResource, false); + } + + + } + + private void markResourcesToBePreemptedForApp(TempAppPerPartition starvedApp, + Collection overfedApps, + Resource clusterResource, boolean checkUserFS) { + for (TempAppPerPartition overfedApp : overfedApps) { + if (overfedApp == starvedApp) { + continue; + } + + // 1. Check how much resources we require for the starved app. + Resource preemptForStarved = starvedApp.getToBePreemptFromOther(); + if (Resources.lessThanOrEqual( + rc, clusterResource, preemptForStarved, Resources.none())) { + break; + } + + // 2. Check how much resources we can get from the overfed app. + Resource preemptFromOverfed = + Resources.subtractNonNegative(overfedApp.toBePreempted, + overfedApp.getActuallyToBePreempted()); + if (Resources.lessThanOrEqual( + rc, clusterResource, preemptFromOverfed, Resources.none())) { + continue; + } + + // 3. Take min of starved and overfed app resources. + Resource preempt = Resources.min(rc, clusterResource, + preemptFromOverfed, preemptForStarved); + + // 4. Check if overfed user can spare resources. + TempUserPerPartition overfedUser = overfedApp.getTempUserPerPartition(); + if(checkUserFS) { + Resource preemptFromOverfedUser = Resources.subtractNonNegative( + overfedUser.getUsed(), overfedUser.getFairShare()); + Resources.subtractFromNonNegative(preemptFromOverfedUser, + overfedUser.getActuallyToBePreempted()); + + if (Resources.lessThanOrEqual( + rc, clusterResource, preemptFromOverfedUser, Resources.none())) { + continue; + } + + // 5. Take min of app and user preemptable resources. + preempt = Resources.min(rc, clusterResource, + preempt, preemptFromOverfedUser); + + preempt = Resources.componentwiseMax(preempt, Resources.none()); + } + + LOG.debug("Marking: " + preempt + + " resources which can be preempted from " + overfedApp + + " to " + starvedApp + + " . Overfed user stats: " + overfedUser); + + starvedApp.setToBePreemptFromOther( + Resources.subtractNonNegative( + starvedApp.getToBePreemptFromOther(), preempt)); + + overfedApp.setActuallyToBePreempted( + Resources.add(overfedApp.getActuallyToBePreempted(), preempt)); + + overfedUser.setActuallyToBePreempted( + Resources.add(overfedUser.getActuallyToBePreempted(), preempt)); + } + } + private Resource calculateUsedAMResourcesPerQueue(String partition, LeafQueue leafQueue, Map perUserAMUsed) { Collection runningApps = leafQueue.getApplications(); @@ -605,12 +810,7 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, // skip some containers as per this check. If used resource is under user // limit, then these containers of this user has to be preempted as demand // might be due to high priority apps running in same user. - String partition = context.getScheduler() - .getSchedulerNode(c.getAllocatedNode()).getPartition(); - String queuePath = - context.getScheduler().getQueue(app.getQueueName()).getQueuePath(); - TempQueuePerPartition tq = - context.getQueueByPartition(queuePath, partition); + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); // Given user is not present, skip the check. 
@@ -618,6 +818,19 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, return false; } + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + // Check we don't preempt more resources than + // ActuallyToBePreempted from the app + if(tmpApp != null + && Resources.fitsIn(rc, + c.getAllocatedResource(), tmpApp.getActuallyToBePreempted())) { + return false; + } + return true; + } + // For ideal resource computations, user-limit got saved by subtracting am // used resource in TempUser. Hence it has to be added back here for // complete check. @@ -628,4 +841,33 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, && context.getIntraQueuePreemptionOrderPolicy() .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST); } + + private TempQueuePerPartition getTmpQueueOfContainer( + FiCaSchedulerApp app, RMContainer c) { + String partition = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()).getPartition(); + String queuePath = context.getScheduler() + .getQueue(app.getQueueName()).getQueuePath(); + TempQueuePerPartition tq = context.getQueueByPartition( + queuePath, partition); + return tq; + } + + @Override + public void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, + RMContainer c, Resource clusterResource) { + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); + TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); + if(tmpUser == null) { + return; + } + + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + tmpApp.deductActuallyToBePreempted( + rc, clusterResource, c.getAllocatedResource()); + + LOG.debug("App metrics after marking container: " + c.getContainerId() + + " with resources: " + c.getAllocatedResource() + + " to_be_preempted from app: " + tmpApp); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8a1b47b5dee..450eeb74fe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -136,8 +137,8 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { // 3. Loop through all partitions to select containers for preemption. 
for (String partition : preemptionContext.getAllPartitions()) { - LinkedHashSet queueNames = preemptionContext - .getUnderServedQueuesPerPartition(partition); + LinkedHashSet queueNames = + getUnderServedQueuesPerPartition(preemptionContext, partition); // Error check to handle non-mapped labels to queue. if (null == queueNames) { @@ -256,7 +257,9 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, c.getContainerId(), c.getAllocatedResource(), app.getUser(), rollingUsedResourcePerUser); - break; + // Other containers from current app may still be preemptable + // within the current constraints. + continue; } // Try to preempt this container @@ -272,6 +275,17 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, Resources.subtractFrom(rollingUsedResourcePerUser, c.getAllocatedResource()); } + + // If FairOrderingPolicy is being used, + // subtract the preempted resource from the app's to-be-preempted. + // This is required because we don't keep track of + // preempted containers from an app in this round. + // So, for fair-share preemption, we don't know if, + // after preemption, the app is dropping below its fairShare. + if(ret && leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + fifoPreemptionComputePlugin + .deductActuallyToBePreemptedFromApp(app, c, clusterResource); + } } } @@ -281,8 +295,8 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource, // 1. Iterate through all partition to calculate demand within a partition. for (String partition : context.getAllPartitions()) { - LinkedHashSet queueNames = context - .getUnderServedQueuesPerPartition(partition); + LinkedHashSet queueNames = + getUnderServedQueuesPerPartition(context, partition); if (null == queueNames) { continue; @@ -320,4 +334,49 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource, } } } + + private LinkedHashSet getUnderServedQueuesPerPartition( + CapacitySchedulerPreemptionContext contextToCheck, String partition) { + + LinkedHashSet queueNames = + contextToCheck.getUnderServedQueuesPerPartition(partition); + + if(queueNames == null) { + queueNames = getUnderServedLQueuesForFairOrder( + contextToCheck, partition); + } else { + queueNames.addAll(getUnderServedLQueuesForFairOrder( + contextToCheck, partition)); + } + + return queueNames; + } + + // In case we have FairOrderingPolicy being used, + // we may want to consider pending resource requests + // without headroom consideration as well. + // This is because resources might be pending from starved apps of a user + // who has hit his headroom. + // In such cases, we want to consider pending resources beyond headroom + // so that the starved apps can claim resources from the overfed apps. 
+ private LinkedHashSet getUnderServedLQueuesForFairOrder( + CapacitySchedulerPreemptionContext contextToCheck, String partition) { + LinkedHashSet underServedQueues = new LinkedHashSet<>(); + for(String leafQueue : contextToCheck.getLeafQueueNames()) { + TempQueuePerPartition tq = + contextToCheck.getQueueByPartition(leafQueue, partition); + if(tq == null || tq.leafQueue == null) { + continue; + } + + if(!(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy)) { + continue; + } + + if(!Resources.isNone(tq.getPendingWithoutULandHeadroom())) { + underServedQueues.add(tq.getQueueName()); + } + } + return underServedQueues; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java index 56fd007d40b..1f605f4c682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -44,4 +44,7 @@ void computeAppsIdealAllocation(Resource clusterResource, boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, Resource clusterResource, Resource usedResource, RMContainer c); + + void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, + RMContainer c, Resource clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index 05d8096a273..40a831e6f7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -66,9 +66,12 @@ public String toString() { .append(idealAssigned).append(" PREEMPT_OTHER: ") .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") .append(toBePreempted).append(" ACTUAL_PREEMPT: ") - .append(getActuallyToBePreempted()).append("\n"); + .append(getActuallyToBePreempted()); + if(getFairShare() != null) { + sb.append(" FAIR-SHARE: ").append(getFairShare()); + } - return sb.toString(); + return sb.append("\n").toString(); } void appendLogString(StringBuilder sb) { @@ -98,7 +101,7 @@ public String getUser() { public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(resourceCalculator, cluster, + if (Resources.greaterThanOrEqual(resourceCalculator, cluster, getActuallyToBePreempted(), toBeDeduct)) { Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 57dc6395702..572432a8cec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -61,6 +61,7 @@ boolean preemptionDisabled; protected Resource pendingDeductReserved; + private Resource pendingWithoutULandHeadroom; // Relative priority of this queue to its parent // If parent queue's ordering policy doesn't respect priority, @@ -87,10 +88,13 @@ public TempQueuePerPartition(String queueName, Resource current, totalPartitionResource, partition, false); pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit( totalPartitionResource, partition, true); + pendingWithoutULandHeadroom = + l.getTotalPendingResources(partition, false); leafQueue = l; } else { pending = Resources.createResource(0); pendingDeductReserved = Resources.createResource(0); + pendingWithoutULandHeadroom = Resources.createResource(0); } if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) { @@ -405,4 +409,8 @@ protected void initializeRootIdealWithGuarangeed() { idealAssigned = Resources.clone(getGuaranteed()); } + public Resource getPendingWithoutULandHeadroom() { + return pendingWithoutULandHeadroom; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java index 33ee18f1c22..1163eaf427d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java @@ -18,11 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * Temporary data-structure tracking resource availability, pending resource @@ -34,12 +38,15 @@ private Resource userLimit; private boolean donePreemptionQuotaForULDelta = false; + private Map apps; + TempUserPerPartition(User user, String queueName, Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved, Resource pendingPerPartition) { 
super(queueName, usedPerPartition, amUsedPerPartition, reserved, pendingPerPartition); this.user = user; + this.apps = new HashMap<>(); } @Override @@ -52,9 +59,12 @@ public String toString() { .append(idealAssigned).append(" USED_WO_AMUSED: ") .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ") .append(toBePreempted).append(" ACTUAL_PREEMPT: ") - .append(getActuallyToBePreempted()).append("\n"); + .append(getActuallyToBePreempted()); + if(getFairShare() != null) { + sb.append(" FAIR-SHARE: ").append(getFairShare()); + } - return sb.toString(); + return sb.append("\n").toString(); } public String getUserName() { @@ -85,4 +95,35 @@ public boolean isPreemptionQuotaForULDeltaDone() { public void updatePreemptionQuotaForULDeltaAsDone(boolean done) { this.donePreemptionQuotaForULDelta = done; } + + /** + * Method to add a new app under this user. + * @param applicationId application_id of the app + * @param tempAppPerPartition TempAppPerPartition object of the app + */ + public void addApp(ApplicationId applicationId, + TempAppPerPartition tempAppPerPartition) { + apps.put(applicationId, tempAppPerPartition); + } + + /** + * Getter method to return TempAppPerPartition for given application_id. + * @param applicationId application_id of the app + * @return TempAppPerPartition corresponding to given app_id. + * Null if app_id is absent. + */ + public TempAppPerPartition getApp(ApplicationId applicationId) { + if(!apps.containsKey(applicationId)) { + return null; + } + return apps.get(applicationId); + } + + /** + * Getter method to return all the TempAppPerPartition under given user. + * @return collection of TempAppPerPartition user this user. + */ + public Collection getApps() { + return apps.values(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 96d309c547e..09b11da9183 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2356,4 +2356,44 @@ boolean removeNonRunnableApp(FiCaSchedulerApp app) { } return appsToReturn; } + + /** + * Get total pending resource for the leaf queue. + * This will be used for calculating pending resources in the + * preemption monitor when FairOrderingPolicy is used. + * + * Total pending for the queue = sum(for each app pending requests) + * NOTE: + + * @param partition node partition + * @param deductReservedFromPending When a container is reserved in CS, + * pending resource will not be deducted. + * This could lead to double accounting when + * doing preemption: + * In normal cases, we should deduct reserved + * resource from pending to avoid + * excessive preemption. 
+ * @return Total pending resources in the queue + */ + public Resource getTotalPendingResources(String partition, + boolean deductReservedFromPending) { + try { + readLock.lock(); + Resource totalPending = Resource.newInstance(0, 0); + for (FiCaSchedulerApp app : getApplications()) { + Resource pending = + app.getAppAttemptResourceUsage().getPending(partition); + // Check if we need to deduct reserved from pending + if (deductReservedFromPending) { + pending = Resources.subtract( + pending, app.getAppAttemptResourceUsage().getReserved(partition)); + } + pending = Resources.componentwiseMax(pending, Resources.none()); + Resources.addTo(totalPending, pending); + } + return totalPending; + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 34b86bb5761..8b461009d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1329,6 +1329,9 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, reserved[i] == null ? Resources.none() : reserved[i]), Resources.none())); + when(lq.getTotalPendingResources( + isA(String.class), eq(false))).thenReturn(pending[i]); + // need to set pending resource in resource usage as well ResourceUsage ru = new ResourceUsage(); ru.setPending(pending[i]); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java index eb9d21836da..a31c4bd1dfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -59,13 +59,13 @@ public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser() INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 1.0); - String labelsConfig = "=100,true;"; + String labelsConfig = "=100:100,true;"; String nodesConfig = // n1 has no label "n1= res=100"; String queuesConfig = // guaranteed,max,used,pending,reserved - "root(=[100 100 100 1 0]);" + // root - "-a(=[100 100 100 1 0])"; // a + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a // 
user1/app1 has 60 resources in queue a // user2/app2 has 40 resources in queue a @@ -76,11 +76,11 @@ public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser() String appsConfig = // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) "a\t" // app1, user1 in a - + "(1,1,n1,,60,false,0,user1);" + + + "(1,1:1,n1,,60,false,0,user1);" + "a\t" // app2, user2 in a - + "(1,1,n1,,40,false,0,user2);" + + + "(1,1:1,n1,,40,false,0,user2);" + "a\t" // app3, user3 in a - + "(1,1,n1,,0,false,20,user3)" + + "(1,1:1,n1,,0,false,20:20,user3)" ; buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); @@ -178,40 +178,45 @@ public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser() INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 1.0); - String labelsConfig = "=100,true;"; + String labelsConfig = "=100:100,true;"; String nodesConfig = // n1 has no label "n1= res=100"; String queuesConfig = // guaranteed,max,used,pending,reserved - "root(=[100 100 100 1 0]);" + // root - "-a(=[100 100 100 1 0])"; // a + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a // user1/app1 has 35 resources in queue a // user1/app2 has 25 resources in queue a // user2/app3 has 40 resources in queue a // user3/app4 is requesting 20 resources in queue a // With 3 users, preemptable user limit should be around 35 resources each. - // With FairOrderingPolicy enabled on queue a, all 20 resources should be - // preempted from app1 since it's the most over served app from the most - // over served user + // With FairOrderingPolicy enabled on queue a, + // we will preempt around (60 - 35) ~ 25 resources + // from user1's app1 and app2. + // However, because app4 has only 20 container demand, + // 20 containers will be picked from user1. 
String appsConfig = // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) "a\t" // app1 and app2, user1 in a - + "(1,1,n1,,35,false,0,user1);" + + + "(1,1:1,n1,,35,false,0,user1);" + "a\t" - + "(1,1,n1,,25,false,0,user1);" + + + "(1,1:1,n1,,25,false,0,user1);" + "a\t" // app3, user2 in a - + "(1,1,n1,,40,false,0,user2);" + + + "(1,1:1,n1,,40,false,0,user2);" + "a\t" // app4, user3 in a - + "(1,1,n1,,0,false,20,user3)" + + "(1,1:1,n1,,0,false,20:20,user3)" ; buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); policy.editSchedule(); - verify(eventHandler, times(20)).handle(argThat( + verify(eventHandler, times(17)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); + verify(eventHandler, times(3)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); } /* @@ -274,4 +279,469 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testIntraQueuePreemptionFairOrderingPolicyMultiAppsSingleUser() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 100% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 25 | 0 | 25 | + * | app2 | user1 | 35 | 0 | 25 | + * | app3 | user1 | 40 | 0 | 25 | + * | app4 | user1 | 0 | 20 | 25 | + * +--------------+------+---------+-----------+ + * Because all apps are from same user, + * we expect each app to have 25% fairshare. + * So app2 can give 9 containers and + * app3 can loose 13 containers (exclude AM). + * However, because we are using FiCa scheduling, + * app3 is least starved + * and so we will take all its resources above its fairshare. + * (check: IntraQueuePreemptionPolicy: preemptFromLeastStarvedApp()) + * So we expect app3 to give 13 containers and + * app2 to give the remaining containers (20 - 13 = 7 containers) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, app2, app3, app4 from user1 + + "(1,1:1,n1,,25,false,0,user1);" + + "a\t" + + "(1,1:1,n1,,35,false,0,user1);" + + "a\t" + + "(1,1:1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1:1,n1,,0,false,20:20,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(14)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(eventHandler, times(6)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPIntraUserPreemption() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 40 | 0 | 25 | + * | app2 | user1 | 10 | 0 | 25 | + * | app3 | user2 | 50 | 0 | 25 | + * | app4 | user2 | 0 | 30 | 25 | + * +--------------+------+---------+-----------+ + * + * Because user1 is at its user limit, for app4 of user2, + * we will preempt the containers from user2 only, i.e. app3 of user2. + * Because we want to maintain fairness, + * we will preempt 24 containers from app3 + * (whose fairshare is 25% -> 1/2 of 50% UL) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1,n1,,10,false,0,user1);" + + "a\t" // app3 and app4 from user2 in a + + "(1,1,n1,,50,false,0,user2);" + + "a\t" + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(24)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionFairOPFairnessAcrossUsers() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 51 | 0 | 25 | + * | app2 | user1 | 49 | 0 | 25 | + * | app3 | user2 | 0 | 50 | 50 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps, Each app will have + * 25% fairshare (1/2 of 50% of user's fairShare) + * App3 has fairShare + * = UL / num of apps in user(50%) = 50% + * So app3 asks for 50 resources. + * ~25 are given by app1 and app2. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1:1,n1,,51,false,0,user1);" + + "a\t" + + "(1,1:1,n1,,49,false,0,user1);" + + "a\t" // app3, user2 in a + + "(1,1:1,n1,,0,false,50:50,user2);"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(25)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(eventHandler, times(23)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPUserLimitPreserved() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------------+------+---------+-----------+ + * | APP | USER | USER-LIMIT | USED | PENDING | FAIRSHARE | + * +--------------+------------+------+---------+-----------+ + * | app1 | user1 | 50 | 100 | 0 | 50 | + * | app3 | user2 | 50 | 0 | 50 | 50 | + * | app4 | user3 | 50 | 0 | 50 | 50 | + * +--------------+------------+------+---------+-----------+ + * + * User1 has 1 app, This app will have 50% fairshare + * (100% of user1's fairShare) + * So 50 containers from app1 should be preempted. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1:1,n1,,100,false,0,user1)" + "\t50;" + + "a\t" // app3, user2 in a + + "(1,1:1,n1,,0,false,50,user2)" + "\t50;" + + "a\t" // app3, user2 in a + + "(1,1:1,n1,,0,false,50:50,user3)" + "\t50;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(49)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testIntraQueuePreemptionFairOPwithUL100noContPreempted() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 100% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 10 | 0 | 50 | + * | app2 | user1 | 0 | 100 | 50 | + * | app3 | user2 | 90 | 0 | 100 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps, Each app will have + * 25% fairshare (1/2 of 100% of queue cap) + * App3 has fairShare + * = 100% of queue cap / num of apps in user = 100% + * So app1 and app3 don't release any resources as they are below their FS. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,10,false,0,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,0,false,100,user1)" + "\t100;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,90,false,0,user2)" + "\t100;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionFairOPwithUL100oneUserManyApps() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 100% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 15 | 10 | 20 | + * | app2 | user1 | 17 | 10 | 20 | + * | app3 | user1 | 20 | 10 | 20 | + * | app4 | user1 | 23 | 10 | 20 | + * | app5 | user1 | 25 | 10 | 20 | + * | app6 | user2 | 0 | 100 | 100 | + * +--------------+------+---------+-----------+ + * + * User1 has 5 apps, Each app will have + * 20% fairshare (1/5 of queue cap) + * App6 has fairShare + * = 100% of queue cap / num of apps in user = 100% + * So app4 to app5 only release resources. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,15,false,10,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,17,false,10,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,20,false,10,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,23,false,10,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,25,false,10,user1)" + "\t100;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,100,user2)" + "\t100;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + + verify(eventHandler, times(2)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testIntraQueuePreemptionFairOPwithUL100MultiUsersSkewedRes() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 100% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 10 | 100 | 50 | + * | app2 | user1 | 70 | 100 | 50 | + * | app3 | user2 | 20 | 0 | 50 | + * | app4 | user2 | 0 | 100 | 50 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps, Each app will have + * 25% fairshare (1/2 of 100% of queue cap) + * App3 has fairShare + * = 100% of queue cap / num of apps in user = 100% + * So only app2 releases ~20 resources to be given to app1. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,10,false,100,user1)" + "\t100;" + + "a\t" + + "(1,1,n1,,70,false,0,user1)" + "\t100;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,20,false,0,user2)" + "\t100;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,100,user2)" + "\t100;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPMultiUsersSkewedRes() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 10 | 0 | 25 | + * | app2 | user1 | 99 | 0 | 25 | + * | app3 | user2 | 0 | 100 | 50 | + * | app4 | user3 | 0 | 100 | 50 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps, Each app will have + * 25% fairshare (1/2 of 50% of user's fairShare) + * App3 and App4 has fairShare + * = UL / num of apps in user = 50% + * So app3 and app4 should ask for 33 resources each from app2. + * But app2 can only give 50 resources + * because it will drop below its UL after that. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,1,false,0,user1)" + "\t50;" + + "a\t" + + "(1,1,n1,,99,false,0,user1)" + "\t50;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,40,false,100,user2)" + "\t50;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,100,user3)" + "\t50;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(50)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java index b16861257ff..b852f8f02ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java @@ -143,6 +143,7 @@ private void setupUserResourceUsagePerLabel(ResourceCalculator resourceCalculato users = userMap.get(queue.getQueuePath()); } when(queue.getAllUsers()).thenReturn(users); + when(queue.getUserLimit()).thenReturn(mulp * 100); Resource userLimit = calculateUserLimit(resourceCalculator, mulp, capacity, users); LOG.debug("Updating user-limit from mock: toResourcePartition=" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java index ae4ff5a663e..e8c561622d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java @@ -262,6 +262,8 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), isA(String.class), eq(true))).thenReturn( Resources.subtract(pending, reserved)); + when(lq.getTotalPendingResources( + isA(String.class), eq(false))).thenReturn(pending); } ru.setUsed(partitionName, parseResourceFromString(values[2].trim())); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java index 024ec86f7d7..04c09384654 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -179,7 +179,7 @@ public void buildEnv(String labelsConfig, String nodesConfig, when(cs.getRootQueue()).thenReturn(root); when(cs.getClusterResource()).thenReturn(clusterResource); - new MockApplications(appsConfig, resourceCalculator, nameToCSQueues, + new MockApplications(appsConfig, cs.getResourceCalculator(), nameToCSQueues, partitionToResource, nodeIdToSchedulerNodes); policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); -- 2.23.0
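
The fair-share formula documented in FifoIntraQueuePreemptionPlugin#setFairShareForApps above can be illustrated with a small standalone sketch. This is not part of the patch: it uses plain long values in place of YARN's Resource/ResourceCalculator types, and the class and method names below are made up for illustration only.

// Standalone illustration of the fair-share formula from setFairShareForApps:
//   fairShareForUser = (userLimitPercent == 100) ? queueReassignable + amUsed
//                                                : userLimit + amUsed
//   fairSharePerApp  = ceil(fairShareForUser / numAppsOfUser)
// Plain long "resource units" stand in for YARN's Resource objects.
public class FairSharePerAppSketch {

  static long fairSharePerApp(long queueReassignable, long userLimit,
      long amUsedByUser, int userLimitPercent, int numAppsOfUser) {
    // The user limit acts as the upper bound only when it is below 100%.
    long fairShareForUser = (userLimitPercent == 100)
        ? queueReassignable + amUsedByUser
        : userLimit + amUsedByUser;
    // divideAndCeil equivalent; never hand out a negative share.
    long perApp = (fairShareForUser + numAppsOfUser - 1) / numAppsOfUser;
    return Math.max(perApp, 0L);
  }

  public static void main(String[] args) {
    // Mirrors testIntraQueuePreemptionFairOPFairnessAcrossUsers:
    // queue = 100, minimum-user-limit-percent = 50, user1 runs 2 apps.
    long perApp = fairSharePerApp(100, 50, 0, 50, 2);
    System.out.println("fair share per app for user1 = " + perApp); // 25
  }
}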
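
The starved/overfed matching done by calcActuallyToBePreemptedBasedOnFS and markResourcesToBePreemptedForApp can likewise be sketched in isolation. The sketch below uses plain longs instead of Resource, omits the per-user fair-share check and the second relaxed pass within the starved app's own user, and the App class is illustrative rather than the patch's TempAppPerPartition.

// Simplified model of the fairness-based marking: walk apps from most
// starved to most overfed and transfer "to be preempted" amounts until
// each starved app's demand is satisfied or no donor has spare resources.
import java.util.Arrays;
import java.util.List;

public class FairnessPreemptionSketch {

  static final class App {
    final String name;
    long toBePreempted;          // over-use above fair share (can give)
    long toBePreemptFromOther;   // starvation below fair share (wants)
    long actuallyToBePreempted;  // amount already earmarked for preemption

    App(String name, long give, long want) {
      this.name = name;
      this.toBePreempted = give;
      this.toBePreemptFromOther = want;
    }
  }

  static void markPreemption(List<App> appsOrderedMostOverfedFirst) {
    // Iterate starved apps from the tail (least overfed = most starved).
    for (int i = appsOrderedMostOverfedFirst.size() - 1; i >= 0; i--) {
      App starved = appsOrderedMostOverfedFirst.get(i);
      for (App overfed : appsOrderedMostOverfedFirst) {
        if (overfed == starved || starved.toBePreemptFromOther <= 0) {
          continue;
        }
        long spare = overfed.toBePreempted - overfed.actuallyToBePreempted;
        if (spare <= 0) {
          continue;
        }
        long preempt = Math.min(spare, starved.toBePreemptFromOther);
        starved.toBePreemptFromOther -= preempt;
        overfed.actuallyToBePreempted += preempt;
      }
    }
  }

  public static void main(String[] args) {
    App app2 = new App("app2", 10, 0);   // 10 above its fair share
    App app3 = new App("app3", 15, 0);   // 15 above its fair share
    App app4 = new App("app4", 0, 20);   // starved by 20
    List<App> apps = Arrays.asList(app3, app2, app4); // most overfed first
    markPreemption(apps);
    System.out.println("app3 gives " + app3.actuallyToBePreempted); // 15
    System.out.println("app2 gives " + app2.actuallyToBePreempted); // 5
  }
}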
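
Finally, the pending-resource aggregation described in the LeafQueue#getTotalPendingResources javadoc can be summarized with a minimal sketch. Plain longs again stand in for Resource, and AppPending is an illustrative record rather than FiCaSchedulerApp's actual resource-usage API.

// Sum each app's pending ask; optionally deduct its reserved resources to
// avoid double accounting, and never let a single app contribute a
// negative amount (componentwiseMax with "none" in the real code).
import java.util.List;

public class TotalPendingSketch {

  record AppPending(long pending, long reserved) { }

  static long totalPending(List<AppPending> apps,
      boolean deductReservedFromPending) {
    long total = 0;
    for (AppPending app : apps) {
      long pending = app.pending();
      if (deductReservedFromPending) {
        pending -= app.reserved();
      }
      total += Math.max(pending, 0);
    }
    return total;
  }

  public static void main(String[] args) {
    List<AppPending> apps = List.of(
        new AppPending(20, 5), new AppPending(10, 0));
    System.out.println(totalPending(apps, false)); // 30
    System.out.println(totalPending(apps, true));  // 25
  }
}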