From 6c793daa0d8f86494a68f9d2aa57590990d62578 Mon Sep 17 00:00:00 2001 From: ananyo Date: Tue, 12 Jan 2021 12:38:49 +0530 Subject: [PATCH] YARN-10559 --- .../capacity/AbstractPreemptionEntity.java | 16 ++ .../FifoIntraQueuePreemptionPlugin.java | 193 +++++++++++++- .../IntraQueueCandidatesSelector.java | 16 +- .../IntraQueuePreemptionComputePlugin.java | 3 + .../monitor/capacity/TempAppPerPartition.java | 9 +- .../capacity/TempUserPerPartition.java | 38 +++ ...reemptionPolicyIntraQueueFairOrdering.java | 241 +++++++++++++++++- 7 files changed, 499 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java index cb4d7af769d..02288f25a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -42,6 +42,7 @@ Resource selected; private Resource actuallyToBePreempted; private Resource toBePreemptFromOther; + private Resource fairShare; AbstractPreemptionEntity(String queueName, Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved, @@ -99,4 +100,19 @@ public void setToBePreemptFromOther(Resource toBePreemptFromOther) { this.toBePreemptFromOther = toBePreemptFromOther; } + /** + * Getter method to return fair share for an application in a given queue. + * @return resource Fair Share Resource object + */ + public Resource getFairShare() { + return fairShare; + } + + /** + * Setter method to update fair share for an application in a given queue. + * @param fairShare Fair Share Resource object + */ + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 7c3abb49254..8af5f0ac0e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -141,6 +141,10 @@ public void computeAppsIdealAllocation(Resource clusterResource, PriorityQueue orderedByPriority = createTempAppForResCalculation( tq, apps, clusterResource, perUserAMUsed); + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + setFairShareForApps(tq, clusterResource, queueReassignableResource); + } + // 4. 
Calculate idealAssigned per app by checking based on queue's // unallocated resource.Also return apps arranged from lower priority to // higher priority. @@ -176,9 +180,16 @@ public void computeAppsIdealAllocation(Resource clusterResource, // 8. There are chances that we may preempt for the demand from same // priority level, such cases are to be validated out. - validateOutSameAppPriorityFromDemand(clusterResource, - (TreeSet) orderedApps, tq.getUsersPerPartition(), - context.getIntraQueuePreemptionOrderPolicy()); + + // If FairOrderingPolicy is in use, calculate toBePreempted + // based on the fair share computed for each app. + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + calcActuallyToBePreemptedBasedOnFS(clusterResource, orderedApps); + } else { + validateOutSameAppPriorityFromDemand(clusterResource, + (TreeSet) orderedApps, tq.getUsersPerPartition(), + context.getIntraQueuePreemptionOrderPolicy()); + } if (LOG.isDebugEnabled()) { LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); @@ -188,6 +199,59 @@ public void computeAppsIdealAllocation(Resource clusterResource, } } + /** + * For the fairness calculation, the user limit is treated as an upper + * bound on the amount of resources allocated to a user. + * The calculation used is: + * + * fairShareAcrossApps = queue reassignable resources / number of apps in the queue + * fairShareWithinUL = UL / number of apps of that user + * + * fairSharePerApp = min(fairShareAcrossApps, fairShareWithinUL); + * + * Using the above formula, we first ensure that all apps in the queue + * get equal resources. However, if a user would hit its user limit, + * we instead ensure fairness among that user's apps within its user limit. + * @param tq tempQueuePerPartition being considered + * @param clusterResource total resources present in the cluster + * @param queueReassignableResource queue resources available for reassignment + */ + private void setFairShareForApps(TempQueuePerPartition tq, + Resource clusterResource, + Resource queueReassignableResource) { + int numOfAppsInQueue = tq.leafQueue.getAllApplications().size(); + Resource fairShareAcrossApps = Resources.none(); + + if(numOfAppsInQueue > 0) { + fairShareAcrossApps = Resources.divideAndCeil( + this.rc, queueReassignableResource, numOfAppsInQueue); + } + + for(TempUserPerPartition tmpUser : tq.getUsersPerPartition().values()){ + int numOfAppsInUser = tmpUser.getApps().size(); + Resource fairShareWithinUL = Resources.divideAndCeil( + this.rc, tmpUser.getUserLimit(), numOfAppsInUser); + + for(TempAppPerPartition tmpApp : tmpUser.getApps()) { + Resource fairShareForApp = Resources.min( + rc, clusterResource, fairShareAcrossApps, fairShareWithinUL); + + fairShareForApp = + Resources.componentwiseMax(fairShareForApp, Resources.none()); + tmpApp.setFairShare(fairShareForApp); + + LOG.debug("App: " + tmpApp.getApplicationId() + + " from user: " + tmpUser.getUserName() + + ", FairShareAcrossApps: " + fairShareAcrossApps + + ", fairShareWithinUL: " + fairShareWithinUL + + ", num_of_apps for user: " + numOfAppsInUser + + ". Calculated FairShare for app is: " + tmpApp.getFairShare()); + } + } + } + private void calculateToBePreemptedResourcePerApp(Resource clusterResource, TreeSet orderedApps, Resource preemptionLimit) { @@ -307,6 +371,17 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // idealAssigned may fall to 0 if higher priority apps demand is more.
Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(), tmpApp.getPending()); + + // In case of fair ordering based preemption, + // consider fairShare of each app in a given queue + // as a max-cap. + if (queueOrderingPolicy instanceof FairOrderingPolicy) { + appIdealAssigned = Resources.min( + rc, clusterResource, tmpApp.getFairShare(), appIdealAssigned); + appIdealAssigned = Resources.componentwiseMax( + appIdealAssigned, Resources.none()); + } + Resources.subtractFrom(appIdealAssigned, tmpApp.selected); if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, @@ -450,6 +525,9 @@ private void getAlreadySelectedPreemptionCandidatesResource( } tmpApp.setTempUserPerPartition(tmpUser); orderedByPriority.add(tmpApp); + tq.getUsersPerPartition() + .get(tmpUser.getUserName()) + .addApp(tmpApp.getApplicationId(), tmpApp); } return orderedByPriority; @@ -573,6 +651,66 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, } } + /** + * For each starved app, iterate over all the overfed apps. + * Mark as many resources as possible from overfed apps + * to satisfy the starvation of the starved app. + * starvedResources = + * ToBePreemptFromOther - starvation_fulfilled_by_other_overfed_apps + * overfedResources = + * ToBePreempted - resources_already_marked_for_preemption + * @param clusterResource total resources present in the cluster + * @param orderedApps TreeSet of apps ordered by the excess used resources + */ + private void calcActuallyToBePreemptedBasedOnFS(Resource clusterResource, + TreeSet orderedApps) { + TempAppPerPartition[] apps = orderedApps.toArray( + new TempAppPerPartition[orderedApps.size()]); + if (apps.length <= 1) { + return; + } + + for(int starvedAppInd = apps.length-1; starvedAppInd>=0; starvedAppInd--) { + TempAppPerPartition starvedApp = apps[starvedAppInd]; + + for(TempAppPerPartition overfedApp : apps) { + if(overfedApp == starvedApp) { + continue; + } + + Resource preemptForStarved = starvedApp.getToBePreemptFromOther(); + if(Resources.lessThanOrEqual( + rc, clusterResource, preemptForStarved, Resources.none())) { + break; + } + + Resource preemptFromOverfed = + Resources.subtract(overfedApp.toBePreempted, + overfedApp.getActuallyToBePreempted()); + if(Resources.lessThanOrEqual( + rc, clusterResource, preemptFromOverfed, Resources.none())) { + continue; + } + + + Resource preempt = + Resources.lessThanOrEqual( + rc, clusterResource, preemptFromOverfed, preemptForStarved) ? + preemptFromOverfed : preemptForStarved; + + LOG.debug("Marking: " + preempt + + " resources which can be preempted from " + overfedApp + + " to " + starvedApp); + + starvedApp.setToBePreemptFromOther( + Resources.subtract(starvedApp.getToBePreemptFromOther(), preempt)); + + overfedApp.setActuallyToBePreempted( + Resources.add(overfedApp.getActuallyToBePreempted(), preempt)); + } + } + } + private Resource calculateUsedAMResourcesPerQueue(String partition, LeafQueue leafQueue, Map perUserAMUsed) { Collection runningApps = leafQueue.getApplications(); @@ -605,12 +743,7 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, // skip some containers as per this check. If used resource is under user // limit, then these containers of this user has to be preempted as demand // might be due to high priority apps running in same user. 
- String partition = context.getScheduler() - .getSchedulerNode(c.getAllocatedNode()).getPartition(); - String queuePath = - context.getScheduler().getQueue(app.getQueueName()).getQueuePath(); - TempQueuePerPartition tq = - context.getQueueByPartition(queuePath, partition); + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); // Given user is not present, skip the check. @@ -618,6 +751,19 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, return false; } + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + + if(tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + // Check we don't preempt more resources than + // ActuallyToBePreempted from the app + if(tmpApp != null + && Resources.greaterThanOrEqual(rc, clusterResource, + tmpApp.getActuallyToBePreempted(), c.getAllocatedResource())) { + return false; + } + return true; + } + // For ideal resource computations, user-limit got saved by subtracting am // used resource in TempUser. Hence it has to be added back here for // complete check. @@ -628,4 +774,33 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, && context.getIntraQueuePreemptionOrderPolicy() .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST); } + + private TempQueuePerPartition getTmpQueueOfContainer( + FiCaSchedulerApp app, RMContainer c) { + String partition = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()).getPartition(); + String queuePath = context.getScheduler() + .getQueue(app.getQueueName()).getQueuePath(); + TempQueuePerPartition tq = context.getQueueByPartition( + queuePath, partition); + return tq; + } + + @Override + public void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, + RMContainer c, Resource clusterResource) { + TempQueuePerPartition tq = getTmpQueueOfContainer(app, c); + TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); + if(tmpUser == null) { + return; + } + + TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId()); + tmpApp.deductActuallyToBePreempted( + rc, clusterResource, c.getAllocatedResource()); + + LOG.debug("App metrics after marking container: " + c.getContainerId() + + " with resources: " + c.getAllocatedResource() + + " to_be_preempted from app: " + tmpApp); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8a1b47b5dee..93c561259c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -256,7 +257,9 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, c.getContainerId(), c.getAllocatedResource(), app.getUser(), rollingUsedResourcePerUser); - break; + // Other containers from current app may still be preemptable + // within the current constraints. + continue; } // Try to preempt this container @@ -272,6 +275,17 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, Resources.subtractFrom(rollingUsedResourcePerUser, c.getAllocatedResource()); } + + // If FairOrderingPolicy is being used, + // subtract the preempted resource from the app's to-be-preempted. + // This is required because we don't keep track of + // preempted containers from an app in this round. + // So, for fair-share preemption, we don't know if, + // after preemption, the app is dropping below its fairShare. + if(ret && leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) { + fifoPreemptionComputePlugin + .deductActuallyToBePreemptedFromApp(app, c, clusterResource); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java index 56fd007d40b..1f605f4c682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -44,4 +44,7 @@ void computeAppsIdealAllocation(Resource clusterResource, boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, Resource clusterResource, Resource usedResource, RMContainer c); + + void deductActuallyToBePreemptedFromApp(FiCaSchedulerApp app, + RMContainer c, Resource clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index 05d8096a273..40a831e6f7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -66,9 +66,12 @@ public String toString() { .append(idealAssigned).append(" PREEMPT_OTHER: ") .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") .append(toBePreempted).append(" ACTUAL_PREEMPT: ") - .append(getActuallyToBePreempted()).append("\n"); + .append(getActuallyToBePreempted()); + if(getFairShare() != null) { + 
sb.append(" FAIR-SHARE: ").append(getFairShare()); + } - return sb.toString(); + return sb.append("\n").toString(); } void appendLogString(StringBuilder sb) { @@ -98,7 +101,7 @@ public String getUser() { public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(resourceCalculator, cluster, + if (Resources.greaterThanOrEqual(resourceCalculator, cluster, getActuallyToBePreempted(), toBeDeduct)) { Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java index 33ee18f1c22..92231f3c4e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java @@ -18,11 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * Temporary data-structure tracking resource availability, pending resource @@ -34,12 +38,15 @@ private Resource userLimit; private boolean donePreemptionQuotaForULDelta = false; + private Map apps; + TempUserPerPartition(User user, String queueName, Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved, Resource pendingPerPartition) { super(queueName, usedPerPartition, amUsedPerPartition, reserved, pendingPerPartition); this.user = user; + this.apps = new HashMap<>(); } @Override @@ -85,4 +92,35 @@ public boolean isPreemptionQuotaForULDeltaDone() { public void updatePreemptionQuotaForULDeltaAsDone(boolean done) { this.donePreemptionQuotaForULDelta = done; } + + /** + * Method to add a new app under this user. + * @param applicationId application_id of the app + * @param tempAppPerPartition TempAppPerPartition object of the app + */ + public void addApp(ApplicationId applicationId, + TempAppPerPartition tempAppPerPartition) { + apps.put(applicationId, tempAppPerPartition); + } + + /** + * Getter method to return TempAppPerPartition for given application_id. + * @param applicationId application_id of the app + * @return TempAppPerPartition corresponding to given app_id. + * Null if app_id is absent. + */ + public TempAppPerPartition getApp(ApplicationId applicationId) { + if(!apps.containsKey(applicationId)) { + return null; + } + return apps.get(applicationId); + } + + /** + * Getter method to return all the TempAppPerPartition under given user. + * @return collection of TempAppPerPartition user this user. 
+ */ + public Collection<TempAppPerPartition> getApps() { + return apps.values(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java index eb9d21836da..eed84f43070 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -191,9 +191,10 @@ public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser() // user2/app3 has 40 resources in queue a // user3/app4 is requesting 20 resources in queue a // With 3 users, preemptable user limit should be around 35 resources each. - // With FairOrderingPolicy enabled on queue a, all 20 resources should be - // preempted from app1 since it's the most over served app from the most - // over served user + // With FairOrderingPolicy enabled on queue a, the preempted resources + // now come from both of user1's apps rather than all 20 from app1: + // about 17 from app1 and 3 from app2. String appsConfig = // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) "a\t" // app1 and app2, user1 in a @@ -209,9 +210,12 @@ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); policy.editSchedule(); - verify(eventHandler, times(20)).handle(argThat( + verify(eventHandler, times(17)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); + verify(eventHandler, times(3)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); } /* @@ -274,4 +278,233 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testIntraQueuePreemptionFairOrderingPolicyMultiAppsSingleUser() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 100% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 25 | 0 | 25 | + * | app2 | user1 | 35 | 0 | 25 | + * | app3 | user1 | 40 | 0 | 25 | + * | app4 | user1 | 0 | 20 | 25 | + * +--------------+------+---------+-----------+ + * Because all apps are from the same user, + * we expect each app to have a fair share of 25. + * So app2 can give up 9 containers and + * app3 can lose 13 containers (excluding the AM). + * However, because we are using FiCa scheduling, + * app3 is the least starved + * and so we will take all its resources above its fairshare.
+ * (check: IntraQueuePreemptionPolicy: preemptFromLeastStarvedApp()) + * So we expect app3 to give 13 containers and + * app2 to give the remaining containers (20 - 13 = 7 containers) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, app2, app3, app4 from user1 + + "(1,1,n1,,25,false,0,user1);" + + "a\t" + + "(1,1,n1,,35,false,0,user1);" + + "a\t" + + "(1,1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1,n1,,0,false,20,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(13)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(eventHandler, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPIntraUserPreemption() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 40 | 0 | 25 | + * | app2 | user1 | 10 | 0 | 25 | + * | app3 | user2 | 50 | 0 | 25 | + * | app4 | user2 | 0 | 30 | 25 | + * +--------------+------+---------+-----------+ + * + * Because user1 is at its user limit, for app4 of user2, + * we will preempt the containers from user2 only, i.e. app3 of user2. + * Because we want to maintain fairness, + * we will preempt 24 containers from app3 + * (whose fairshare is 25% -> 1/2 of 50% UL) + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,40,false,0,user1);" + + "a\t" + + "(1,1,n1,,10,false,0,user1);" + + "a\t" // app3 and app4 from user2 in a + + "(1,1,n1,,50,false,0,user2);" + + "a\t" + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(24)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionFairOPFairnessAcrossUsers() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------+---------+-----------+ + * | APP | USER | USED | PENDING | FAIRSHARE | + * +--------------+------+---------+-----------+ + * | app1 | user1 | 51 | 0 | 25 | + * | app2 | user1 | 49 | 0 | 25 | + * | app3 | user2 | 0 | 50 | 33 | + * +--------------+------+---------+-----------+ + * + * User1 has 2 apps; each app has a fair share of 25 + * (1/2 of user1's user limit of 50). + * App3's fair share + * = min(UL / number of apps of user2 (50), + * queueResources / number of apps in queue (~33)) = 33. + * So app3 needs about 33 resources: + * roughly 25 are preempted from app1 and the rest from app2. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration.
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,51,false,0,user1);" + + "a\t" + + "(1,1,n1,,49,false,0,user1);" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,50,user2);"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(25)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(eventHandler, times(8)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairOPUserLimitPreserved() + throws IOException { + /** + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+------------+------+---------+-----------+ + * | APP | USER | USER-LIMIT | USED | PENDING | FAIRSHARE | + * +--------------+------------+------+---------+-----------+ + * | app1 | user1 | 50 | 100 | 0 | 33 | + * | app3 | user2 | 50 | 0 | 50 | 33 | + * | app4 | user3 | 50 | 0 | 50 | 33 | + * +--------------+------------+------+---------+-----------+ + * + * User1 has a single app. Its fair share is + * min(UL (50), queueResources / number of apps in queue (~33)) = 33, + * so roughly 65 of app1's containers are expected to be preempted, + * bringing app1 close to its fair share. + */ + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,100,false,0,user1)" + "\t50;" + + "a\t" // app3, user2 in a + + "(1,1,n1,,0,false,50,user2)" + "\t50;" + + "a\t" // app4, user3 in a + + "(1,1,n1,,0,false,50,user3)" + "\t50;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(65)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } } -- 2.23.0
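The per-app fair-share rule that setFairShareForApps() applies (Resources.divideAndCeil across the queue, Resources.divideAndCeil within the user limit, then Resources.min of the two) can be illustrated with a minimal, self-contained sketch. Plain longs stand in for YARN Resource objects, and the class and method names below are illustrative only, not part of the patch:

public class FairSharePerAppSketch {

  // fairShareAcrossApps = queue reassignable resources / apps in the queue (rounded up)
  // fairShareWithinUL   = user limit / apps of that user (rounded up)
  // fairSharePerApp     = min of the two, floored at zero
  static long fairShareForApp(long queueReassignable, int appsInQueue,
      long userLimit, int appsOfUser) {
    long acrossApps = ceilDiv(queueReassignable, appsInQueue);
    long withinUL = ceilDiv(userLimit, appsOfUser);
    return Math.max(0, Math.min(acrossApps, withinUL));
  }

  static long ceilDiv(long a, long b) {
    return (a + b - 1) / b;
  }

  public static void main(String[] args) {
    // Numbers mirror testIntraQueuePreemptionFairOPFairnessAcrossUsers:
    // queue of 100, user limit 50, user1 runs 2 apps, user2 runs 1 app,
    // 3 apps in the queue overall.
    long queue = 100;
    long userLimit = 50;
    System.out.println("user1 app: "
        + fairShareForApp(queue, 3, userLimit, 2)); // 25, capped by UL / 2
    System.out.println("user2 app: "
        + fairShareForApp(queue, 3, userLimit, 1)); // 34 (~33 in the test javadoc)
  }
}

The only design point is the order of the two caps: the queue-wide share keeps a single user's many apps from crowding out other users' apps, while the within-UL share keeps one app of a user from exceeding what that user may hold.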
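calcActuallyToBePreemptedBasedOnFS() then matches starved apps (non-zero toBePreemptFromOther) against overfed apps (toBePreempted not yet fully marked as actuallyToBePreempted). The sketch below shows that pairing in isolation, again with plain longs instead of Resource objects; the class and field names are assumptions for the example, not the patch's API:

import java.util.ArrayList;
import java.util.List;

class AppSketch {
  final String name;
  long toBePreemptFromOther;   // starvation still to be satisfied
  long toBePreempted;          // ideal amount this app can give up
  long actuallyToBePreempted;  // amount already marked on this app

  AppSketch(String name, long starved, long canGive) {
    this.name = name;
    this.toBePreemptFromOther = starved;
    this.toBePreempted = canGive;
  }
}

public class FairShareMarkingSketch {

  // apps are ordered most-overfed first, most-starved last, mirroring the
  // ordered TreeSet walked from the tail in the patch.
  static void mark(List<AppSketch> apps) {
    for (int i = apps.size() - 1; i >= 0; i--) {
      AppSketch starved = apps.get(i);
      for (AppSketch overfed : apps) {
        if (overfed == starved) {
          continue;
        }
        if (starved.toBePreemptFromOther <= 0) {
          break;                       // starvation already satisfied
        }
        long available = overfed.toBePreempted - overfed.actuallyToBePreempted;
        if (available <= 0) {
          continue;                    // nothing left to take from this app
        }
        long take = Math.min(available, starved.toBePreemptFromOther);
        starved.toBePreemptFromOther -= take;
        overfed.actuallyToBePreempted += take;
      }
    }
  }

  public static void main(String[] args) {
    List<AppSketch> apps = new ArrayList<>();
    apps.add(new AppSketch("app3", 0, 15));  // 15 above its fair share
    apps.add(new AppSketch("app2", 0, 10));  // 10 above its fair share
    apps.add(new AppSketch("app4", 20, 0));  // starved by 20
    mark(apps);
    for (AppSketch a : apps) {
      System.out.println(a.name + " actuallyToBePreempted=" + a.actuallyToBePreempted);
    }
    // Expected: app3 gives 15, app2 gives 5, app4 has no unmet demand left.
  }
}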