diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 64b36151d85..ec31c6a634b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -42,6 +42,7 @@ protected final ResourceCalculator rc; protected boolean isReservedPreemptionCandidatesSelector; private Resource stepFactor; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; static class TQComparator implements Comparator<TempQueuePerPartition> { private ResourceCalculator rc; @@ -83,15 +84,27 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { * this will be set by different implementation of candidate * selectors, please refer to TempQueuePerPartition#offer for * details. + * @param allowQueuesBalanceAfterAllQueuesSatisfied + * Whether queues may keep growing after every queue has reached its + * guaranteed capacity. + * For example, suppose there are 10 queues under root, each with a + * guaranteed capacity of 10%, and only two of them are active: + * queueA uses 10% and queueB uses 90%. Every queue is at or above its + * guarantee, yet the outcome is unfair to queueA. + * This behavior is therefore made configurable; by default it is + * not allowed.
+ * */ public AbstractPreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { context = preemptionContext; rc = preemptionContext.getResourceCalculator(); this.isReservedPreemptionCandidatesSelector = isReservedPreemptionCandidatesSelector; - + this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; stepFactor = Resource.newInstance(0, 0); for (ResourceInformation ri : stepFactor.getResources()) { ri.setValue(1); @@ -193,7 +206,8 @@ protected void computeFixpointAllocation(Resource totGuarant, wQavail = Resources.componentwiseMin(wQavail, unassigned); Resource wQidle = sub.offer(wQavail, rc, totGuarant, - isReservedPreemptionCandidatesSelector); + isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); Resource wQdone = Resources.subtract(wQavail, wQidle); if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index 098acdd8510..7985296fcaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -70,6 +70,8 @@ TempQueuePerPartition getQueueByPartition(String queueName, float getMaxAllowableLimitForIntraQueuePreemption(); + long getDefaultMaximumKillWaitTimeout(); + @Unstable IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 3b2fcbb90d7..789a94a5392 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -42,19 +43,43 @@ private static final Log LOG = LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, - boolean includeReservedResource) { + boolean includeReservedResource, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { super(preemptionContext); + this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, includeReservedResource); + preemptionContext, includeReservedResource, + allowQueuesBalanceAfterAllQueuesSatisfied); } @Override public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Resource clusterResource, Resource totalPreemptionAllowed) { + + // When additional queue balancing is requested, first make sure that no + // containers have already been chosen by previous selectors. + // Otherwise skip this selector. + if (allowQueuesBalanceAfterAllQueuesSatisfied) { + if (selectedCandidates != null && !selectedCandidates.isEmpty()) { + for (Set<RMContainer> containers : selectedCandidates.values()) { + if (!containers.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Fairness inter-queue balance is on, but some " + + "containers were already chosen to be preempted by previous " + + "selectors; skip selecting containers for this selector"); + } + return selectedCandidates; + } + } + } + } + // Calculate how much resources we need to preempt preemptableAmountCalculator.computeIdealAllocation(clusterResource, totalPreemptionAllowed); @@ -267,4 +292,8 @@ private void preemptFrom(FiCaSchedulerApp app, selectedContainers, totalPreemptionAllowed, false); } } + + public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() { + return allowQueuesBalanceAfterAllQueuesSatisfied; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 08d834ea004..89a015e4122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -48,11 +48,14 @@ * @param isReservedPreemptionCandidatesSelector this will be set by * different implementation of candidate selectors, please refer to * TempQueuePerPartition#offer for details. 
+ * @param allowQueuesBalanceAfterAllQueuesSatisfied */ public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { - super(preemptionContext, isReservedPreemptionCandidatesSelector); + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { + super(preemptionContext, isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index 4d8afaf0c93..3c97364ec02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -34,6 +34,7 @@ public abstract class PreemptionCandidatesSelector { protected CapacitySchedulerPreemptionContext preemptionContext; protected ResourceCalculator rc; + private long maximumKillWaitTime = -1; PreemptionCandidatesSelector( CapacitySchedulerPreemptionContext preemptionContext) { @@ -77,4 +78,14 @@ public int compare(RMContainer a, RMContainer b) { }); } + public long getMaximumKillWaitTimeMs() { + if (maximumKillWaitTime > 0) { + return maximumKillWaitTime; + } + return preemptionContext.getDefaultMaximumKillWaitTimeout(); + } + + public void setMaximumKillWaitTime(long maximumKillWaitTime) { + this.maximumKillWaitTime = maximumKillWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 3f9fd171689..512c7d07d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -131,6 +131,8 @@ private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; + private final Map>> toPreemptPerSelector = new HashMap<>(); // Preemptable Entities, synced from scheduler at every run private Map preemptableQueues; @@ -249,7 +251,7 @@ private void updateConfigIfNeeded() { // initialize candidates preemption selection policies candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, - additionalPreemptionBasedOnReservedResource)); + additionalPreemptionBasedOnReservedResource, false)); // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = 
config.getBoolean( @@ -259,6 +261,20 @@ private void updateConfigIfNeeded() { candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); } + // Do we need to do fairness balance of queues even after queues get satisified? + boolean isFairnessQueueBalanceRequired = config.getBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + CapacitySchedulerConfiguration.DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED); + long maximumKillWaitTimeForFairnessQueueBalance = config.getLong( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL, + CapacitySchedulerConfiguration.DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL); + if (isFairnessQueueBalanceRequired) { + PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this, + false, true); + candidatesSelectionPolicies.add(selector); + selector.setMaximumKillWaitTime(maximumKillWaitTimeForFairnessQueueBalance); + } + LOG.info("Capacity Scheduler configuration changed, updated preemption " + "properties to:\n" + "max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" + @@ -282,7 +298,8 @@ private void updateConfigIfNeeded() { "select_based_on_reserved_containers = " + selectCandidatesForResevedContainers + "\n" + "additional_res_balance_based_on_reserved_containers = " + - additionalPreemptionBasedOnReservedResource); + additionalPreemptionBasedOnReservedResource + "\n" + + "fairness-queue-balance-enabled = " + isFairnessQueueBalanceRequired); csConfig = config; } @@ -317,35 +334,40 @@ private void preemptOrkillSelectedContainerAfterWait( } // preempt (or kill) the selected containers - for (Map.Entry> e : selectedCandidates + for (Map.Entry>> pc : toPreemptPerSelector .entrySet()) { - ApplicationAttemptId appAttemptId = e.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + appAttemptId - + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); - } - for (RMContainer container : e.getValue()) { - // if we tried to preempt this for more than maxWaitTime - if (preemptionCandidates.get(container) != null - && preemptionCandidates.get(container) - + maxWaitTime <= currentTime) { - // kill it - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); - preemptionCandidates.remove(container); - } else { - if (preemptionCandidates.get(container) != null) { - // We already updated the information to scheduler earlier, we need - // not have to raise another event. - continue; + Map> cMap = pc.getValue(); + for (Map.Entry> e : cMap.entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + appAttemptId + + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); + } + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime + if (preemptionCandidates.get(container) != null + && preemptionCandidates.get(container) + + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) { + // kill it + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + preemptionCandidates.remove(container); + } else { + if (preemptionCandidates.get(container) != null) { + // We already updated the information to scheduler earlier, we need + // not have to raise another event. 
+ continue; + } + + //otherwise just send preemption events + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); + preemptionCandidates.put(container, currentTime); } - - //otherwise just send preemption events - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preemptionCandidates.put(container, currentTime); } } } @@ -438,29 +460,37 @@ private void containerBasedPreemptOrKill(CSQueue root, // queue and each application Map> toPreempt = new HashMap<>(); + for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { long startTime = 0; + Map> curCandidates; if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("Trying to use {0} to select preemption candidates", selector.getClass().getName())); startTime = clock.getTime(); } - toPreempt = selector.selectCandidates(toPreempt, + curCandidates = selector.selectCandidates(toPreempt, clusterResources, totalPreemptionAllowed); + toPreemptPerSelector.putIfAbsent(selector, curCandidates); if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("{0} uses {1} millisecond to run", selector.getClass().getName(), clock.getTime() - startTime)); int totalSelected = 0; + int curSelected = 0; for (Set set : toPreempt.values()) { totalSelected += set.size(); } + for (Set set : curCandidates.values()) { + curSelected += set.size(); + } LOG.debug(MessageFormat - .format("So far, total {0} containers selected to be preempted", - totalSelected)); + .format("So far, total {0} containers selected to be preempted, {1}" + + "containers selected this round", + totalSelected, curSelected)); } } @@ -689,6 +719,12 @@ public double getNaturalTerminationFactor() { return queueToPartitions; } + @VisibleForTesting + Map>> getToPreemptCandidatesPerSelector() { + return toPreemptPerSelector; + } + @Override public int getClusterMaxApplicationPriority() { return scheduler.getMaxClusterLevelAppPriority().getPriority(); @@ -730,4 +766,9 @@ public void addPartitionToUnderServedQueues(String queueName, public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() { return intraQueuePreemptionOrderPolicy; } + + @Override + public long getDefaultMaximumKillWaitTimeout() { + return maxWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java index ff100d9a6ec..da6698609be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java @@ -63,7 +63,7 @@ public NodeForPreemption(float preemptionCost, CapacitySchedulerPreemptionContext preemptionContext) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, true); + preemptionContext, true, 
false); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4214acc5524..4fb1862b887 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -138,7 +138,8 @@ public void addChild(TempQueuePerPartition q) { // This function "accepts" all the resources it can (pending) and return // the unused ones Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource, boolean considersReservedResource) { + Resource clusterResource, boolean considersReservedResource, + boolean allowQueueBalanceAfterAllSatisfied) { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); @@ -179,7 +180,10 @@ Resource offer(Resource avail, ResourceCalculator rc, // leaf queues. Such under-utilized leaf queue could preemption resources // from over-utilized leaf queue located at other hierarchies. - accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + // Allow queues to keep growing and balancing even when all queues are satisfied. + if (!allowQueueBalanceAfterAllSatisfied) { + accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + } // accepted so far contains the "quota acceptable" amount, we now filter by // locality acceptable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 76eaac05718..dd97d4c1360 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1460,6 +1460,23 @@ public boolean getLazyPreemptionEnabled() { public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; /** + * Whether queues may keep growing after every queue has reached its + * guaranteed capacity. + */ + public static final String FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED = + PREEMPTION_CONFIG_PREFIX + "fairness-balance-queue-after-satisfied.enabled"; + public static final boolean DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED = false; + + /** + * How long to wait before killing containers selected for queue balancing; the default is 5 minutes. 
+ */ + public static final String FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + PREEMPTION_CONFIG_PREFIX + "fairness-balance-queue-after-satisfied.max-wait-before-kill"; + public static final long + DEFAULT_FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL = + 300 * 1000; + + /** * Maximum application for a queue to be used when application per queue is * not defined.To be consistent with previous version the default value is set * as UNDEFINED. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java index 6a953cfe068..fc33eb00f71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -538,4 +539,61 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException { new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testPriorityPreemptionForFairnessBetweenSatisfiedQueues() + throws IOException { + /** + * All queues are beyond guarantee, c has higher priority than b. + * c ask for more resource, and there is no idle left, c should preempt + * some resource from b but won’t let b under its guarantee. + * + * Queue structure is: + * + *
+     *        root
+     *       / |  \
+     *      a  b   c
+     * 
+ * + * For priorities + * - a=1 + * - b=1 + * - c=2 + * + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 0 0]){priority=1};" + // a + "-b(=[30 100 40 50]){priority=1};" + // b + "-c(=[40 100 60 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t(1,1,n1,,40,false);" + // app1 in b + "c\t(1,1,n1,,60,false)"; // app2 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isFairnessBalanceEnabled = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + isFairnessBalanceEnabled); + when(cs.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40, even though + // b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed), + // since c has higher priority, c will be put in mostUnderServedQueue and + // get all remain 30 capacity. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 22a241f6fec..85a4ec05453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -72,8 +74,10 @@ import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Random; +import java.util.Set; import java.util.StringTokenizer; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1038,6 +1042,219 @@ public void testPreemptionNotHappenForSingleReservedQueue() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } + @Test + public void testFairnessBalanceBasic() { + /* + * Test case to make sure, when all active queues are beyond their + * guaranteed, preemption will happen to balance for fairness + */ + + int[][] qData = new int[][]{ + // / A B C + { 100, 30, 30, 40 }, // abs + { 100, 100, 100, 100 }, // maxCap + { 100, 30, 60, 10 }, 
// used + { 10, 10, 100, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isFairnessBalanceEnabled = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + isFairnessBalanceEnabled); + when(mCS.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // I_A: A:40 B:50 C:10, preempt 10 from B to A + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appB))); + + assertEquals(40, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(50, policy.getQueuePartitions().get("queueB") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(10, policy.getQueuePartitions().get("queueC") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testFairnessBalanceHierarchical() throws Exception { + /* + * Test case to make sure, when all active queues are beyond their + * guaranteed, preemption will happen to balance for fairness + */ + Logger.getRootLogger().setLevel(Level.DEBUG); + int[][] qData = new int[][] { + // / A F I + // B C G H J K + // D E + { 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap + { 400, 201, 60, 141, 100, 41, 100, 50, 50, 99, 10, 89 }, // used + { 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending + { 50, 30, 20, 10, 5, 5, 0, 0, 0, 10, 10, 0 }, // reserved + // appA appB appC appD appE appF appG + { 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isFairnessBalanceEnabled = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + isFairnessBalanceEnabled); + when(mCS.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // I_A: A B C D E F G H I J K + // 201 60 141 100 41 100 50 50 99 10 89 + // no preemption happened + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); + + assertEquals(201, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(141, policy.getQueuePartitions().get("queueC") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(100, policy.getQueuePartitions().get("queueD") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(41, policy.getQueuePartitions().get("queueE") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(99, policy.getQueuePartitions().get("queueI") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(10, policy.getQueuePartitions().get("queueJ") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(89, 
policy.getQueuePartitions().get("queueK") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testFairnessBalanceWithVcoreResource() throws Exception { + /* + * Test case to make sure that, when all active queues are beyond their + * guarantees, preemption happens to balance them for fairness + */ + Logger.getRootLogger().setLevel(Level.DEBUG); + int[][] qData = new int[][]{ + // / A B + {100, 100, 100}, // maxcap + {2, 1, 1}, // apps + {2, 0, 0}, // subqueues + }; + + // Resources can be set like memory:vcores + String[][] resData = new String[][]{ + // / A B + {"100:100", "30:60", "40:40"}, // abs + {"100:100", "40:40", "60:60"}, // used + {"70:20", "70:40", "50:100"}, // pending + {"0", "0", "0"}, // reserved + {"-1", "1:1", "1:1"}, // req granularity + }; + + // Passing last param as TRUE to use DominantResourceCalculator + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData, + true); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isFairnessBalanceEnabled = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + isFairnessBalanceEnabled); + when(mCS.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // 20 containers will be preempted here + verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appB))); + + assertEquals(60, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("queueB") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("queueB") + .get("").getIdealAssigned().getVirtualCores()); + } + + @Test + public void testFairnessBalanceWithConfiguredTimeout() throws Exception { + /* + * Test case to make sure that the configured timeout is applied to + * containers selected to be preempted for the fairness balance condition + */ + + Logger.getRootLogger().setLevel(Level.DEBUG); + int[][] qData = new int[][]{ + // / A B + {100, 100, 100}, // maxcap + {2, 1, 1}, // apps + {2, 0, 0}, // subqueues + }; + + // Resources can be set like memory:vcores + String[][] resData = new String[][]{ + // / A B + {"100:100", "30:60", "40:40"}, // abs + {"100:100", "40:40", "60:60"}, // used + {"70:20", "70:40", "50:100"}, // pending + {"0", "0", "0"}, // reserved + {"-1", "1:1", "1:1"}, // req granularity + }; + + // Passing last param as TRUE to use DominantResourceCalculator + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData, + true); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isFairnessBalanceEnabled = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED, + isFairnessBalanceEnabled); + final long FB_MAX_BEFORE_KILL = 60 * 1000; + newConf.setLong( + CapacitySchedulerConfiguration.FAIRNESS_BALANCE_QUEUES_AFTER_SATISFIED_MAX_WAIT_BEFORE_KILL, + FB_MAX_BEFORE_KILL); + when(mCS.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId, Set<RMContainer>>> pcps = policy.getToPreemptCandidatesPerSelector(); + + for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId, Set<RMContainer>>> pc : pcps + .entrySet()) { + if (pc.getKey() instanceof FifoCandidatesSelector) { + FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey(); + if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied() 
== true) { + assertEquals(pcs.getMaximumKillWaitTimeMs(), FB_MAX_BEFORE_KILL); + } + } + } + + // 20 containers will be preempted here + verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appB))); + + assertEquals(60, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("queueA") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("queueB") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("queueB") + .get("").getIdealAssigned().getVirtualCores()); + } @Test public void testRefreshPreemptionProperties() throws Exception {
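Configuration sketch (illustrative only, not part of the patch): the two new keys above resolve to full property names by prepending PREEMPTION_CONFIG_PREFIX. Assuming that prefix resolves to "yarn.resourcemanager.monitor.capacity.preemption." and that these settings are read from the same place as the existing preemption properties (typically yarn-site.xml, with yarn.resourcemanager.scheduler.monitor.enable set to true so the preemption policy runs), an operator might enable the feature roughly like this:

  <!-- Sketch only: property names assume PREEMPTION_CONFIG_PREFIX is
       "yarn.resourcemanager.monitor.capacity.preemption." -->
  <property>
    <name>yarn.resourcemanager.monitor.capacity.preemption.fairness-balance-queue-after-satisfied.enabled</name>
    <value>true</value>
  </property>
  <property>
    <!-- Wait 60 seconds (instead of the 300000 ms default) between marking a
         container for preemption and killing it, for containers chosen by the
         fairness-balance selector only. -->
    <name>yarn.resourcemanager.monitor.capacity.preemption.fairness-balance-queue-after-satisfied.max-wait-before-kill</name>
    <value>60000</value>
  </property>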