From 296111ce76a65f8c351cad53a1f4dc43b66e6077 Mon Sep 17 00:00:00 2001 From: ananyo Date: Fri, 8 Jan 2021 17:17:42 +0530 Subject: [PATCH] YARN-10559 --- .../capacity/AbstractPreemptionEntity.java | 17 ++ .../FifoIntraQueuePreemptionPlugin.java | 148 ++++++++++- .../IntraQueueCandidatesSelector.java | 49 +++- .../IntraQueuePreemptionComputePlugin.java | 2 + .../monitor/capacity/TempAppPerPartition.java | 9 +- .../capacity/TempQueuePerPartition.java | 7 + .../capacity/TempUserPerPartition.java | 35 +++ .../scheduler/capacity/LeafQueue.java | 37 +++ ...tProportionalCapacityPreemptionPolicy.java | 1 + ...reemptionPolicyIntraQueueFairOrdering.java | 231 +++++++++++++++++- .../mockframework/MockQueueHierarchy.java | 1 + 11 files changed, 517 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java index cb4d7af769d..c94c10a07a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -42,6 +43,7 @@ Resource selected; private Resource actuallyToBePreempted; private Resource toBePreemptFromOther; + private Resource fairShare; AbstractPreemptionEntity(String queueName, Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved, @@ -99,4 +101,19 @@ public void setToBePreemptFromOther(Resource toBePreemptFromOther) { this.toBePreemptFromOther = toBePreemptFromOther; } + /** + * Getter method to return fair share for an application in a given queue. + * @return resource Fair Share Resource object + */ + public Resource getFairShare() { + return fairShare; + } + + /** + * Setter method to update fair share for an application in a given queue. 
+ * @param fairShare Fair Share Resource object
+ */
+ public void setFairShare(Resource fairShare) {
+ this.fairShare = fairShare;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 7c3abb49254..7e7eb7249e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -141,6 +141,10 @@ public void computeAppsIdealAllocation(Resource clusterResource,
 PriorityQueue orderedByPriority = createTempAppForResCalculation(
 tq, apps, clusterResource, perUserAMUsed);
+ if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) {
+ setFairShareForApps(tq, clusterResource);
+ }
+
 // 4. Calculate idealAssigned per app by checking based on queue's
 // unallocated resource.Also return apps arranged from lower priority to
 // higher priority.
@@ -176,9 +180,14 @@ public void computeAppsIdealAllocation(Resource clusterResource,
 // 8. There are chances that we may preempt for the demand from same
 // priority level, such cases are to be validated out.
- validateOutSameAppPriorityFromDemand(clusterResource,
- (TreeSet) orderedApps, tq.getUsersPerPartition(),
- context.getIntraQueuePreemptionOrderPolicy());
+ // Skip the priority and user-limit checks when FairOrderingPolicy is used.
+ if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) {
+ calcActuallyToBePreemptedBasedOnFS(clusterResource, orderedApps);
+ } else {
+ validateOutSameAppPriorityFromDemand(clusterResource,
+ (TreeSet) orderedApps, tq.getUsersPerPartition(),
+ context.getIntraQueuePreemptionOrderPolicy());
+ }
 if (LOG.isDebugEnabled()) {
 LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -188,6 +197,47 @@ public void computeAppsIdealAllocation(Resource clusterResource,
 }
 }
+ /**
+ * For the fairness calculation, we treat the user limit as an upper bound on the
+ * resources allocated to a single user. The calculation used is:
+ *
+ * fairShareAcrossApps = queue capacity / number of apps in the queue
+ * fairShareWithinUL = (user limit + AM used) / number of apps of that user
+ *
+ * if (fairShareAcrossApps * number of apps of that user > user limit + AM used)
+ * fairShareForApp = fairShareWithinUL;
+ * else
+ * fairShareForApp = fairShareAcrossApps;
+ *
+ * With this formula we first try to give every app in the queue an equal share.
+ * If that equal share would push a user past its user limit, the apps of that
+ * user instead share the user limit equally among themselves.
+ * @param tq tempQueuePerPartition being considered + * @param clusterResource total resources present in the cluster + */ + private void setFairShareForApps(TempQueuePerPartition tq, Resource clusterResource) { + int numOfAppsInQueue = tq.leafQueue.getAllApplications().size(); + Resource fairShareAcrossApps = Resources.divideAndCeil(this.rc, tq.getMax(), numOfAppsInQueue); + + for(TempUserPerPartition tmpUser : tq.getUsersPerPartition().values()){ + Resource userLimit = tmpUser.getUserLimit(); + Resource userLimitWithAM = Resources.add(userLimit, tmpUser.getAMUsed()); + int numOfAppsInUser = tmpUser.getApps().size(); + Resource fairShareOfAllAppsOfUser = Resources.multiplyAndRoundUp(fairShareAcrossApps, numOfAppsInUser); + Resource fairShareWithinUL = Resources.divideAndCeil(this.rc, userLimitWithAM, numOfAppsInUser); + + for(TempAppPerPartition tmpApp : tmpUser.getApps()) { + Resource fairShareForApp = + Resources.lessThanOrEqual(rc, clusterResource, fairShareOfAllAppsOfUser, userLimitWithAM) ? + fairShareAcrossApps : fairShareWithinUL; + tmpApp.setFairShare(fairShareForApp); + + LOG.debug("App: " + tmpApp.getApplicationId() + " from user: " + tmpUser.getUserName() + + " has UserLimitWithAM: " + userLimitWithAM + ", FairShareAcrossApps: " + fairShareAcrossApps + + " num_of_apps for user: " + numOfAppsInUser + " and fairShareWithinUL: " + fairShareWithinUL + + ". Calculated FairShare for app is: " + tmpApp.getFairShare()); + } + } + } + private void calculateToBePreemptedResourcePerApp(Resource clusterResource, TreeSet orderedApps, Resource preemptionLimit) { @@ -307,6 +357,14 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // idealAssigned may fall to 0 if higher priority apps demand is more. Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(), tmpApp.getPending()); + + // In case of fair ordering based preemption, consider fairShare of each app in a given queue + // as a max-cap. + if (queueOrderingPolicy instanceof FairOrderingPolicy) { + appIdealAssigned = Resources.min(rc, clusterResource, tmpApp.getFairShare(), appIdealAssigned); + appIdealAssigned = Resources.componentwiseMax(appIdealAssigned, Resources.none()); + } + Resources.subtractFrom(appIdealAssigned, tmpApp.selected); if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, @@ -450,6 +508,7 @@ private void getAlreadySelectedPreemptionCandidatesResource( } tmpApp.setTempUserPerPartition(tmpUser); orderedByPriority.add(tmpApp); + tq.getUsersPerPartition().get(tmpUser.getUserName()).addApp(tmpApp.getApplicationId(), tmpApp); } return orderedByPriority; @@ -573,6 +632,52 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, } } + /** + * For each starved app, iterate over all the overfed apps and mark as many resources as possible + * to satisfy the starvation of the starved app. 
+ * starvedResources = ToBePreemptFromOther - starvation_fulfilled_by_other_overfed_apps + * overfedResources = ToBePreempted - resources_already_marked_for_preemption + * @param clusterResource total resources present in the cluster + * @param orderedApps TreeSet of apps ordered by the excess resources they have + */ + private void calcActuallyToBePreemptedBasedOnFS(Resource clusterResource, TreeSet orderedApps) { + TempAppPerPartition[] apps = orderedApps.toArray(new TempAppPerPartition[orderedApps.size()]); + if (apps.length <= 1) { + return; + } + + for(int starvedAppIndex = apps.length - 1; starvedAppIndex >= 0; starvedAppIndex--) { + TempAppPerPartition starvedApp = apps[starvedAppIndex]; + + for(TempAppPerPartition overfedApp : apps) { + if(overfedApp == starvedApp) + continue; + + Resource preemptForStarved = starvedApp.getToBePreemptFromOther(); + if(Resources.lessThanOrEqual(rc, clusterResource, preemptForStarved, Resources.none())) + break; + + Resource preemptFromOverfed = + Resources.subtract(overfedApp.toBePreempted, overfedApp.getActuallyToBePreempted()); + if(Resources.lessThanOrEqual(rc, clusterResource, preemptFromOverfed, Resources.none())) + continue; + + + Resource toPreempt = + Resources.lessThanOrEqual(rc, clusterResource, preemptFromOverfed, preemptForStarved) ? + preemptFromOverfed : preemptForStarved; + + LOG.debug("Marking: " + toPreempt + " resources to be preempted from " + + overfedApp + " to " + starvedApp); + + starvedApp.setToBePreemptFromOther( + Resources.subtract(starvedApp.getToBePreemptFromOther(), toPreempt)); + overfedApp.setActuallyToBePreempted( + Resources.add(overfedApp.getActuallyToBePreempted(), toPreempt)); + } + } + } + private Resource calculateUsedAMResourcesPerQueue(String partition, LeafQueue leafQueue, Map perUserAMUsed) { Collection runningApps = leafQueue.getApplications(); @@ -605,12 +710,7 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, // skip some containers as per this check. If used resource is under user // limit, then these containers of this user has to be preempted as demand // might be due to high priority apps running in same user. - String partition = context.getScheduler() - .getSchedulerNode(c.getAllocatedNode()).getPartition(); - String queuePath = - context.getScheduler().getQueue(app.getQueueName()).getQueuePath(); - TempQueuePerPartition tq = - context.getQueueByPartition(queuePath, partition); + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); // Given user is not present, skip the check. @@ -618,6 +718,18 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, return false; } + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + // Check we don't preempt more resources than ActuallyToBePreempted from the app + if(tmpApp != null + && Resources.greaterThanOrEqual(rc, clusterResource, + tmpApp.getActuallyToBePreempted(), c.getAllocatedResource())) { + return false; + } + return true; + } + // For ideal resource computations, user-limit got saved by subtracting am // used resource in TempUser. Hence it has to be added back here for // complete check. 
@@ -628,4 +740,22 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, && context.getIntraQueuePreemptionOrderPolicy() .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST); } + + private TempQueuePerPartition getTmpQueueOfContainer(FiCaSchedulerApp app, RMContainer c) { + String partition = context.getScheduler().getSchedulerNode(c.getAllocatedNode()).getPartition(); + String queuePath = context.getScheduler().getQueue(app.getQueueName()).getQueuePath(); + TempQueuePerPartition tq = context.getQueueByPartition(queuePath, partition); + return tq; + } + + @Override + public void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, RMContainer c, Resource clusterResource) { + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); + TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); + if(tmpUser == null) + return; + + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + tmpApp.deductActuallyToBePreempted(rc, clusterResource, c.getAllocatedResource()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8a1b47b5dee..75471f037e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -136,8 +137,7 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { // 3. Loop through all partitions to select containers for preemption. for (String partition : preemptionContext.getAllPartitions()) { - LinkedHashSet queueNames = preemptionContext - .getUnderServedQueuesPerPartition(partition); + LinkedHashSet queueNames = getUnderServedQueuesPerPartition(preemptionContext, partition); // Error check to handle non-mapped labels to queue. if (null == queueNames) { @@ -267,10 +267,17 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, // Subtract from respective user's resource usage once a container is // selected for preemption. + // + // Also, if FairOrderingPolicy is being used, subtract the preempted resource from the app's to-be-preempted. + // This is required because we don't keep track of preempted containers from an app in this round. + // So, for fair-share preemption, we don't know if, after preemption, the app is dropping below its fairShare. 
if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy() .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { Resources.subtractFrom(rollingUsedResourcePerUser, c.getAllocatedResource()); + if(leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + fifoPreemptionComputePlugin.deductActuallyToBePreemptedFromApp(app, c, clusterResource); + } } } } @@ -281,8 +288,7 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource, // 1. Iterate through all partition to calculate demand within a partition. for (String partition : context.getAllPartitions()) { - LinkedHashSet queueNames = context - .getUnderServedQueuesPerPartition(partition); + LinkedHashSet queueNames = getUnderServedQueuesPerPartition(context, partition); if (null == queueNames) { continue; @@ -320,4 +326,39 @@ private void computeIntraQueuePreemptionDemand(Resource clusterResource, } } } + + private LinkedHashSet getUnderServedQueuesPerPartition( + CapacitySchedulerPreemptionContext context, String partition) { + + LinkedHashSet queueNames = context.getUnderServedQueuesPerPartition(partition); + + if(queueNames == null) + queueNames = getUnderServedLQueuesForFairOrder(context, partition); + else + queueNames.addAll(getUnderServedLQueuesForFairOrder(context, partition)); + + return queueNames; + } + + // In case we have FairOrderingPolicy being used, + // we may want to consider pending resource requests without headroom consideration as well. + // Resources might be pending from starved apps of a user who has hit his headroom. + // In such cases, we want to consider pending resources beyond headroom so that + // the starved apps can claim resources from the overfed apps. + private LinkedHashSet getUnderServedLQueuesForFairOrder( + CapacitySchedulerPreemptionContext context, String partition) { + LinkedHashSet underServedQueues = new LinkedHashSet<>(); + for(String leafQueue : context.getLeafQueueNames()) { + TempQueuePerPartition tq = context.getQueueByPartition(leafQueue, partition); + if(tq == null || tq.leafQueue == null) + continue; + + if(!(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy)) + continue; + + if(!Resources.isNone(tq.getPendingWithoutULandHeadroom())) + underServedQueues.add(tq.getQueueName()); + } + return underServedQueues; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java index 56fd007d40b..18dc2b1b4d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -44,4 +44,6 @@ void computeAppsIdealAllocation(Resource clusterResource, boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, Resource clusterResource, Resource usedResource, RMContainer c); + + void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, RMContainer c, Resource clusterResource); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index 05d8096a273..40a831e6f7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -66,9 +66,12 @@ public String toString() { .append(idealAssigned).append(" PREEMPT_OTHER: ") .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") .append(toBePreempted).append(" ACTUAL_PREEMPT: ") - .append(getActuallyToBePreempted()).append("\n"); + .append(getActuallyToBePreempted()); + if(getFairShare() != null) { + sb.append(" FAIR-SHARE: ").append(getFairShare()); + } - return sb.toString(); + return sb.append("\n").toString(); } void appendLogString(StringBuilder sb) { @@ -98,7 +101,7 @@ public String getUser() { public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(resourceCalculator, cluster, + if (Resources.greaterThanOrEqual(resourceCalculator, cluster, getActuallyToBePreempted(), toBeDeduct)) { Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 57dc6395702..b0290da737d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -61,6 +61,7 @@ boolean preemptionDisabled; protected Resource pendingDeductReserved; + private Resource pendingWithoutULandHeadroom; // Relative priority of this queue to its parent // If parent queue's ordering policy doesn't respect priority, @@ -87,10 +88,12 @@ public TempQueuePerPartition(String queueName, Resource current, totalPartitionResource, partition, false); pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit( totalPartitionResource, partition, true); + pendingWithoutULandHeadroom = l.getTotalPendingResources(partition, false); leafQueue = l; } else { pending = Resources.createResource(0); pendingDeductReserved = Resources.createResource(0); + pendingWithoutULandHeadroom = Resources.createResource(0); } if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) { @@ -405,4 +408,8 @@ protected void initializeRootIdealWithGuarangeed() { idealAssigned = Resources.clone(getGuaranteed()); } + public Resource getPendingWithoutULandHeadroom() { + return 
pendingWithoutULandHeadroom;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
index 33ee18f1c22..26719fbcedd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
@@ -18,11 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 /**
 * Temporary data-structure tracking resource availability, pending resource
@@ -34,12 +38,15 @@
 private Resource userLimit;
 private boolean donePreemptionQuotaForULDelta = false;
+ private Map<ApplicationId, TempAppPerPartition> apps;
+
 TempUserPerPartition(User user, String queueName,
 Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved,
 Resource pendingPerPartition) {
 super(queueName, usedPerPartition, amUsedPerPartition, reserved,
 pendingPerPartition);
 this.user = user;
+ this.apps = new HashMap<>();
 }
 @Override
@@ -85,4 +92,32 @@ public boolean isPreemptionQuotaForULDeltaDone() {
 public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
 this.donePreemptionQuotaForULDelta = done;
 }
+
+ /**
+ * Adds a new app under this user.
+ * @param applicationId application id of the app whose TempAppPerPartition is being added.
+ * @param tempAppPerPartition TempAppPerPartition object of the app being added.
+ */
+ public void addApp(ApplicationId applicationId, TempAppPerPartition tempAppPerPartition) {
+ apps.put(applicationId, tempAppPerPartition);
+ }
+
+ /**
+ * Returns the TempAppPerPartition corresponding to the given application id.
+ * @param applicationId application id of the app whose TempAppPerPartition is to be fetched.
+ * @return the TempAppPerPartition for the given application id, or null if it is absent.
+ */
+ public TempAppPerPartition getApp(ApplicationId applicationId) {
+ if(!apps.containsKey(applicationId))
+ return null;
+ return apps.get(applicationId);
+ }
+
+ /**
+ * Returns all the TempAppPerPartition objects under this user.
+ * @return collection of TempAppPerPartition objects under this user.
+ */ + public Collection getApps() { + return apps.values(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 96d309c547e..8b58cb84ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2356,4 +2356,41 @@ boolean removeNonRunnableApp(FiCaSchedulerApp app) { } return appsToReturn; } + + /** + * Get total pending resource for the leaf queue. This + * will be used for calculating pending resources in the preemption monitor + * when FairOrderingPolicy is used. + * + * Total pending for the queue = sum(for each app pending requests) + * NOTE: + + * @param partition node partition + * @param deductReservedFromPending When a container is reserved in CS, + * pending resource will not be deducted. + * This could lead to double accounting when + * doing preemption: + * In normal cases, we should deduct reserved + * resource from pending to avoid + * excessive preemption. + * @return Total pending resources in the queue + */ + public Resource getTotalPendingResources(String partition, boolean deductReservedFromPending) { + try { + readLock.lock(); + Resource totalPending = Resource.newInstance(0, 0); + for (FiCaSchedulerApp app : getApplications()) { + Resource pending = app.getAppAttemptResourceUsage().getPending(partition); + // Check if we need to deduct reserved from pending + if (deductReservedFromPending) { + pending = Resources.subtract(pending, app.getAppAttemptResourceUsage().getReserved(partition)); + } + pending = Resources.componentwiseMax(pending, Resources.none()); + Resources.addTo(totalPending, pending); + } + return totalPending; + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 34b86bb5761..a5033e8f455 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1322,6 +1322,7 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, new ArrayList(); when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), isA(String.class), eq(false))).thenReturn(pending[i]); + when(lq.getTotalPendingResources(isA(String.class), eq(false))).thenReturn(pending[i]); 
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(Resources.componentwiseMax(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
index eb9d21836da..fb8e3654a95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
@@ -191,9 +191,9 @@ public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser()
 // user2/app3 has 40 resources in queue a
 // user3/app4 is requesting 20 resources in queue a
 // With 3 users, preemptable user limit should be around 35 resources each.
- // With FairOrderingPolicy enabled on queue a, all 20 resources should be
- // preempted from app1 since it's the most over served app from the most
- // over served user
+ // With FairOrderingPolicy enabled on queue a,
+ // we will preempt around (60 - 35) ~ 25 resources from user1's app1 and app2,
+ // and the rest from user2's app3.
 String appsConfig =
 // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
 "a\t" // app1 and app2, user1 in a
@@ -209,9 +209,12 @@
 buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
 policy.editSchedule();
- verify(eventHandler, times(20)).handle(argThat(
+ verify(eventHandler, times(18)).handle(argThat(
 new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
 getAppAttemptId(1))));
+ verify(eventHandler, times(2)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
 }
 /*
@@ -274,4 +277,224 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser()
 new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
 getAppAttemptId(1))));
 }
+
+ @Test
+ public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsSingleUser()
+ throws IOException {
+ /**
+ * Scenario:
+ * Queue total resources: 100
+ * Minimum user limit percent: 100%
+ * +------+-------+------+---------+-----------+
+ * | APP  | USER  | USED | PENDING | FAIRSHARE |
+ * +------+-------+------+---------+-----------+
+ * | app1 | user1 | 25   | 0       | 25        |
+ * | app2 | user1 | 35   | 0       | 25        |
+ * | app3 | user1 | 40   | 0       | 25        |
+ * | app4 | user1 | 0    | 20      | 25        |
+ * +------+-------+------+---------+-----------+
+ * Because all apps are from the same user, we expect each app to have a 25% fair share.
+ * So app2 can give up 9 containers and app3 can lose 13 containers (excluding the AM).
+ * However, because we are using FiCa scheduling, app3 is the least starved
+ * and so we will take all of its resources above its fair share.
+ * (check: IntraQueuePreemptionPolicy: preemptFromLeastStarvedApp()) + * So we expect app3 to give 13 containers and app2 to give the remaining containers (20 - 13 = 7 containers) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, app2, app3, app4 from user1 + + "(1,1,n1,,25,false,0,user1);" + + "a\t" + + "(1,1,n1,,35,false,0,user1);" + + "a\t" + + "(1,1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1,n1,,0,false,20,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(13)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(eventHandler, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPIntraUserPreemption() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 40 | 0 | 25 | + * | app2 | user1 | 10 | 0 | 25 | + * | app3 | user2 | 50 | 0 | 25 | + * | app4 | user2 | 0 | 30 | 25 | + * +--------------+------+---------+-----------+ + * + * Because user1 is at its user limit, for app4 of user2, + * we will preempt the containers from user2 only, i.e. app3 of user2. + * Because we want to maintain fairness, we will preempt 24 containers from app3 + * (whose fairshare is 25% -> 1/2 of 50% UL) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1,n1,,10,false,0,user1);" + + "a\t" // app3 and app4 from user2 in a + + "(1,1,n1,,50,false,0,user2);" + + "a\t" + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(24)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionFairOPFairnessAcrossUsers() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 51 | 0 | 25 | + * | app2 | user1 | 49 | 0 | 25 | + * | app3 | user2 | 0 | 50 | 33 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps, Each app will have 25% fairshare (1/2 of 50% of user's fairShare) + * App3 has fairShare = min (UL / num of apps in user(50%), + * queueResources / num of apps in queue (33%)) = 33% + * So app3 asks for 33 resources only. (51-25) = 26 are given by app1 and rest by app2. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,51,false,0,user1);" + + "a\t" + + "(1,1,n1,,49,false,0,user1);" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,50,user2);"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(26)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(eventHandler, times(8)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPUserLimitPreserved() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------------+------+---------+-----------+ + * | APP | USER | USER-LIMIT | USED | PENDING | FAIRSHARE | + * +--------------+------------+------+---------+-----------+ + * | app1 | user1 | 50 | 100 | 0 | 50 | + * | app3 | user2 | 50 | 0 | 50 | 50 | + * | app4 | user3 | 50 | 0 | 50 | 50 | + * +--------------+------------+------+---------+-----------+ + * + * User1 has 1 app, This app will have 50% fairshare (100% of user1's fairShare) + * So 50 containers from app1 should be preempted. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ (float) 1.0);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 1 0]);" + // root
+ "-a(=[100 100 100 1 0])"; // a
+ String appsConfig =
+ // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+ "a\t" // app1, user1 in a
+ + "(1,1,n1,,100,false,0,user1)" + "\t50;" +
+ "a\t" // app3, user2 in a
+ + "(1,1,n1,,0,false,50,user2)" + "\t50;" +
+ "a\t" // app4, user3 in a
+ + "(1,1,n1,,0,false,50,user3)" + "\t50;";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(eventHandler, times(67)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
index ae4ff5a663e..c37030b139f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
@@ -259,6 +259,7 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
 LeafQueue lq = (LeafQueue) queue;
 when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
 isA(String.class), eq(false))).thenReturn(pending);
+ when(lq.getTotalPendingResources(isA(String.class), eq(false))).thenReturn(pending);
 when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
 isA(String.class), eq(true))).thenReturn(
 Resources.subtract(pending, reserved));
-- 2.23.0
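
The per-app fair-share rule documented in FifoIntraQueuePreemptionPlugin#setFairShareForApps() above can be illustrated with a small standalone sketch. This is a simplified illustration and not part of the patch: it uses plain integers instead of Resource/ResourceCalculator, ignores AM-usage and rounding details, and the class and helper names (FairSharePerAppSketch, ceilDiv) are invented for the example.

// Illustrative sketch of the fair-share-per-app rule; not part of the patch.
public class FairSharePerAppSketch {

  // queueCapacity   - total capacity of the queue
  // appsInQueue     - number of apps across all users in the queue
  // userLimitWithAM - the user's limit plus its AM usage (upper bound per user)
  // appsOfUser      - number of apps belonging to that user
  static int fairShareForApp(int queueCapacity, int appsInQueue,
      int userLimitWithAM, int appsOfUser) {
    // Equal share across every app in the queue (rounded up, like divideAndCeil).
    int fairShareAcrossApps = ceilDiv(queueCapacity, appsInQueue);
    // If giving that equal share to all of this user's apps would exceed the
    // user limit, split the user limit across the user's apps instead.
    if ((long) fairShareAcrossApps * appsOfUser > userLimitWithAM) {
      return ceilDiv(userLimitWithAM, appsOfUser);
    }
    return fairShareAcrossApps;
  }

  static int ceilDiv(int a, int b) {
    return (a + b - 1) / b;
  }

  public static void main(String[] args) {
    // Queue of 100, 3 apps in the queue, per-user limit (with AM) of 50.
    // A user with 2 apps: 34 * 2 = 68 > 50, so each app is capped at ceil(50 / 2) = 25.
    System.out.println(fairShareForApp(100, 3, 50, 2)); // 25
    // A user with 1 app: 34 <= 50, so the app keeps the queue-wide share of ceil(100 / 3) = 34.
    System.out.println(fairShareForApp(100, 3, 50, 1)); // 34
  }
}

These numbers line up with the 25-per-app cap described for the two-app user in testIntraQueuePreemptionFairOPFairnessAcrossUsers; the exact values produced by the patch also depend on ResourceCalculator rounding and AM accounting.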
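
Similarly, the pairing of starved and over-served apps performed by calcActuallyToBePreemptedBasedOnFS() can be sketched with plain integers. Again, this is an illustrative approximation rather than the patch code: the App class and its int fields are invented stand-ins for TempAppPerPartition and its Resource-based accessors.

// Illustrative sketch of the starved/overfed pairing; not part of the patch.
import java.util.Arrays;

public class FairSharePreemptionPairingSketch {

  static class App {
    final String name;
    int toBePreemptFromOther;   // starvation still to be satisfied by other apps
    int toBePreempted;          // resources this app holds above its fair share
    int actuallyToBePreempted;  // resources already marked for preemption

    App(String name, int toBePreemptFromOther, int toBePreempted) {
      this.name = name;
      this.toBePreemptFromOther = toBePreemptFromOther;
      this.toBePreempted = toBePreempted;
    }
  }

  // apps[0] is the most over-served app, apps[length - 1] the most starved one.
  static void pairStarvedWithOverfed(App[] apps) {
    for (int i = apps.length - 1; i >= 0; i--) {
      App starved = apps[i];
      for (App overfed : apps) {
        if (overfed == starved) {
          continue;
        }
        if (starved.toBePreemptFromOther <= 0) {
          break; // this app's starvation is fully covered
        }
        int available = overfed.toBePreempted - overfed.actuallyToBePreempted;
        if (available <= 0) {
          continue; // nothing left to take from this app
        }
        int toPreempt = Math.min(available, starved.toBePreemptFromOther);
        starved.toBePreemptFromOther -= toPreempt;
        overfed.actuallyToBePreempted += toPreempt;
      }
    }
  }

  public static void main(String[] args) {
    App a1 = new App("app1", 0, 26);  // holds 26 above its fair share
    App a2 = new App("app2", 0, 24);  // holds 24 above its fair share
    App a3 = new App("app3", 33, 0);  // needs 33 more to reach its fair share
    App[] apps = {a1, a2, a3};
    pairStarvedWithOverfed(apps);
    Arrays.stream(apps).forEach(a ->
        System.out.println(a.name + " actuallyToBePreempted=" + a.actuallyToBePreempted));
    // Expected: app1 -> 26, app2 -> 7, app3 -> 0; the starved app claims first
    // from the most over-served app, then the remainder from the next one.
  }
}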