diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..5a3fba2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,9 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +128,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +168,7 @@ public void init(Configuration config,
         config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -407,19 +412,65 @@ private void computeFixpointAllocation(ResourceCalculator rc,
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
       Collection<TempQueue> queues, boolean ignoreGuar) {
+    // Total guaranteed capacity.
     Resource activeCap = Resource.newInstance(0, 0);
-
+    // Sum of the guaranteed capacities of over-capacity queues that are
+    // preemptable.
+    Resource preemptableCap = Resource.newInstance(0, 0);
+
     if (ignoreGuar) {
       for (TempQueue q : queues) {
         q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
       }
     } else {
+      List<TempQueue> preemptableQueues = new ArrayList<TempQueue>();
+      List<TempQueue> untouchableCapQueues = new ArrayList<TempQueue>();
+      double ung = 0; // sum of the untouchable queues' normalized guarantees
      for (TempQueue q : queues) {
         Resources.addTo(activeCap, q.guaranteed);
+        // Compute the amount by which this queue is over capacity.
+        Resource extra = Resource.newInstance(0, 0);
+        if (Resources.greaterThan(rc, clusterResource,
+            q.current, q.guaranteed)) {
+          extra = Resources.subtract(q.current, q.guaranteed);
+        }
+
+        // NOTE: Due to the way cloneQueues sets q.preemptableExtra in cases
+        // where a queue is over capacity, q.preemptableExtra will always
+        // be less than or equal to (q.current - q.guaranteed) ('extra').
+        // A queue is added to preemptableQueues if it doesn't have
+        // untouchable, over-capacity resources. So, either it isn't over
+        // capacity or all of its over-capacity resources are preemptable.
+        if (Resources.equals(extra, Resources.none()) ||
+            (Resources.greaterThan(rc, clusterResource, extra, Resources.none())
+             && Resources.equals(q.preemptableExtra, extra))) {
+          preemptableQueues.add(q);
+          Resources.addTo(preemptableCap, q.guaranteed);
+        } else {
+          // Otherwise, over-capacity resources exist, but some or all of
+          // them are not preemptable.
+          untouchableCapQueues.add(q);
+        }
       }
-      for (TempQueue q : queues) {
-        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-            q.guaranteed, activeCap);
+
+      for (TempQueue q : untouchableCapQueues) {
+        // Treat the non-preemptable usage as the queue's guarantee. Due to
+        // the code above, we know that q.current > q.guaranteed, and
+        // q.preemptableExtra is no larger than the difference between the two.
+        Resource adjustedGuaranteed = Resources.subtract(q.current, q.preemptableExtra);
+        q.normalizedGuarantee =
+            Resources.divide(rc, clusterResource, adjustedGuaranteed, activeCap);
+        ung += q.normalizedGuarantee;
+      }
+
+      // Once we've adjusted for the untouchable over capacity,
+      // the other queues have to fight over what's left.
+      for (TempQueue q : preemptableQueues) {
+        // (1.0 - ung) is the fraction of the total resources at this level
+        // that remain after the non-preemptable queues take their share.
+        // Calculate what portion of it this queue gets.
+        q.normalizedGuarantee = (1.0 - ung) *
+            Resources.divide(rc, clusterResource, q.guaranteed, preemptableCap);
       }
     }
   }
@@ -640,20 +691,43 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+      // Only read the per-queue preemption property from the conf if the queue
+      // is over capacity. This is so that when the queue is under capacity, it
+      // will be part of the calculations for ideal resource distribution.
+      boolean queueDisablePreemption = false;
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+            + SUFFIX_DISABLE_PREEMPTION;
+        queueDisablePreemption =
+            this.conf.getBoolean(queuePropName, false);
+      }
+
+      Resource extra = Resource.newInstance(0, 0);
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        extra = Resources.subtract(current, guaranteed);
+      }
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
             maxCapacity);
-
+        if (!queueDisablePreemption) {
+          ret.preemptableExtra = extra;
+        }
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
             maxCapacity);
+        Resource preemptableExtra = Resource.newInstance(0, 0);
         for (CSQueue c : root.getChildQueues()) {
-          ret.addChild(cloneQueues(c, clusterResources));
+          TempQueue subq = cloneQueues(c, clusterResources);
+          Resources.addTo(preemptableExtra, subq.preemptableExtra);
+          ret.addChild(subq);
         }
+        ret.preemptableExtra =
+            Resources.min(rc, clusterResources, preemptableExtra, extra);
       }
     }
     return ret;
@@ -691,6 +765,7 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
+    Resource preemptableExtra;
 
     double normalizedGuarantee;
 
@@ -709,6 +784,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.preemptableExtra = Resource.newInstance(0, 0);
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -762,6 +838,7 @@ public String toString() {
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted) + .append(" PREEMPTABLE: ").append(preemptableExtra) .append("\n"); return sb.toString(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8a2840e..f90e87d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.isA; @@ -98,6 +101,8 @@ ApplicationId.newInstance(TS, 3), 0); final ApplicationAttemptId appE = ApplicationAttemptId.newInstance( ApplicationId.newInstance(TS, 4), 0); + final ApplicationAttemptId appF = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 4), 0); final ArgumentCaptor evtCaptor = ArgumentCaptor.forClass(ContainerPreemptEvent.class); @@ -259,6 +264,148 @@ public void testDeadzone() { } @Test + public void testPerQueueDisablePreemption() { + int[][] qData = new int[][]{ + // / A B C + { 100, 55, 25, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap + { 100, 0, 54, 46 }, // used + { 10, 10, 0, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + + conf.setBoolean(BASE_YARN_RM_PREEMPTION + + "root.queueB" + 
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With PREEMPTION_DISABLED set for queueB, get resources from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
+    // from both queueB and queueC. The test must be reset so that the mDisp
+    // event handler counts only events from the following case and not the
+    // previous one.
+    setup();
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy2.editSchedule();
+
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionHierarchical() {
+    int[][] qData = new int[][] {
+      //  /    A               D
+      //           B   C           E   F
+      { 200, 100, 50, 50, 100, 10, 90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 110, 60, 50, 90, 90, 0 },  // used
+      { 10, 0, 0, 0, 10, 0, 10 },  // pending
+      { 0, 0, 0, 0, 0, 0, 0 },  // reserved
+      //         appA appB     appC appD
+      { 4, 2, 1, 1, 2, 1, 1 },  // apps
+      { -1, -1, 1, 1, -1, 1, 1 },  // req granularity
+      { 2, 2, 0, 0, 2, 0, 0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // Verify that capacity is taken from queueB (appA), not queueE (appC),
+    // despite queueE being far over its absolute capacity, because queueA
+    // (queueB's parent) is over capacity and queueD (queueE's parent) is not.
+    ApplicationAttemptId expectedAttemptOnQueueB =
+        ApplicationAttemptId.newInstance(
+            appA.getApplicationId(), appA.getAttemptId());
+    assertTrue("appA should be running on queueB",
+        mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    ApplicationAttemptId expectedAttemptOnQueueC =
+        ApplicationAttemptId.newInstance(
+            appB.getApplicationId(), appB.getAttemptId());
+    ApplicationAttemptId expectedAttemptOnQueueE =
+        ApplicationAttemptId.newInstance(
+            appC.getApplicationId(), appC.getAttemptId());
+    // Now, none of queueB's (appA) over capacity is preemptable, so neither
+    // is queueA's. Verify that capacity is taken from queueE (appC).
+    assertTrue("appB should be running on queueC",
+        mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+    assertTrue("appC should be running on queueE",
+        mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+    // Resources should have come from queueE (appC) and neither of queueA's
+    // children.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionBroadHierarchical() {
+    int[][] qData = new int[][] {
+      //  /   A            D            G
+      //          B   C        E   F        H   I
+      { 200, 70, 30, 40, 80, 40, 40, 50, 20, 30 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 80, 40, 40, 80, 50, 30, 40, 30, 10 },  // used
+      { 10, 0, 0, 0, 10, 0, 10, 0, 0, 0 },  // pending
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 },  // reserved
+      //        appA appB    appC appD    appE appF
+      { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 },  // apps
+      { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 },  // req granularity
+      { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // queueF(appD) wants resources. Verify that resources come from its
+    // sibling queueE(appC) and from queueB(appA), since queueA is over capacity.
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB(appA) and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    // Now that queueB(appA) is not preemptable, verify that resources come
+    // from queueE(appC)
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    setup();
+    // Disable preemption for two of the three over-capacity queues.
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+    policy3.editSchedule();
+
+    // Verify that the request was starved out even though queueH(appE) is
+    // over capacity. This is because queueG (queueH's parent) is NOT
+    // over capacity.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -642,6 +789,10 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      String parentPathName = p.getQueuePath();
+      parentPathName = (parentPathName == null) ? "root" : parentPathName;
"root" : parentPathName; + String queuePathName = (parentPathName+"."+queueName).replace("/","root"); + when(q.getQueuePath()).thenReturn(queuePathName); } assert 0 == pqs.size(); return root; @@ -664,6 +815,8 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues, LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); + List appAttemptIdList = + new ArrayList(); when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); // consider moving where CapacityScheduler::comparator accessible @@ -681,9 +834,14 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { int aPending = pending[i] / apps[i]; int aReserve = reserved[i] / apps[i]; for (int a = 0; a < apps[i]; ++a) { - qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i])); + FiCaSchedulerApp mockFiCaApp = + mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]); + qApps.add(mockFiCaApp); ++appAlloc; + appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId()); } + when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1))) + .thenReturn(appAttemptIdList); } when(lq.getApplications()).thenReturn(qApps); if(setAMResourcePercent != 0.0f){