diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..dcce85a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,10 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION =
+      "yarn.resourcemanager.monitor.capacity.preemption.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +129,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +169,7 @@ public void init(Configuration config,
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -297,6 +303,17 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // qAlloc tracks currently active queues (will decrease progressively as
     // demand is met)
     List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+
+    // If the queue is over capacity and the PREEMPT_DISABLE flag is set, remove
+    // the queue's used resources from the total.
+    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueue sub = i.next();
+      if (sub.queueDisablePreemption) {
+        Resources.subtractFrom(tot_guarant, sub.current);
+        i.remove();
+      }
+    }
+
     // unassigned tracks how much resources are still to assign, initialized
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
@@ -330,7 +347,8 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
     for (TempQueue t:queues) {
-      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)
+          && ! t.queueDisablePreemption) {
         Resources.addTo(totPreemptionNeeded,
           Resources.subtract(t.current, t.idealAssigned));
       }
@@ -348,7 +366,9 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // assign to each queue the amount of actual preemption based on local
     // information of ideal preemption and scaling factor
     for (TempQueue t : queues) {
-      t.assignPreemption(scalingFactor, rc, tot_guarant);
+      if (! t.queueDisablePreemption) {
+        t.assignPreemption(scalingFactor, rc, tot_guarant);
+      }
     }
     if (LOG.isDebugEnabled()) {
       long time = clock.getTime();
@@ -445,7 +465,8 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
       // we act only if we are violating balance by more than
       // maxIgnoredOverCapacity
       if (Resources.greaterThan(rc, clusterResource, qT.current,
-          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))
+          && ! qT.queueDisablePreemption) {
         // we introduce a dampening factor naturalTerminationFactor that
         // accounts for natural termination of containers
         Resource resToObtain =
@@ -637,20 +658,43 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
     float absCap = root.getAbsoluteCapacity();
     float absMaxCap = root.getAbsoluteMaximumCapacity();
 
+    // The per-queue disablePreemption defaults to false (preemption enabled).
+    // Inherit parent's per-queue disablePreemption value.
+    boolean parentQueueDisablePreemption = false;
+    boolean queueDisablePreemption = false;
+
+    if (root.getParent() != null) {
+      String parentQueuePropName = BASE_YARN_RM_PREEMPTION
+          + root.getParent().getQueuePath()
+          + SUFFIX_DISABLE_PREEMPTION;
+      parentQueueDisablePreemption =
+          this.conf.getBoolean(parentQueuePropName, false);
+    }
+
+    String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+        + SUFFIX_DISABLE_PREEMPTION;
+    queueDisablePreemption =
+        this.conf.getBoolean(queuePropName, parentQueueDisablePreemption);
+
     Resource current = Resources.multiply(clusterResources, absUsed);
     Resource guaranteed = Resources.multiply(clusterResources, absCap);
     Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+    // Always make disable preemption flag false if the queue is not over
+    // capacity so that this queue will remain in consideration for ideal
+    // resource distribution.
+    if (Resources.lessThanOrEqual(rc, clusterResources, current, guaranteed)) {
+      queueDisablePreemption = false;
+    }
     if (root instanceof LeafQueue) {
       LeafQueue l = (LeafQueue) root;
       Resource pending = l.getTotalResourcePending();
       ret = new TempQueue(queueName, current, pending, guaranteed,
-          maxCapacity);
-
+          maxCapacity, queueDisablePreemption);
       ret.setLeafQueue(l);
     } else {
       Resource pending = Resource.newInstance(0, 0);
       ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-          maxCapacity);
+          maxCapacity, queueDisablePreemption);
       for (CSQueue c : root.getChildQueues()) {
         ret.addChild(cloneQueues(c, clusterResources));
       }
@@ -693,12 +737,14 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource actuallyPreempted;
 
     double normalizedGuarantee;
+    boolean queueDisablePreemption;
 
     final ArrayList<TempQueue> children;
     LeafQueue leafQueue;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity) {
+        Resource guaranteed, Resource maxCapacity,
+        boolean queueDisablePreemption) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -709,6 +755,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.queueDisablePreemption = queueDisablePreemption;
     }
 
     public void setLeafQueue(LeafQueue l){
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..3b19334 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
@@ -259,6 +261,38 @@ public void testDeadzone() {
   }
 
   @Test
+  public void testPerQueueDisablePreemption() {
+    int[][] qData = new int[][]{
+      //  /    A    B    C
+      { 100,  55,  25,  20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
+      { 100,   0,  54,  46 },  // used
+      {  10,  10,   0,   0 },  // pending
+      {   0,   0,   0,   0 },  // reserved
+      {   3,   1,   1,   1 },  // apps
+      {  -1,   1,   1,   1 },  // req granularity
+      {   3,   0,   0,   0 },  // subqueues
+    };
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With PREEMPTION_DISABLED set for queueB, get resources from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy.editSchedule();
+    // With no PREEMPTION_DISABLED set for queueB, get resources from both
+    // queueB and queueC (times() assertion is cumulative).
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -642,6 +676,7 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      when(q.getQueuePath()).thenReturn("root" + "." + queueName);
     }
     assert 0 == pqs.size();
     return root;
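Note (not part of the patch): the property key introduced above is built as BASE_YARN_RM_PREEMPTION + queue path + SUFFIX_DISABLE_PREEMPTION, for example yarn.resourcemanager.monitor.capacity.preemption.root.queueB.disable_preemption, and a queue with no explicit setting inherits its parent's resolved value (see cloneQueues()). The standalone sketch below illustrates that lookup against a plain org.apache.hadoop.conf.Configuration; the class and method names in the sketch are illustrative only and do not appear in the patch.

import org.apache.hadoop.conf.Configuration;

// Illustrative helper mirroring the lookup performed in cloneQueues() in the
// patch; not part of the patch itself.
public class DisablePreemptionLookupSketch {
  static final String BASE_YARN_RM_PREEMPTION =
      "yarn.resourcemanager.monitor.capacity.preemption.";
  static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";

  // Resolve the effective disable_preemption flag for a queue path, falling
  // back to the parent's resolved value when the queue has no explicit
  // setting (the inheritance rule implemented in the patch).
  static boolean isPreemptionDisabled(Configuration conf, String queuePath,
      boolean parentDisabled) {
    // e.g. yarn.resourcemanager.monitor.capacity.preemption.root.queueB.disable_preemption
    String key = BASE_YARN_RM_PREEMPTION + queuePath + SUFFIX_DISABLE_PREEMPTION;
    return conf.getBoolean(key, parentDisabled);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same setting the new unit test applies before calling editSchedule().
    conf.setBoolean(BASE_YARN_RM_PREEMPTION + "root.queueB"
        + SUFFIX_DISABLE_PREEMPTION, true);

    boolean rootDisabled = isPreemptionDisabled(conf, "root", false);
    boolean queueBDisabled =
        isPreemptionDisabled(conf, "root.queueB", rootDisabled);
    System.out.println("root=" + rootDisabled          // prints false
        + ", root.queueB=" + queueBDisabled);          // prints true
  }
}

As implemented in cloneQueues(), the resolved flag is forced back to false for any queue at or below its guaranteed capacity, so a protected queue only drops out of the ideal-distribution calculation while it is actually over capacity.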