From 79bc2c5d441ff6ff1e2953e6b594e2999b5ed62e Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 9 Mar 2017 22:55:11 +0530 Subject: [PATCH] YARN-2113 --- .../capacity/FifoIntraQueuePreemptionPlugin.java | 54 +++- .../capacity/IntraQueueCandidatesSelector.java | 67 +++- .../ProportionalCapacityPreemptionPolicy.java | 1 - .../monitor/capacity/TempAppPerPartition.java | 4 + ...apacityPreemptionPolicyIntraQueueUserLimit.java | 349 +++++++++++++++++++++ 5 files changed, 458 insertions(+), 17 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 5f1af1e..801e264 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPreemptionOrderComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @@ -393,13 +394,27 @@ private void getAlreadySelectedPreemptionCandidatesResource( /* * Fifo+Priority based preemption policy need not have to preempt resources at - * same priority level. Such cases will be validated out. + * same priority level. Such cases will be validated out. But if the demand is + * from an app of different user, force to preempt resources even if apps are + * at same priority. */ public void validateOutSameAppPriorityFromDemand(Resource cluster, TreeSet appsOrderedfromLowerPriority) { - TempAppPerPartition[] apps = appsOrderedfromLowerPriority - .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]); + // Incoming apps are ordered from lower to higher priority. However its + // possible that some low priority app may have demand (due to userlimit). + // Make sure that higher priority apps with 0 demand is pushed down to + // provide preemption for lower priority apps with demand. + // + // TAPreemptionOrderComparator is considering app's toBePreemptFromOther + // in addition to priority and appId. + Comparator reverseComp = Collections + .reverseOrder(new TAPreemptionOrderComparator()); + TreeSet orderedApps = new TreeSet<>(reverseComp); + orderedApps.addAll(appsOrderedfromLowerPriority); + + TempAppPerPartition[] apps = orderedApps + .toArray(new TempAppPerPartition[orderedApps.size()]); if (apps.length <= 0) { return; } @@ -407,18 +422,34 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, int lPriority = 0; int hPriority = apps.length - 1; - while (lPriority < hPriority - && !apps[lPriority].equals(apps[hPriority]) - && apps[lPriority].getPriority() < apps[hPriority].getPriority()) { - Resource toPreemptFromOther = apps[hPriority] - .getToBePreemptFromOther(); + while (lPriority < hPriority && !apps[lPriority].equals(apps[hPriority])) { + + // Check whether app with demand needs resource from other user. + if (Resources.greaterThan(rc, cluster, + apps[hPriority].getToBePreemptFromOther(), Resources.none())) { + + // If apps are of same user, increment lPriority as current app at + // lPriority is under same user as of hPriority. + if ((apps[hPriority].getUser().equals(apps[lPriority].getUser())) + && (apps[lPriority].getPriority() == apps[hPriority] + .getPriority())) { + lPriority++; + continue; + } + } else { + // decrement hPriority as current hPriority app doesnt need resource. + hPriority--; + continue; + } + + Resource toPreemptFromOther = apps[hPriority].getToBePreemptFromOther(); Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted(); Resource delta = Resources.subtract(apps[lPriority].toBePreempted, actuallyToPreempt); if (Resources.greaterThan(rc, cluster, delta, Resources.none())) { - Resource toPreempt = Resources.min(rc, cluster, - toPreemptFromOther, delta); + Resource toPreempt = Resources.min(rc, cluster, toPreemptFromOther, + delta); apps[hPriority].setToBePreemptFromOther( Resources.subtract(toPreemptFromOther, toPreempt)); @@ -426,8 +457,7 @@ public void validateOutSameAppPriorityFromDemand(Resource cluster, Resources.add(actuallyToPreempt, toPreempt)); } - if (Resources.lessThanOrEqual(rc, cluster, - apps[lPriority].toBePreempted, + if (Resources.lessThanOrEqual(rc, cluster, apps[lPriority].toBePreempted, apps[lPriority].getActuallyToBePreempted())) { lPriority++; continue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 4f2b272..fd0e600 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -51,17 +51,76 @@ Comparator { @Override - public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { - Priority p1 = Priority.newInstance(tq1.getPriority()); - Priority p2 = Priority.newInstance(tq2.getPriority()); + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Priority p1 = Priority.newInstance(ta1.getPriority()); + Priority p2 = Priority.newInstance(ta2.getPriority()); if (!p1.equals(p2)) { return p1.compareTo(p2); } - return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); } } + /** + * For preemption calculation, apart from priority and appId of an app, + * consider toBePreemptFromOther value also. This is to support user limit + * based preemption. In that case, an app with higher priority will be + * chosen as lower indexed app since another low priority app has some demand + * based on user-limit. + */ + @SuppressWarnings("serial") + static class TAPreemptionOrderComparator + implements + Serializable, + Comparator { + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Priority p1 = Priority.newInstance(ta1.getPriority()); + Priority p2 = Priority.newInstance(ta2.getPriority()); + + int priorityComparator = p1.compareTo(p2); + + // Consider demand from each app. + Resource demandApp1 = ta1.getToBePreemptFromOther(); + Resource demandApp2 = ta2.getToBePreemptFromOther(); + + if (!p1.equals(p2)) { + // If ta2 is of higher priority, priorityComparator will be non-zero. + // Flip the comparator order if ta2 has no demand, and ta1 has demand. + // Else case is the mirror scenario of same. + if (priorityComparator > 0) { + if (demandApp2.equals(Resources.none()) + && !demandApp1.equals(Resources.none())) + return demandApp2.compareTo(demandApp1); + } else if (priorityComparator < 0) { + if (demandApp1.equals(Resources.none()) + && !demandApp2.equals(Resources.none())) + return demandApp2.compareTo(demandApp1); + } + return priorityComparator; + } + + int fifoComparator = ta1.getApplicationId() + .compareTo(ta2.getApplicationId()); + + // If ta2 submitted earlier to ta1, fifoComparator will be non-zero. + // Flip the comparator order if ta2 has no demand, and ta1 has demand. + // Else case is the mirror scenario of same. + if (fifoComparator > 0) { + if (demandApp2.equals(Resources.none()) + && !demandApp1.equals(Resources.none())) + return demandApp2.compareTo(demandApp1); + } else if (fifoComparator < 0) { + if (demandApp1.equals(Resources.none()) + && !demandApp2.equals(Resources.none())) + return demandApp2.compareTo(demandApp1); + } + return fifoComparator; + } + } + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; final CapacitySchedulerPreemptionContext context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 3bf6994..43c320b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -243,7 +243,6 @@ public synchronized void editSchedule() { } } - @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( Map> selectedCandidates, long currentTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index fccd2a7..13be65d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -91,6 +91,10 @@ public ApplicationId getApplicationId() { return applicationId; } + public String getUser() { + return this.app.getUser(); + } + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resource cluster, Resource toBeDeduct, String partition) { if (Resources.greaterThan(resourceCalculator, cluster, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java new file mode 100644 index 0000000..0b12e1e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test class for IntraQueuePreemption scenarios. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsers() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a    
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource and its well under its user-limit. Hence preempt + // resources from app1. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithSingleUser() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Given single user, lower priority/late submitted apps has to + * wait. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource. Since app1,2 are from same user, there wont be + // any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 50. app2 from user2 has a demand + * of 30, and UL is 50. Since app1 is under UL, there should not be any + * preemption. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,30,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource. Since app1,2 are from same user, there wont be + // any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a    
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and its already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(2,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resource and its well under its user-limit. Hence preempt + // resources from app1 even though its priority is more than app2. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWithMultipleApps() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a    
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(1,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 needs more resource and its well under its user-limit. Hence + // preempt resources from app3 (compare to app1, app3 has low priority). + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWitAppsOfDifferentPriority() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *        |
+     *        a    
+     * 
+ * + * Scenario: + * Guaranteed resource of a is 100 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[100 100 100 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(3,1,n1,,30,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(4,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 needs more resource and its well under its user-limit. Hence + // preempt resources from app1 (compare to app3, app1 has low priority). + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } +} -- 2.10.1 (Apple Git-78)