Per-queue "max ignored over capacity" for ProportionalCapacityPreemptionPolicy.

Adds the configuration key
  yarn.resourcemanager.monitor.capacity.preemption.<queue-path>.max_ignored_over_capacity
read with Configuration.getDouble. A queue with no key of its own falls back to
the value of its direct parent's key, and to 0 when neither is set, so existing
behavior is unchanged unless a queue is configured explicitly.

In computeFixpointAllocation each queue in qAlloc is now offered
  unassigned * normalizedGuarantee * (1.0 + queueMaxIgnoredOverCapacity)
instead of unassigned * normalizedGuarantee. The value is carried on TempQueue
through a new constructor argument. The test mocks now stub getQueuePath() as
"root.<queueName>".

NOTE(review): the fallback reads only the direct parent's key, not the parent's
effective (inherited) value. A value set on a grandparent therefore does not
reach its grandchildren. Confirm this is intended.
NOTE(review): if the normalized guarantees of the queues in qAlloc sum to 1
(not shown in this diff), a positive setting makes the offers of one pass over
qAlloc add up to more than `unassigned`. Confirm that the bookkeeping consuming
these offers tolerates that.
NOTE: indentation of context lines follows Hadoop's 2-space convention. If a
hunk fails to match, try `git apply --ignore-whitespace`.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..efd7797 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,11 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION =
+      "yarn.resourcemanager.monitor.capacity.preemption.";
+  public static final String SUFFIX_IGNORED_OVER_CAPACITY =
+      ".max_ignored_over_capacity";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +130,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +170,7 @@ public void init(Configuration config,
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -381,10 +388,13 @@ private void computeFixpointAllocation(ResourceCalculator rc,
 
       // offer for each queue their capacity first and in following invocations
       // their share of over-capacity
+      // Take into account the per-queue 'max ignored over capacity' setting
+      // when calculating each queue's available resources.
       for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
         TempQueue sub = i.next();
         Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
+          Resources.multiply(unassigned,
+              sub.normalizedGuarantee * (1.0 + sub.queueMaxIgnoredOverCapacity));
         Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
         // if the queue returned a value > 0 it means it is fully satisfied
@@ -636,7 +646,24 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
       float absUsed = root.getAbsoluteUsedCapacity();
       float absCap = root.getAbsoluteCapacity();
       float absMaxCap = root.getAbsoluteMaximumCapacity();
+      // Let the per-queue maxIgnoredOverCapacity default to 0 so that
+      // IDEAL_ASSIGNED calculations will not be affected unless the queue
+      // is specifically configured to do so.
+      double parentQueueMaxIgnoredOverCapacity = 0;
+      double queueMaxIgnoredOverCapacity = 0;
+
+      if (root.getParent() != null) {
+        String parentQueuePropName = BASE_YARN_RM_PREEMPTION +
+            root.getParent().getQueuePath() +
+            SUFFIX_IGNORED_OVER_CAPACITY;
+        parentQueueMaxIgnoredOverCapacity =
+            this.conf.getDouble(parentQueuePropName, 0);
+      }
+      String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath() +
+          SUFFIX_IGNORED_OVER_CAPACITY;
+      queueMaxIgnoredOverCapacity =
+          this.conf.getDouble(queuePropName, parentQueueMaxIgnoredOverCapacity);
 
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
@@ -644,13 +671,13 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
-            maxCapacity);
+            maxCapacity, queueMaxIgnoredOverCapacity);
 
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-            maxCapacity);
+            maxCapacity, queueMaxIgnoredOverCapacity);
         for (CSQueue c : root.getChildQueues()) {
           ret.addChild(cloneQueues(c, clusterResources));
         }
@@ -693,12 +720,14 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource actuallyPreempted;
 
     double normalizedGuarantee;
+    double queueMaxIgnoredOverCapacity;
 
     final ArrayList<TempQueue> children;
     LeafQueue leafQueue;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity) {
+        Resource guaranteed, Resource maxCapacity,
+        double queueMaxIgnoredOverCapacity) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -709,6 +738,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.queueMaxIgnoredOverCapacity = queueMaxIgnoredOverCapacity;
     }
 
     public void setLeafQueue(LeafQueue l){
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..afb3921 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
@@ -259,6 +261,35 @@ public void testDeadzone() {
   }
 
   @Test
+  public void testPerQueueIgnoredOverCapacity() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 55, 25, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
+      { 100,  0, 50, 50 },  // used
+      {  10, 10,  0,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1 },  // apps
+      {  -1,  1,  1,  1 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+
+    conf.setFloat(BASE_YARN_RM_PREEMPTION +
+        "root.queueC" + SUFFIX_IGNORED_OVER_CAPACITY, (float) 1.0);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With IGNORED_OVER_CAPACITY set for queueC, get resources from queueB
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    conf.setFloat(BASE_YARN_RM_PREEMPTION +
+        "root.queueC" + SUFFIX_IGNORED_OVER_CAPACITY, (float) 0);
+    policy.editSchedule();
+    // With no IGNORED_OVER_CAPACITY set for queueC, get resources from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -642,6 +673,7 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      when(q.getQueuePath()).thenReturn("root" + "." + queueName);
     }
     assert 0 == pqs.size();
     return root;