diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 64b36151d85..ec31c6a634b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -42,6 +42,7 @@
   protected final ResourceCalculator rc;
   protected boolean isReservedPreemptionCandidatesSelector;
   private Resource stepFactor;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
 
   static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
@@ -83,15 +84,27 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
    *          this will be set by different implementation of candidate
    *          selectors, please refer to TempQueuePerPartition#offer for
    *          details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied
+   *          Whether queues may keep growing after all queues are
+   *          already satisfied. For example, suppose there are 10
+   *          queues under root, each guaranteed 10% of the cluster,
+   *          and only two of them are active: queueA uses 10% while
+   *          queueB uses 90%. Every queue has its guarantee met, yet
+   *          the allocation is unfair to queueA. This flag makes the
+   *          extra balancing configurable; by default it is not
+   *          allowed.
+   *
    */
   public AbstractPreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     context = preemptionContext;
     rc = preemptionContext.getResourceCalculator();
     this.isReservedPreemptionCandidatesSelector =
         isReservedPreemptionCandidatesSelector;
-
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     stepFactor = Resource.newInstance(0, 0);
     for (ResourceInformation ri : stepFactor.getResources()) {
       ri.setValue(1);
@@ -193,7 +206,8 @@ protected void computeFixpointAllocation(Resource totGuarant,
         wQavail = Resources.componentwiseMin(wQavail, unassigned);
 
         Resource wQidle = sub.offer(wQavail, rc, totGuarant,
-            isReservedPreemptionCandidatesSelector);
+            isReservedPreemptionCandidatesSelector,
+            allowQueuesBalanceAfterAllQueuesSatisfied);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
 
         if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 3b2fcbb90d7..308c3a157ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -42,19 +43,43 @@
   private static final Log LOG =
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
 
   FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
-      boolean includeReservedResource) {
+      boolean includeReservedResource,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     super(preemptionContext);
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, includeReservedResource);
+        preemptionContext, includeReservedResource,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }
 
   @Override
   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptionAllowed) {
+
+    // When doing the additional queue-balance pass, first make sure that
+    // no containers were already chosen by previous selectors;
+    // otherwise, skip this selector.
+    if (allowQueuesBalanceAfterAllQueuesSatisfied) {
+      if (selectedCandidates != null && !selectedCandidates.isEmpty()) {
+        for (Set<RMContainer> containers : selectedCandidates.values()) {
+          if (!containers.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Fairness inter-queue balance is enabled, but some"
+                  + " containers were already chosen by previous selectors;"
+                  + " skipping container selection for this selector.");
+            }
+            return selectedCandidates;
+          }
+        }
+      }
+    }
+
     // Calculate how much resources we need to preempt
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptionAllowed);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 08d834ea004..89a015e4122 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -48,11 +48,14 @@
    * @param isReservedPreemptionCandidatesSelector this will be set by
    *          different implementation of candidate selectors, please refer to
    *          TempQueuePerPartition#offer for details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied whether queues may keep balancing after all queues are satisfied
    */
   public PreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
-    super(preemptionContext, isReservedPreemptionCandidatesSelector);
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
+    super(preemptionContext, isReservedPreemptionCandidatesSelector,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 3f9fd171689..9183f4cebfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -249,7 +249,7 @@ private void updateConfigIfNeeded() {
 
     // initialize candidates preemption selection policies
     candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
-        additionalPreemptionBasedOnReservedResource));
+        additionalPreemptionBasedOnReservedResource, false));
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = config.getBoolean(
@@ -259,6 +259,20 @@
       candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
     }
 
+    // Do we need to balance queues for fairness even after all queues are satisfied?
+    boolean isFairnessQueueBalanceRequired = config.getBoolean(
+        CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED,
+        CapacitySchedulerConfiguration.DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED);
+//    long maximumKillWaitTimeForFairnessQueueBalance = config.getLong(
+//        CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL,
+//        CapacitySchedulerConfiguration.DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL);
+    if (isFairnessQueueBalanceRequired) {
+      PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this,
+          false, true);
+      candidatesSelectionPolicies.add(selector);
+//      selector.setMaximumKillWaitTime(maximumKillWaitTimeForFairnessQueueBalance);
+    }
+
     LOG.info("Capacity Scheduler configuration changed, updated preemption " +
         "properties to:\n" +
         "max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" +
@@ -282,7 +296,8 @@
         "select_based_on_reserved_containers = " +
             selectCandidatesForResevedContainers + "\n" +
         "additional_res_balance_based_on_reserved_containers = " +
-            additionalPreemptionBasedOnReservedResource);
+            additionalPreemptionBasedOnReservedResource + "\n" +
+        "fairness-queue-balance-enabled = " + isFairnessQueueBalanceRequired);
 
     csConfig = config;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
index ff100d9a6ec..da6698609be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
@@ -63,7 +63,7 @@ public NodeForPreemption(float preemptionCost,
       CapacitySchedulerPreemptionContext preemptionContext) {
     super(preemptionContext);
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, true);
+        preemptionContext, true, false);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 4214acc5524..4fb1862b887 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -138,7 +138,8 @@ public void addChild(TempQueuePerPartition q) {
   // This function "accepts" all the resources it can (pending) and return
"accepts" all the resources it can (pending) and return // the unused ones Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource, boolean considersReservedResource) { + Resource clusterResource, boolean considersReservedResource, + boolean allowQueueBalanceAfterAllSafisfied) { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); @@ -179,7 +180,10 @@ Resource offer(Resource avail, ResourceCalculator rc, // leaf queues. Such under-utilized leaf queue could preemption resources // from over-utilized leaf queue located at other hierarchies. - accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + // Allow queues can continue grow and balance even if all queues are satisfied. + if (!allowQueueBalanceAfterAllSafisfied) { + accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + } // accepted so far contains the "quota acceptable" amount, we now filter by // locality acceptable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 76eaac05718..dd97d4c1360 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1460,6 +1460,23 @@ public boolean getLazyPreemptionEnabled() { public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; /** + * Should we allow queues continue grow after all queue reaches their + * guaranteed capacity. + */ + public static final String FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED = + PREEMPTION_CONFIG_PREFIX + "fairness-balance-queue-after-satisfied.enabled"; + public static final boolean DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED = false; + + /** + * How long we will wait to balance queues, by default it is 5 mins. + */ + public static final String FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + PREEMPTION_CONFIG_PREFIX + "fairness-balance-queue-after-satisfied.max-wait-before-kill"; + public static final long + DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + 300 * 1000; + + /** * Maximum application for a queue to be used when application per queue is * not defined.To be consistent with previous version the default value is set * as UNDEFINED. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 22a241f6fec..e75b560c4cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -57,6 +57,8 @@
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1038,6 +1040,152 @@ public void testPreemptionNotHappenForSingleReservedQueue() {
     verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
+  @Test
+  public void testFairnessBalanceBasic() {
+    /*
+     * Test case to make sure that, when all active queues are beyond
+     * their guarantees, preemption happens to balance them for fairness.
+     */
+
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        { 100,  30,  30,  40 },   // abs
+        { 100, 100, 100, 100 },   // maxCap
+        { 100,  30,  60,  10 },   // used
+        {  10,  10, 100,   0 },   // pending
+        {   0,   0,   0,   0 },   // reserved
+        {   3,   1,   1,   1 },   // apps
+        {  -1,   1,   1,   1 },   // req granularity
+        {   3,   0,   0,   0 },   // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    boolean isFairnessBalanceEnabled = true;
+    newConf.setBoolean(
+        CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED,
+        isFairnessBalanceEnabled);
+    when(mCS.getConfiguration()).thenReturn(newConf);
+    policy.editSchedule();
+
+    // I_A: A:40 B:50 C:10, preempt 10 from B to A
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    assertEquals(40, policy.getQueuePartitions().get("queueA")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(50, policy.getQueuePartitions().get("queueB")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(10, policy.getQueuePartitions().get("queueC")
+        .get("").getIdealAssigned().getMemorySize());
+  }
+
+  @Test
+  public void testFairnessBalanceHierarchical() throws Exception {
+    /*
+     * Test case to make sure that, when all active queues are beyond
+     * their guarantees, preemption happens to balance them for fairness.
+     */
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    int[][] qData = new int[][] {
+        //  /    A                        F                   I
+        //       B    C                   G    H              J    K
+        //                 D    E
+        { 400, 200,  60, 140, 100,  40, 100,  70,  30, 100,  10,  90 },  // abs
+        { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 },  // maxCap
+        { 400, 201,  60, 141, 100,  41, 100,  50,  50,  99,  10,  89 },  // used
+        {  10,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  10 },  // pending
+        {  50,  30,  20,  10,   5,   5,   0,   0,   0,  10,  10,   0 },  // reserved
+        //       appA appB appC appD appE appF      appG
+        {   7,   3,   1,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+        {   3,   2,   0,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    boolean isFairnessBalanceEnabled = true;
+    newConf.setBoolean(
+        CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED,
+        isFairnessBalanceEnabled);
+    when(mCS.getConfiguration()).thenReturn(newConf);
+    policy.editSchedule();
+
+    // I_A:  A   B    C    D   E    F   G   H   I   J   K
+    //      201  60  141  100  41  100  50  50  99  10  89
+    // no preemption happened
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF)));
+
+    assertEquals(201, policy.getQueuePartitions().get("queueA")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(141, policy.getQueuePartitions().get("queueC")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(100, policy.getQueuePartitions().get("queueD")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(41, policy.getQueuePartitions().get("queueE")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(99, policy.getQueuePartitions().get("queueI")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(10, policy.getQueuePartitions().get("queueJ")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(89, policy.getQueuePartitions().get("queueK")
+        .get("").getIdealAssigned().getMemorySize());
+  }
+
+
+  @Test
+  public void testFairnessBalanceWithVcoreResource() throws Exception {
+    /*
+     * Test case to make sure that, when all active queues are beyond
+     * their guarantees, preemption happens to balance them for fairness.
+     */
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    int[][] qData = new int[][]{
+        //  /    A    B
+        { 100, 100, 100 },  // maxcap
+        {   2,   1,   1 },  // apps
+        {   2,   0,   0 },  // subqueues
+    };
+
+    // Resources can be set as memory:vcores
+    String[][] resData = new String[][]{
+        //  /          A        B
+        { "100:100", "30:60", "40:40" },  // abs
+        { "100:100", "40:40", "60:60" },  // used
+        { "70:20", "70:40", "50:100" },   // pending
+        { "0", "0", "0" },                // reserved
+        { "-1", "1:1", "1:1" },           // req granularity
+    };
+
+    // Pass the last parameter as TRUE to use the DominantResourceCalculator
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+        true);
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    boolean isFairnessBalanceEnabled = true;
+    newConf.setBoolean(
+        CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED,
+        isFairnessBalanceEnabled);
+    when(mCS.getConfiguration()).thenReturn(newConf);
+    policy.editSchedule();
+
+    // 20 containers will be preempted here
+    verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    assertEquals(60, policy.getQueuePartitions().get("queueA")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(40, policy.getQueuePartitions().get("queueA")
+        .get("").getIdealAssigned().getVirtualCores());
+    assertEquals(40, policy.getQueuePartitions().get("queueB")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(40, policy.getQueuePartitions().get("queueB")
.get("").getIdealAssigned().getVirtualCores()); + } + @Test public void testRefreshPreemptionProperties() throws Exception {