diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
index cb4d7af769d..85b3df6972e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -42,6 +43,7 @@ Resource selected;
   private Resource actuallyToBePreempted;
   private Resource toBePreemptFromOther;
+  private Resource fairShare;
 
   AbstractPreemptionEntity(String queueName, Resource usedPerPartition,
       Resource amUsedPerPartition, Resource reserved,
@@ -91,6 +93,14 @@ public void setActuallyToBePreempted(Resource actuallyToBePreempted) {
     this.actuallyToBePreempted = actuallyToBePreempted;
   }
 
+  public void deductToBePreempted(ResourceCalculator resourceCalculator,
+      Resource cluster, Resource toBeDeduct) {
+    if (Resources.greaterThanOrEqual(resourceCalculator, cluster,
+        toBePreempted, toBeDeduct)) {
+      Resources.subtractFrom(toBePreempted, toBeDeduct);
+    }
+  }
+
   public Resource getToBePreemptFromOther() {
     return toBePreemptFromOther;
   }
@@ -99,4 +109,20 @@ public void setToBePreemptFromOther(Resource toBePreemptFromOther) {
     this.toBePreemptFromOther = toBePreemptFromOther;
   }
 
+  /**
+   * Getter method to return the fair share for an application in a given queue.
+   * @return fair share Resource object
+   */
+  public Resource getFairShare() {
+    return fairShare;
+  }
+
+  /**
+   * Setter method to update the fair share for an application in a given queue.
+   * @param fairShare fair share Resource object
+   */
+  public void setFairShare(Resource fairShare) {
+    this.fairShare = fairShare;
+  }
+
 }
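As a quick illustration of the guard in the new `deductToBePreempted` method above, here is a minimal standalone sketch (not part of the patch; the class name and the numbers are made up, and it assumes hadoop-yarn-api and hadoop-yarn-common on the classpath). It shows that the subtraction is only applied while the remaining to-be-preempted amount still covers the deduction, so the tracked value never goes negative.

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

// Illustrative class name only; not part of the patch.
public class DeductToBePreemptedSketch {
  public static void main(String[] args) {
    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource cluster = Resource.newInstance(100 * 1024, 100);

    // Pretend this app still has 4 containers' worth marked for preemption.
    Resource toBePreempted = Resource.newInstance(4 * 1024, 4);
    Resource container = Resource.newInstance(1024, 1);

    // Mirrors the guarded subtraction added in deductToBePreempted().
    for (int i = 0; i < 6; i++) {
      if (Resources.greaterThanOrEqual(rc, cluster, toBePreempted, container)) {
        Resources.subtractFrom(toBePreempted, container);
      }
    }

    // After four deductions the remaining amount no longer covers a container,
    // so the last two iterations are no-ops and the value stays at zero.
    System.out.println("remaining toBePreempted = " + toBePreempted);
  }
}
```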
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 5eac4c13d7a..bd86fb55026 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -143,6 +143,10 @@ public void computeAppsIdealAllocation(Resource clusterResource,
     PriorityQueue<TempAppPerPartition> orderedByPriority = createTempAppForResCalculation(
         tq, apps, clusterResource, perUserAMUsed);
 
+    if (tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) {
+      setFairShareForApps(tq);
+    }
+
     // 4. Calculate idealAssigned per app by checking based on queue's
     // unallocated resource.Also return apps arranged from lower priority to
     // higher priority.
@@ -180,7 +184,8 @@ public void computeAppsIdealAllocation(Resource clusterResource,
     // priority level, such cases are to be validated out.
     validateOutSameAppPriorityFromDemand(clusterResource,
         (TreeSet<TempAppPerPartition>) orderedApps, tq.getUsersPerPartition(),
-        context.getIntraQueuePreemptionOrderPolicy());
+        context.getIntraQueuePreemptionOrderPolicy(),
+        tq.leafQueue.getOrderingPolicy());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -190,6 +195,16 @@
     }
   }
 
+  private void setFairShareForApps(TempQueuePerPartition tq) {
+    for (TempUserPerPartition tmpUser : tq.getUsersPerPartition().values()) {
+      int numActiveApps = tmpUser.getApps().size();
+      Resource fairSharePerApp = Resources.divideAndCeil(this.rc, tmpUser.getUserLimit(), numActiveApps);
+      for (TempAppPerPartition tmpApp : tmpUser.getApps()) {
+        tmpApp.setFairShare(fairSharePerApp);
+      }
+    }
+  }
+
   private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
       TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
 
@@ -309,6 +324,15 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
     // idealAssigned may fall to 0 if higher priority apps demand is more.
     Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(),
         tmpApp.getPending());
+
+    // In case of fair-ordering based preemption, consider the fairShare of each
+    // app in a given queue as a max-cap.
+    if (queueOrderingPolicy instanceof FairOrderingPolicy) {
+      appIdealAssigned = Resources.min(rc, clusterResource, tmpApp.getFairShare(), appIdealAssigned);
+      // Ensure appIdealAssigned stays non-negative.
+      appIdealAssigned = Resources.max(rc, clusterResource, Resources.createResource(0, 0), appIdealAssigned);
+    }
+
     Resources.subtractFrom(appIdealAssigned, tmpApp.selected);
 
     if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
@@ -454,6 +478,7 @@ private void getAlreadySelectedPreemptionCandidatesResource(
       }
       tmpApp.setTempUserPerPartition(tmpUser);
       orderedByPriority.add(tmpApp);
+      tq.getUsersPerPartition().get(tmpUser.getUserName()).addApp(tmpApp.getApplicationId(), tmpApp);
     }
 
     return orderedByPriority;
@@ -468,7 +493,8 @@ private void getAlreadySelectedPreemptionCandidatesResource(
   public void validateOutSameAppPriorityFromDemand(Resource cluster,
       TreeSet<TempAppPerPartition> orderedApps,
       Map<String, TempUserPerPartition> usersPerPartition,
-      IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
+      IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder,
+      OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
     TempAppPerPartition[] apps = orderedApps
         .toArray(new TempAppPerPartition[orderedApps.size()]);
@@ -493,8 +519,10 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster,
         if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
             Resources.none())) {
 
+          // Skip the priority check below for FairOrderingPolicy; otherwise,
           // If apps are of same user, and priority is same, then skip.
-          if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
+          if (!(orderingPolicy instanceof FairOrderingPolicy)
+              && (apps[hPriority].getUser().equals(apps[lPriority].getUser()))
              && (apps[lPriority].getPriority() >= apps[hPriority]
                  .getPriority())) {
            continue;
          }
@@ -609,12 +637,7 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
     // skip some containers as per this check. If used resource is under user
     // limit, then these containers of this user has to be preempted as demand
     // might be due to high priority apps running in same user.
-    String partition = context.getScheduler()
-        .getSchedulerNode(c.getAllocatedNode()).getPartition();
-    String queuePath =
-        context.getScheduler().getQueue(app.getQueueName()).getQueuePath();
-    TempQueuePerPartition tq =
-        context.getQueueByPartition(queuePath, partition);
+    TempQueuePerPartition tq = getTmpQueueOfContainer(app, c);
     TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
 
     // Given user is not present, skip the check.
@@ -622,6 +645,17 @@
       return false;
     }
 
+    TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId());
+
+    if (tq.leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) {
+      // Do not preempt more than the app's remaining toBePreempted amount.
+      if (tmpApp != null
+          && Resources.greaterThanOrEqual(rc, clusterResource, tmpApp.toBePreempted, c.getAllocatedResource())) {
+        return false;
+      }
+      return true;
+    }
+
     // For ideal resource computations, user-limit got saved by subtracting am
     // used resource in TempUser. Hence it has to be added back here for
     // complete check.
@@ -632,4 +666,22 @@ public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
         && context.getIntraQueuePreemptionOrderPolicy()
             .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST);
   }
+
+  private TempQueuePerPartition getTmpQueueOfContainer(FiCaSchedulerApp app, RMContainer c) {
+    String partition = context.getScheduler().getSchedulerNode(c.getAllocatedNode()).getPartition();
+    String queuePath = context.getScheduler().getQueue(app.getQueueName()).getQueuePath();
+    TempQueuePerPartition tq = context.getQueueByPartition(queuePath, partition);
+    return tq;
+  }
+
+  @Override
+  public void deductAppToBePreempted(FiCaSchedulerApp app, RMContainer c, Resource clusterResource) {
+    TempQueuePerPartition tq = getTmpQueueOfContainer(app, c);
+    TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
+    if (tmpUser == null) {
+      return;
+    }
+    TempAppPerPartition tmpApp = tmpUser.getApp(app.getApplicationId());
+    tmpApp.deductToBePreempted(rc, clusterResource, c.getAllocatedResource());
+  }
 }
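The core of `setFairShareForApps` above is a single `Resources.divideAndCeil` call that splits a user's user-limit evenly across that user's active apps. Below is a minimal standalone sketch of that calculation (not part of the patch; the class name and the hard-coded numbers are illustrative only, chosen to match the 50% user-limit / two-app scenario used in the new tests, and it assumes hadoop-yarn-api and hadoop-yarn-common on the classpath).

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

// Illustrative class name only; not part of the patch.
public class FairSharePerAppSketch {
  public static void main(String[] args) {
    ResourceCalculator rc = new DefaultResourceCalculator();

    // e.g. minimum-user-limit-percent gives this user 50 units of the queue.
    Resource userLimit = Resource.newInstance(50 * 1024, 50);
    int numActiveApps = 2;

    // Same call the patch makes in setFairShareForApps(): 50 / 2 = 25 per app,
    // rounded up when the division is not exact.
    Resource fairSharePerApp =
        Resources.divideAndCeil(rc, userLimit, numActiveApps);

    System.out.println("fair share per app = " + fairSharePerApp);
  }
}
```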
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index c52fd957c4c..27f494f8bd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -275,10 +276,17 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
 
       // Subtract from respective user's resource usage once a container is
       // selected for preemption.
+      //
+      // Also, if FairOrderingPolicy is being used, subtract the preempted
+      // resource from the app's to-be-preempted. This is required because we
+      // don't keep track of preempted containers from apps. So, for fair-share
+      // preemption, we don't know if, after preemption, the app is dropping
+      // below its fairShare.
      if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy()
          .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
        Resources.subtractFrom(rollingUsedResourcePerUser,
            c.getAllocatedResource());
+        if (leafQueue.getOrderingPolicy() instanceof FairOrderingPolicy) {
+          fifoPreemptionComputePlugin.deductAppToBePreempted(app, c, clusterResource);
+        }
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
index 56fd007d40b..12606f7f3e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -44,4 +44,6 @@ void computeAppsIdealAllocation(Resource clusterResource,
 
   boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
       Resource clusterResource, Resource usedResource, RMContainer c);
+
+  void deductAppToBePreempted(FiCaSchedulerApp app, RMContainer c, Resource clusterResource);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
index 05d8096a273..6944987df92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -66,9 +66,12 @@ public String toString() {
         .append(idealAssigned).append(" PREEMPT_OTHER: ")
         .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
         .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
-        .append(getActuallyToBePreempted()).append("\n");
+        .append(getActuallyToBePreempted());
+    if (getFairShare() != null) {
+      sb.append(" FAIR-SHARE: ").append(getFairShare());
+    }
 
-    return sb.toString();
+    return sb.append("\n").toString();
   }
 
   void appendLogString(StringBuilder sb) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
index 33ee18f1c22..369c42a6f3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Temporary data-structure tracking resource availability, pending resource
@@ -34,12 +38,15 @@
   private Resource userLimit;
   private boolean donePreemptionQuotaForULDelta = false;
 
+  private Map<ApplicationId, TempAppPerPartition> apps;
+
   TempUserPerPartition(User user, String queueName,
       Resource usedPerPartition, Resource amUsedPerPartition, Resource reserved,
       Resource pendingPerPartition) {
     super(queueName, usedPerPartition, amUsedPerPartition, reserved,
         pendingPerPartition);
     this.user = user;
+    this.apps = new HashMap<>();
   }
 
   @Override
@@ -85,4 +92,18 @@ public boolean isPreemptionQuotaForULDeltaDone() {
   public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
     this.donePreemptionQuotaForULDelta = done;
   }
+
+  public void addApp(ApplicationId applicationId, TempAppPerPartition tempAppPerPartition) {
+    apps.put(applicationId, tempAppPerPartition);
+  }
+
+  public TempAppPerPartition getApp(ApplicationId applicationId) {
+    if (!apps.containsKey(applicationId))
+      return null;
+    return apps.get(applicationId);
+  }
+
+  public Collection<TempAppPerPartition> getApps() {
+    return apps.values();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
index bf622b8b471..373b8926a34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
@@ -209,9 +209,12 @@ public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser()
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(eventHandler, times(20)).handle(argThat(
+    verify(eventHandler, times(17)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
+    verify(eventHandler, times(3)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
   }
 
   /*
@@ -257,7 +260,7 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser()
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    // app3 is the younges and also over its user limit. 5 should be preempted
+    // app3 is the youngest and also over its user limit. 5 should be preempted
     // from app3 until it comes down to user3's user limit.
     verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
@@ -274,4 +277,222 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser()
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testIntraQueuePreemptionFairOrderingPolicyMultipleAppsSingleUser()
+      throws IOException {
+    /**
+     * Scenario:
+     * Queue total resources: 100
+     * Minimum user limit percent: 100%
+     * +------+-------+------+---------+-----------+
+     * | APP  | USER  | USED | PENDING | FAIRSHARE |
+     * +------+-------+------+---------+-----------+
+     * | app1 | user1 | 25   | 0       | 25        |
+     * | app2 | user1 | 35   | 0       | 25        |
+     * | app3 | user1 | 40   | 0       | 25        |
+     * | app4 | user1 | 0    | 20      | 25        |
+     * +------+-------+------+---------+-----------+
+     * Because all apps are from the same user, we expect each app to get a 25% fair share.
+     * So app2 can give up 9 containers and app3 can lose 13 containers (excluding AMs).
+     * However, because we are using FiCa scheduling, app3 is the least starved
+     * and so we will take all of its resources above its fair share
+     * (check IntraQueueCandidatesSelector#preemptFromLeastStarvedApp()).
+     * So we expect app3 to give 13 containers and app2 to give the remaining 20 - 13 = 7 containers.
+     */
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+
+    String appsConfig =
+        // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, app2, app3, app4 from user1
+        + "(1,1,n1,,25,false,0,user1);"
+        + "a\t"
+        + "(1,1,n1,,35,false,0,user1);"
+        + "a\t"
+        + "(1,1,n1,,40,false,0,user1);"
+        + "a\t"
+        + "(1,1,n1,,0,false,20,user1)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(eventHandler, times(13)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(eventHandler, times(7)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionFairOPIntraUserPreemption()
+      throws IOException {
+    /**
+     * Scenario:
+     * Queue total resources: 100
+     * Minimum user limit percent: 50%
+     * +------+-------+------+---------+-----------+
+     * | APP  | USER  | USED | PENDING | FAIRSHARE |
+     * +------+-------+------+---------+-----------+
+     * | app1 | user1 | 40   | 0       | 25        |
+     * | app2 | user1 | 10   | 0       | 25        |
+     * | app3 | user2 | 50   | 0       | 25        |
+     * | app4 | user2 | 0    | 30      | 25        |
+     * +------+-------+------+---------+-----------+
+     *
+     * Because user1 is at its user limit, for app4 of user2
+     * we will preempt containers from user2 only, i.e. from app3 of user2.
+     * Because we want to maintain fairness, we will preempt 24 containers from app3
+     * (whose fair share is 25% -> 1/2 of the 50% user limit).
+     */
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+    String appsConfig =
+        // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1 and app2, user1 in a
+        + "(1,1,n1,,40,false,0,user1);"
+        + "a\t"
+        + "(1,1,n1,,10,false,0,user1);"
+        + "a\t" // app3 and app4 from user2 in a
+        + "(1,1,n1,,50,false,0,user2);"
+        + "a\t"
+        + "(1,1,n1,,0,false,30,user2)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(eventHandler, times(24)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionFairOPFairnessAcrossUsers()
+      throws IOException {
+    /**
+     * Scenario:
+     * Queue total resources: 100
+     * Minimum user limit percent: 50%
+     * +------+-------+------+---------+-----------+
+     * | APP  | USER  | USED | PENDING | FAIRSHARE |
+     * +------+-------+------+---------+-----------+
+     * | app1 | user1 | 51   | 0       | 25        |
+     * | app2 | user1 | 49   | 0       | 25        |
+     * | app3 | user2 | 0    | 50      | 50        |
+     * +------+-------+------+---------+-----------+
+     *
+     * User1 has 2 apps; each app gets a 25% fair share (1/2 of user1's 50% user limit).
+     * So ~25 containers will be preempted from each of app1 and app2.
+     */
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+    String appsConfig =
+        // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1 and app2, user1 in a
+        + "(1,1,n1,,51,false,0,user1);"
+        + "a\t"
+        + "(1,1,n1,,49,false,0,user1);"
+        + "a\t" // app3, user2 in a
+        + "(1,1,n1,,0,false,50,user2);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(eventHandler, times(25)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(eventHandler, times(23)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionFairOPUserLimitPreserved()
+      throws IOException {
+    /**
+     * Scenario:
+     * Queue total resources: 100
+     * Minimum user limit percent: 50%
+     * +------+-------+------------+------+---------+-----------+
+     * | APP  | USER  | USER-LIMIT | USED | PENDING | FAIRSHARE |
+     * +------+-------+------------+------+---------+-----------+
+     * | app1 | user1 | 50         | 100  | 0       | 50        |
+     * | app2 | user2 | 50         | 0    | 50      | 50        |
+     * | app3 | user3 | 50         | 0    | 50      | 50        |
+     * +------+-------+------------+------+---------+-----------+
+     *
+     * User1 has 1 app; this app gets a 50% fair share (100% of user1's user limit).
+     * So 50 containers from app1 should be preempted.
+     */
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+    String appsConfig =
+        // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, user1 in a
+        + "(1,1,n1,,100,false,0,user1)" + "\t50;"
+        + "a\t" // app2, user2 in a
+        + "(1,1,n1,,0,false,50,user2)" + "\t50;"
+        + "a\t" // app3, user3 in a
+        + "(1,1,n1,,0,false,50,user3)" + "\t50;";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(eventHandler, times(50)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
 }
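For reference, the expected preemption counts in the new fair-ordering tests follow a simple pattern: each app is expected to lose roughly its used resources above its fair share, with one container usually spared for the AM. A small standalone sketch of that arithmetic (not part of the patch; the class and method names are made up, and the numbers are taken from testIntraQueuePreemptionFairOPFairnessAcrossUsers above):

```java
// Illustrative class and method names only; not part of the patch.
public class FairSharePreemptionMath {

  // Expected preemptions = used resources above the app's fair share,
  // minus containers kept for the AM, never going below zero.
  static int expectedPreemptions(int used, int fairShare, int amContainers) {
    return Math.max(0, used - fairShare - amContainers);
  }

  public static void main(String[] args) {
    // user1 has two apps and a 50% user limit, so each app's fair share is 25.
    System.out.println("app1: " + expectedPreemptions(51, 25, 1)); // 25
    System.out.println("app2: " + expectedPreemptions(49, 25, 1)); // 23
  }
}
```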