diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 64b36151d85..5b8360a1c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -42,6 +42,7 @@ protected final ResourceCalculator rc; protected boolean isReservedPreemptionCandidatesSelector; private Resource stepFactor; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; static class TQComparator implements Comparator<TempQueuePerPartition> { private ResourceCalculator rc; @@ -83,15 +84,28 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { * this will be set by different implementation of candidate * selectors, please refer to TempQueuePerPartition#offer for * details. + * @param allowQueuesBalanceAfterAllQueuesSatisfied + * Should resources be preempted from an over-served queue when the + * requesting queues are all at or over their guarantees? + * For example, suppose there are 10 queues under root, each of them + * guaranteed 10% of the cluster. + * Only two queues are using resources: queueA uses 10% and queueB + * uses 90%. Every queue has reached its guarantee, yet the outcome + * is unfair to queueA. + * This flag makes that balancing behavior configurable; by default + * it is not allowed.
+ * */ public AbstractPreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { context = preemptionContext; rc = preemptionContext.getResourceCalculator(); this.isReservedPreemptionCandidatesSelector = isReservedPreemptionCandidatesSelector; - + this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; stepFactor = Resource.newInstance(0, 0); for (ResourceInformation ri : stepFactor.getResources()) { ri.setValue(1); @@ -193,7 +207,8 @@ protected void computeFixpointAllocation(Resource totGuarant, wQavail = Resources.componentwiseMin(wQavail, unassigned); Resource wQidle = sub.offer(wQavail, rc, totGuarant, - isReservedPreemptionCandidatesSelector); + isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); Resource wQdone = Resources.subtract(wQavail, wQidle); if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index 098acdd8510..7985296fcaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -70,6 +70,8 @@ TempQueuePerPartition getQueueByPartition(String queueName, float getMaxAllowableLimitForIntraQueuePreemption(); + long getDefaultMaximumKillWaitTimeout(); + @Unstable IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 3b2fcbb90d7..a334cebede0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -42,18 +42,24 @@ private static final Log LOG = LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, - boolean includeReservedResource) { + boolean includeReservedResource, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { super(preemptionContext); + 
this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, includeReservedResource); + preemptionContext, includeReservedResource, + allowQueuesBalanceAfterAllQueuesSatisfied); } @Override public Map> selectCandidates( Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptionAllowed) { // Calculate how much resources we need to preempt preemptableAmountCalculator.computeIdealAllocation(clusterResource, @@ -115,6 +121,7 @@ if (!preempted) { continue; } + this.updateCurCandidates(c, curCandidates); } } } @@ -134,7 +141,7 @@ preemptFrom(fc, clusterResource, resToObtainByPartition, skippedAMContainerlist, skippedAMSize, selectedCandidates, - totalPreemptionAllowed); + curCandidates, totalPreemptionAllowed); } // Can try preempting AMContainers (still saving atmost @@ -145,15 +152,15 @@ leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL), leafQueue.getMaxAMResourcePerQueuePercent()); - preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, - resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, - totalPreemptionAllowed); + preemptAMContainers(clusterResource, selectedCandidates, curCandidates, + skippedAMContainerlist, resToObtainByPartition, skippedAMSize, + maxAMCapacityForThisQueue, totalPreemptionAllowed); } finally { leafQueue.getReadLock().unlock(); } } - return selectedCandidates; + return curCandidates; } /** @@ -169,6 +176,7 @@ */ private void preemptAMContainers(Resource clusterResource, Map> preemptMap, + Map> curCandidates, List skippedAMContainerlist, Map resToObtainByPartition, Resource skippedAMSize, Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) { @@ -190,6 +198,7 @@ private void preemptAMContainers(Resource clusterResource, totalPreemptionAllowed, false); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); + this.updateCurCandidates(c, curCandidates); } } skippedAMContainerlist.clear(); @@ -203,6 +212,7 @@ private void preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Map resToObtainByPartition, List skippedAMContainerlist, Resource skippedAMSize, Map> selectedContainers, + Map> curCandidates, Resource totalPreemptionAllowed) { ApplicationAttemptId appId = app.getApplicationAttemptId(); @@ -219,9 +229,13 @@ private void preemptFrom(FiCaSchedulerApp app, } // Try to preempt this container - CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( - rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed, false); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, selectedContainers, + totalPreemptionAllowed, false); + if (preempted) { + this.updateCurCandidates(c, curCandidates); + } if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -262,9 +276,17 @@ private void preemptFrom(FiCaSchedulerApp app, } // Try to preempt this container - CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( - rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed, false); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + 
resToObtainByPartition, c, clusterResource, selectedContainers, + totalPreemptionAllowed, false); + if (preempted) { + this.updateCurCandidates(c, curCandidates); + } } } + + public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() { + return allowQueuesBalanceAfterAllQueuesSatisfied; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8ab9507adfd..ad1f9772e01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -121,6 +121,7 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { @Override public Map> selectCandidates( Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { // 1. Calculate the abnormality within each queue one by one. @@ -182,7 +183,7 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { leafQueue.getReadLock().lock(); for (FiCaSchedulerApp app : apps) { preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates, - clusterResource, totalPreemptedResourceAllowed, + curCandidates, clusterResource, totalPreemptedResourceAllowed, resToObtainByPartition, rollingResourceUsagePerUser); } } finally { @@ -191,7 +192,7 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { } } - return selectedCandidates; + return curCandidates; } private void initializeUsageAndUserLimitForCompute(Resource clusterResource, @@ -211,6 +212,7 @@ private void initializeUsageAndUserLimitForCompute(Resource clusterResource, private void preemptFromLeastStarvedApp(LeafQueue leafQueue, FiCaSchedulerApp app, Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed, Map resToObtainByPartition, Map rollingResourceUsagePerUser) { @@ -271,7 +273,9 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, totalPreemptedResourceAllowed, true); - + if (ret) { + this.updateCurCandidates(c, curCandidates); + } // Subtract from respective user's resource usage once a container is // selected for preemption. 
if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 08d834ea004..89a015e4122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -48,11 +48,14 @@ * @param isReservedPreemptionCandidatesSelector this will be set by * different implementation of candidate selectors, please refer to * TempQueuePerPartition#offer for details. + * @param allowQueuesBalanceAfterAllQueuesSatisfied */ public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { - super(preemptionContext, isReservedPreemptionCandidatesSelector); + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { + super(preemptionContext, isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index 4d8afaf0c93..63ca77bf6e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +35,7 @@ public abstract class PreemptionCandidatesSelector { protected CapacitySchedulerPreemptionContext preemptionContext; protected ResourceCalculator rc; + private long maximumKillWaitTime = -1; PreemptionCandidatesSelector( CapacitySchedulerPreemptionContext preemptionContext) { @@ -54,6 +56,7 @@ */ public abstract Map> selectCandidates( Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed); /** @@ -77,4 +80,24 @@ public int compare(RMContainer a, RMContainer b) { }); } + public long getMaximumKillWaitTimeMs() { + if (maximumKillWaitTime > 0) { + return maximumKillWaitTime; + } + return preemptionContext.getDefaultMaximumKillWaitTimeout(); + } + + public void setMaximumKillWaitTime(long maximumKillWaitTime) { + this.maximumKillWaitTime = maximumKillWaitTime; + } + + public void updateCurCandidates(RMContainer c, + Map> 
curCandidates) { + Set cSet = curCandidates.get(c.getApplicationAttemptId()); + if (cSet == null) { + cSet = new HashSet<>(); + } + cSet.add(c); + curCandidates.put(c.getApplicationAttemptId(), cSet); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index cc69fbae366..a5dca7cf22e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -131,6 +131,8 @@ private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; + Map>> pcsMap; // Preemptable Entities, synced from scheduler at every run private Map preemptableQueues; @@ -249,7 +251,21 @@ private void updateConfigIfNeeded() { // initialize candidates preemption selection policies candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, - additionalPreemptionBasedOnReservedResource)); + additionalPreemptionBasedOnReservedResource, false)); + + // Do we need to do preemption to balance queue even after queues get satisfied? + boolean isPreemptionToBalanceRequired = config.getBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED); + long maximumKillWaitTimeForPreemptionToQueueBalance = config.getLong( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL); + if (isPreemptionToBalanceRequired) { + PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this, + false, true); + selector.setMaximumKillWaitTime(maximumKillWaitTimeForPreemptionToQueueBalance); + candidatesSelectionPolicies.add(selector); + } // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = config.getBoolean( @@ -282,7 +298,8 @@ private void updateConfigIfNeeded() { "select_based_on_reserved_containers = " + selectCandidatesForResevedContainers + "\n" + "additional_res_balance_based_on_reserved_containers = " + - additionalPreemptionBasedOnReservedResource); + additionalPreemptionBasedOnReservedResource + "\n" + + "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired); csConfig = config; } @@ -308,44 +325,56 @@ public synchronized void editSchedule() { } private void preemptOrkillSelectedContainerAfterWait( - Map> selectedCandidates, - long currentTime) { + Map>> toPreemptPerSelector, long currentTime) { + int toPreemptCount = 0; + for (Map> containers : + toPreemptPerSelector.values()) { + toPreemptCount += containers.size(); + } if (LOG.isDebugEnabled()) { LOG.debug( "Starting to preempt containers for selectedCandidates and size:" - + selectedCandidates.size()); + + toPreemptCount); } // preempt (or kill) the selected containers - for (Map.Entry> e : 
selectedCandidates + for (Map.Entry>> pc : toPreemptPerSelector .entrySet()) { - ApplicationAttemptId appAttemptId = e.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + appAttemptId - + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); - } - for (RMContainer container : e.getValue()) { - // if we tried to preempt this for more than maxWaitTime - if (preemptionCandidates.get(container) != null - && preemptionCandidates.get(container) - + maxWaitTime <= currentTime) { - // kill it - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); - preemptionCandidates.remove(container); - } else { - if (preemptionCandidates.get(container) != null) { - // We already updated the information to scheduler earlier, we need - // not have to raise another event. - continue; + Map> cMap = pc.getValue(); + if (cMap.size() > 0) { + for (Map.Entry> e : cMap.entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + appAttemptId + + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); + } + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime + if (preemptionCandidates.get(container) != null + && preemptionCandidates.get(container) + + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) { + // kill it + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + preemptionCandidates.remove(container); + } else { + if (preemptionCandidates.get(container) != null) { + // We already updated the information to scheduler earlier, we need + // not have to raise another event. 
+ continue; + } + + //otherwise just send preemption events + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); + preemptionCandidates.put(container, currentTime); + } } - - //otherwise just send preemption events - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preemptionCandidates.put(container, currentTime); } } } @@ -438,29 +467,38 @@ private void containerBasedPreemptOrKill(CSQueue root, // queue and each application Map> toPreempt = new HashMap<>(); + Map>> toPreemptPerSelector = new HashMap<>();; for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { long startTime = 0; + Map> curCandidates = new HashMap<>(); if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("Trying to use {0} to select preemption candidates", selector.getClass().getName())); startTime = clock.getTime(); } - toPreempt = selector.selectCandidates(toPreempt, + curCandidates = selector.selectCandidates(toPreempt, curCandidates, clusterResources, totalPreemptionAllowed); + toPreemptPerSelector.putIfAbsent(selector, curCandidates); if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("{0} uses {1} millisecond to run", selector.getClass().getName(), clock.getTime() - startTime)); int totalSelected = 0; + int curSelected = 0; for (Set set : toPreempt.values()) { totalSelected += set.size(); } + for (Set set : curCandidates.values()) { + curSelected += set.size(); + } LOG.debug(MessageFormat - .format("So far, total {0} containers selected to be preempted", - totalSelected)); + .format("So far, total {0} containers selected to be preempted, {1}" + + " containers selected this round\n", + totalSelected, curSelected)); } } @@ -483,8 +521,10 @@ private void containerBasedPreemptOrKill(CSQueue root, long currentTime = clock.getTime(); + pcsMap = toPreemptPerSelector; + // preempt (or kill) the selected containers - preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime); + preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime); // cleanup staled preemption candidates cleanupStaledPreemptionCandidates(currentTime); @@ -689,6 +729,12 @@ public double getNaturalTerminationFactor() { return queueToPartitions; } + @VisibleForTesting + Map>> getToPreemptCandidatesPerSelector() { + return pcsMap; + } + @Override public int getClusterMaxApplicationPriority() { return scheduler.getMaxClusterLevelAppPriority().getPriority(); @@ -730,4 +776,9 @@ public void addPartitionToUnderServedQueues(String queueName, public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() { return intraQueuePreemptionOrderPolicy; } + + @Override + public long getDefaultMaximumKillWaitTimeout() { + return maxWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java index 4a169af1a54..2493e641593 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java @@ -378,6 +378,7 @@ private void incToPreempt(String queue, String partition, @Override public Map> selectCandidates( Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { // Initialize digraph from queues @@ -388,7 +389,7 @@ private void incToPreempt(String queue, String partition, // When all queues are set to same priority, or priority is not respected, // direct return. if (priorityDigraph.isEmpty()) { - return selectedCandidates; + return curCandidates; } // Save parameters to be shared by other methods @@ -478,6 +479,8 @@ private void incToPreempt(String queue, String partition, .getReservedResource()); } + this.updateCurCandidates(c, curCandidates); + Set containers = selectedCandidates.get( c.getApplicationAttemptId()); if (null == containers) { @@ -504,7 +507,6 @@ private void incToPreempt(String queue, String partition, } } } - - return selectedCandidates; + return curCandidates; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java index ff100d9a6ec..8bf55062bcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java @@ -63,12 +63,13 @@ public NodeForPreemption(float preemptionCost, CapacitySchedulerPreemptionContext preemptionContext) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, true); + preemptionContext, true, false); } @Override public Map> selectCandidates( Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { // Calculate how much resources we need to preempt @@ -109,6 +110,7 @@ public NodeForPreemption(float preemptionCost, } containers.add(c); + this.updateCurCandidates(c, curCandidates); if (LOG.isDebugEnabled()) { LOG.debug(this.getClass().getName() + " Marked container=" + c .getContainerId() + " from queue=" + c.getQueueName() @@ -118,7 +120,7 @@ public NodeForPreemption(float preemptionCost, } } - return selectedCandidates; + return curCandidates; } private Resource getPreemptableResource(String queueName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4214acc5524..4fb1862b887 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -138,7 +138,8 @@ public void addChild(TempQueuePerPartition q) { // This function "accepts" all the resources it can (pending) and return // the unused ones Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource, boolean considersReservedResource) { + Resource clusterResource, boolean considersReservedResource, + boolean allowQueueBalanceAfterAllSatisfied) { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); @@ -179,7 +180,10 @@ Resource offer(Resource avail, ResourceCalculator rc, // leaf queues. Such under-utilized leaf queue could preemption resources // from over-utilized leaf queue located at other hierarchies. - accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + // Allow queues to keep growing and balancing even after all queues are satisfied. + if (!allowQueueBalanceAfterAllSatisfied) { + accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + } // accepted so far contains the "quota acceptable" amount, we now filter by // locality acceptable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 76eaac05718..6128686e924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1460,6 +1460,23 @@ public boolean getLazyPreemptionEnabled() { public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; /** + * Whether to allow queues to continue growing after every queue has + * reached its guaranteed capacity. + */ + public static final String PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED = + PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled"; + public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED = false; + + /** + * How long to wait before killing containers selected for queue balancing; by default it is 5 minutes.
+ */ + public static final String PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill"; + public static final long + DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + 300 * 1000; + + /** * Maximum application for a queue to be used when application per queue is * not defined.To be consistent with previous version the default value is set * as UNDEFINED. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java index 6a953cfe068..8891fe94760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -538,4 +539,61 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException { new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues() + throws IOException { + /** + * All queues are beyond their guarantees and c has higher priority than b. + * c asks for more resources while there is no idle capacity left, so c + * should preempt some resources from b without pushing b below its guarantee. + * + * Queue structure is: + * + * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+ * + * For priorities + * - a=1 + * - b=1 + * - c=2 + * + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 0 0]){priority=1};" + // a + "-b(=[30 100 40 50]){priority=1};" + // b + "-c(=[40 100 60 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t(1,1,n1,,40,false);" + // app1 in b + "c\t(1,1,n1,,60,false)"; // app2 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isPreemptionToBalanceRequired = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + isPreemptionToBalanceRequired); + when(cs.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40, even though + // b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed), + // since c has higher priority, c will be put in mostUnderServedQueue and + // get all remain 30 capacity. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java new file mode 100644 index 00000000000..26d4198039e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java @@ -0,0 +1,236 @@ +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Test; +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyPreemptToBalance + extends ProportionalCapacityPreemptionPolicyMockFramework { + + @Test + public void testPreemptionToBalanceDisabled() throws IOException { + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 10 30]);" + // a + "-b(=[30 100 40 30]);" + // b + "-c(=[30 100 50 30]);" + // c + "-d(=[30 100 0 0])"; // d + String appsConfig = + 
//queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(30, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(35, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(35, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testPreemptionToBalanceEnabled() throws IOException { + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 10 30]);" + // a + "-b(=[30 100 40 30]);" + // b + "-c(=[30 100 50 30]);" + // c + "-d(=[30 100 0 0])"; // d + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + // enable preempt to balance and ideal assignment will change. + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + isPreemptionToBalanceEnabled); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(17)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(33, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(33, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(33, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + + @Test + public void testPreemptionToBalanceUsedPlusPendingLessThanGuaranteed() + throws IOException{ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[25 100 10 5]);" + // a + "-b(=[25 100 40 30]);" + // b + "-c(=[25 100 50 30]);" + // c + "-d(=[25 100 0 0])"; // d + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + isPreemptionToBalanceEnabled); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:15 B:42 C:43, preempt 7 from B and 17 from C to A + verify(mDisp, 
times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(15, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(42, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(43, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testPreemptionToBalanceWithVcoreResource() throws IOException { + Logger.getRootLogger().setLevel(Level.DEBUG); + String labelsConfig = "=100:100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:100 100:100 100:100 120:140]);" + //root + "-a(=[60:60 100:100 40:40 70:40]);" + // a + "-b(=[40:40 100:100 60:60 50:100])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1:1,n1,,40,false);" + // app1 in a + "b\t(1,1:1,n1,,60,false)"; // app2 in b + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + isPreemptionToBalanceEnabled); + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true); + policy.editSchedule(); + + // 21 containers will be preempted here + verify(mDisp, times(21)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy. + IsPreemptionRequestFor(getAppAttemptId(2)))); + + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getVirtualCores()); + } + + @Test + public void testPreemptionToBalanceWithConfiguredTimeout() throws IOException { + Logger.getRootLogger().setLevel(Level.DEBUG); + String labelsConfig = "=100:100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:100 100:100 100:100 120:140]);" + //root + "-a(=[60:60 100:100 40:40 70:40]);" + // a + "-b(=[40:40 100:100 60:60 50:100])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1:1,n1,,40,false);" + // app1 in a + "b\t(1,1:1,n1,,60,false)"; // app2 in b + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED, + isPreemptionToBalanceEnabled); + final long FB_MAX_BEFORE_KILL = 60 *1000; + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL, + FB_MAX_BEFORE_KILL); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true); + policy.editSchedule(); + + Map>> pcps= policy.getToPreemptCandidatesPerSelector(); + + String FIFO_CANDIDATE_SELECTOR = "FifoCandidatesSelector"; + boolean hasFifoSelector = false; + for (Map.Entry>> pc : pcps.entrySet()) { + if (pc.getKey().getClass().getSimpleName().equals(FIFO_CANDIDATE_SELECTOR)) { + FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey(); + if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied() == true) { + hasFifoSelector = true; + assertEquals(pcs.getMaximumKillWaitTimeMs(), FB_MAX_BEFORE_KILL); + } 
+ } + } + + assertEquals(hasFifoSelector, true); + + // 21 containers will be preempted here + verify(mDisp, times(21)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy. + IsPreemptionRequestFor(getAppAttemptId(2)))); + + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getVirtualCores()); + } +}
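
Reviewer note: the feature added by this patch is driven entirely by the two new CapacitySchedulerConfiguration keys. The sketch below shows how a client or test might enable it, mirroring what the new tests do before calling policy.editSchedule(); the no-arg CapacitySchedulerConfiguration constructor, the wrapper class name, and the 60-second wait are illustrative assumptions, not part of the patch.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class PreemptionToBalanceConfigSketch {
  public static CapacitySchedulerConfiguration build() {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Keep preempting to balance queues even after every queue has reached
    // its guaranteed capacity (disabled by default).
    conf.setBoolean(
        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED,
        true);
    // Wait 60 seconds (instead of the 5-minute default) before the balancing
    // selector escalates a MARK_CONTAINER_FOR_PREEMPTION into a kill.
    conf.setLong(
        CapacitySchedulerConfiguration
            .PREEMPTION_TO_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL,
        60 * 1000L);
    return conf;
  }
}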