diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..02f58bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,9 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +128,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +168,7 @@ public void init(Configuration config,
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -297,6 +302,17 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // qAlloc tracks currently active queues (will decrease progressively as
     // demand is met)
     List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+
+    // If the queue is over capacity and its disable_preemption flag is set,
+    // remove the queue's used resources from the total.
+    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueue sub = i.next();
+      if (sub.queueDisablePreemption) {
+        Resources.subtractFrom(tot_guarant, sub.current);
+        i.remove();
+      }
+    }
+
     // unassigned tracks how much resources are still to assign, initialized
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
@@ -330,7 +346,8 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
     for (TempQueue t:queues) {
-      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)
+          && ! t.queueDisablePreemption) {
         Resources.addTo(totPreemptionNeeded,
             Resources.subtract(t.current, t.idealAssigned));
       }
@@ -348,7 +365,9 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // assign to each queue the amount of actual preemption based on local
     // information of ideal preemption and scaling factor
     for (TempQueue t : queues) {
-      t.assignPreemption(scalingFactor, rc, tot_guarant);
+      if (! t.queueDisablePreemption) {
+        t.assignPreemption(scalingFactor, rc, tot_guarant);
+      }
     }
     if (LOG.isDebugEnabled()) {
       long time = clock.getTime();
@@ -445,7 +464,8 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
       // we act only if we are violating balance by more than
       // maxIgnoredOverCapacity
       if (Resources.greaterThan(rc, clusterResource, qT.current,
-          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))
+          && ! qT.queueDisablePreemption) {
         // we introduce a dampening factor naturalTerminationFactor that
         // accounts for natural termination of containers
         Resource resToObtain =
@@ -637,20 +657,43 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
       float absCap = root.getAbsoluteCapacity();
       float absMaxCap = root.getAbsoluteMaximumCapacity();
 
+      // The per-queue disablePreemption defaults to false (preemption enabled).
+      // Inherit the parent's per-queue disablePreemption value.
+      boolean parentQueueDisablePreemption = false;
+      boolean queueDisablePreemption = false;
+
+      if (root.getParent() != null) {
+        String parentQueuePropName = BASE_YARN_RM_PREEMPTION
+            + root.getParent().getQueuePath()
+            + SUFFIX_DISABLE_PREEMPTION;
+        parentQueueDisablePreemption =
+            this.conf.getBoolean(parentQueuePropName, false);
+      }
+
+      String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+          + SUFFIX_DISABLE_PREEMPTION;
+      queueDisablePreemption =
+          this.conf.getBoolean(queuePropName, parentQueueDisablePreemption);
+
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+      // Always reset the disable-preemption flag to false if the queue is not
+      // over capacity so that the queue remains in consideration for ideal
+      // resource distribution.
+      if (Resources.lessThanOrEqual(rc, clusterResources, current, guaranteed)) {
+        queueDisablePreemption = false;
+      }
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
-            maxCapacity);
-
+            maxCapacity, queueDisablePreemption);
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-            maxCapacity);
+            maxCapacity, queueDisablePreemption);
         for (CSQueue c : root.getChildQueues()) {
           ret.addChild(cloneQueues(c, clusterResources));
         }
@@ -693,12 +736,14 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource actuallyPreempted;
 
     double normalizedGuarantee;
 
+    boolean queueDisablePreemption;
     final ArrayList<TempQueue> children;
     LeafQueue leafQueue;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity) {
+        Resource guaranteed, Resource maxCapacity,
+        boolean queueDisablePreemption) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -709,6 +754,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.queueDisablePreemption = queueDisablePreemption;
     }
 
     public void setLeafQueue(LeafQueue l){
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..e88004b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
@@ -258,6 +260,45 @@ public void testDeadzone() {
     verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPerQueueDisablePreemption() {
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        { 100,  55,  25,  20 },  // abs
+        { 100, 100, 100, 100 },  // maxCap
+        { 100,   0,  54,  46 },  // used
+        {  10,  10,   0,   0 },  // pending
+        {   0,   0,   0,   0 },  // reserved
+        {   3,   1,   1,   1 },  // apps
+        {  -1,   1,   1,   1 },  // req granularity
+        {   3,   0,   0,   0 },  // subqueues
+    };
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With disable_preemption set for queueB, resources are taken from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // With disable_preemption not set for queueB, resources will be preempted
+    // from both queueB and queueC. The test must be reset so that the mDisp
+    // event handler counts only events from the following run and not the
+    // previous one.
+    setup();
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy2.editSchedule();
+
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
   @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
@@ -642,6 +683,10 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      String parentPathName = p.getQueuePath();
+      parentPathName = (parentPathName == null) ? "root" : parentPathName;
+      String queuePathName = (parentPathName+"."+queueName).replace("/","root");
+      when(q.getQueuePath()).thenReturn(queuePathName);
     }
     assert 0 == pqs.size();
     return root;
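
For context, the property introduced by this patch is an ordinary Capacity Scheduler setting, so on a real cluster it would normally be set in capacity-scheduler.xml as yarn.scheduler.capacity.<queue-path>.disable_preemption. The self-contained sketch below is not part of the patch; it only illustrates, under that assumption, how the key is composed from BASE_YARN_RM_PREEMPTION and SUFFIX_DISABLE_PREEMPTION and how a child queue falls back to its parent's value when it has no explicit setting, mirroring the inheritance logic added to cloneQueues(). The queue names root.queueA and root.queueA.child1 and the class name are made up for illustration; it needs only hadoop-common on the classpath.

// Hypothetical example, not part of the patch: shows key composition and
// parent-to-child fallback for the new disable_preemption property.
import org.apache.hadoop.conf.Configuration;

public class DisablePreemptionConfigExample {
  // Same values as the constants added to ProportionalCapacityPreemptionPolicy.
  static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
  static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";

  public static void main(String[] args) {
    // Empty Configuration (no default resources), standing in for the
    // scheduler configuration passed to the policy's init().
    Configuration conf = new Configuration(false);

    // Disable preemption for root.queueA; its child sets nothing explicitly.
    conf.setBoolean(BASE_YARN_RM_PREEMPTION + "root.queueA"
        + SUFFIX_DISABLE_PREEMPTION, true);

    // The parent's value is read with a default of false...
    boolean parentDisabled = conf.getBoolean(
        BASE_YARN_RM_PREEMPTION + "root.queueA" + SUFFIX_DISABLE_PREEMPTION,
        false);
    // ...and the child uses the parent's value as its default when unset,
    // which is exactly how cloneQueues() computes queueDisablePreemption.
    boolean childDisabled = conf.getBoolean(
        BASE_YARN_RM_PREEMPTION + "root.queueA.child1" + SUFFIX_DISABLE_PREEMPTION,
        parentDisabled);

    System.out.println("root.queueA disabled:        " + parentDisabled); // true
    System.out.println("root.queueA.child1 disabled: " + childDisabled);  // true (inherited)
  }
}

Running this prints true for both queues: the child inherits the parent's explicit setting, while a queue with an explicit value of its own would override it.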