diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..5738645 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,9 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +128,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +168,7 @@ public void init(Configuration config,
         config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -370,6 +375,15 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
   private void computeFixpointAllocation(ResourceCalculator rc,
       Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
       boolean ignoreGuarantee) {
+
+    for (TempQueue q : qAlloc) {
+      if (Resources.greaterThan(rc, tot_guarant,
+          q.untouchableExtra, Resources.none())) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+        Resources.subtractFrom(unassigned, q.idealAssigned);
+      }
+    }
+
     //assign all cluster resources until no more demand, or no resources are left
     while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
         unassigned, Resources.none())) {
@@ -640,20 +654,43 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+      // Only read the per-queue preemption property from the conf if the queue
+      // is over capacity. This is so that when the queue is under capacity, it
+      // will be part of the calculations for ideal resource distribution.
+      boolean queueDisablePreemption = false;
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+            + SUFFIX_DISABLE_PREEMPTION;
+        queueDisablePreemption =
+            this.conf.getBoolean(queuePropName, false);
+      }
+
+      Resource extra = Resource.newInstance(0, 0);
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        extra = Resources.subtract(current, guaranteed);
+      }
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
             maxCapacity);
-
+        if (queueDisablePreemption) {
+          ret.untouchableExtra = extra;
+        }
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending,
             guaranteed, maxCapacity);
+        Resource childrensExtra = Resource.newInstance(0, 0);
         for (CSQueue c : root.getChildQueues()) {
-          ret.addChild(cloneQueues(c, clusterResources));
+          TempQueue subq = cloneQueues(c, clusterResources);
+          Resources.addTo(childrensExtra, subq.untouchableExtra);
+          ret.addChild(subq);
         }
+        ret.untouchableExtra =
+            Resources.min(rc, clusterResources, childrensExtra, extra);
       }
     }
     return ret;
@@ -691,6 +728,7 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
+    Resource untouchableExtra;
 
     double normalizedGuarantee;
 
@@ -709,6 +747,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.untouchableExtra = Resource.newInstance(0, 0);
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -740,12 +779,23 @@ public void addChildren(ArrayList<TempQueue> queues) {
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
         Resource clusterResource) {
-      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
-      Resource accepted =
-          Resources.min(rc, clusterResource,
+      Resource adjustedCur = current;
+      if (Resources.greaterThan(rc, clusterResource,
+          untouchableExtra, Resources.none())) {
+        Resource extra = Resources.subtract(current, guaranteed);
+        Resource preemptableExtra = Resources.subtract(extra, untouchableExtra);
+        adjustedCur = Resources.subtract(current, preemptableExtra);
+      }
+
+      // remain = avail -
+      //     min( avail,
+      //          (max - assigned),
+      //          ( ((current - preemptableExtra) + pending) - assigned))
+      Resource accepted =
+          Resources.min(rc, clusterResource,
             Resources.subtract(maxCapacity, idealAssigned),
             Resources.min(rc, clusterResource, avail, Resources.subtract(
-                Resources.add(current, pending), idealAssigned)));
+                Resources.add(adjustedCur, pending), idealAssigned)));
       Resource remain = Resources.subtract(avail, accepted);
       Resources.addTo(idealAssigned, accepted);
       return remain;
@@ -762,6 +812,7 @@ public String toString() {
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
         .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
         .append("\n");
 
     return sb.toString();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..8a29a5a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
@@ -98,6 +101,8 @@
       ApplicationId.newInstance(TS, 3), 0);
   final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
       ApplicationId.newInstance(TS, 4), 0);
+  final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 5), 0);
 
   final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
       ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@@ -259,6 +264,150 @@ public void testDeadzone() {
   }
 
   @Test
+  public void testPerQueueDisablePreemption() {
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        { 100,  55,  25,  20 },  // abs
+        { 100, 100, 100, 100 },  // maxCap
+        { 100,   0,  54,  46 },  // used
+        {  10,  10,   0,   0 },  // pending
+        {   0,   0,   0,   0 },  // reserved
+        //      appA appB appC
+        {   3,   1,   1,   1 },  // apps
+        {  -1,   1,   1,   1 },  // req granularity
+        {   3,   0,   0,   0 },  // subqueues
+    };
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With PREEMPTION_DISABLED set for queueB, get resources from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new
+        IsPreemptionRequestFor(appB)));
+
+    // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
+    // from both queueB and queueC. The test must be reset so that the mDisp
+    // event handler will count only events from the following test and not
+    // the previous one.
+    setup();
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy2.editSchedule();
+
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionHierarchical() {
+    int[][] qData = new int[][] {
+        //  /    A    B    C    D    E    F
+      { 200, 100,  50,  50, 100,  10,  90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 109,  60,  49,  91,  91,   0 },  // used
+      {  10,   0,   0,   0,  10,   0,  10 },  // pending
+      {   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //      appA appB      appC appD
+      {   4,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // Verify capacity is taken from queueB (appA), not queueE (appC), despite
+    // queueE being far over its absolute capacity, because queueA (queueB's
+    // parent) is over capacity and queueD (queueE's parent) is not.
+    ApplicationAttemptId expectedAttemptOnQueueB =
+        ApplicationAttemptId.newInstance(
+            appA.getApplicationId(), appA.getAttemptId());
+    assertTrue("appA should be running on queueB",
+        mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    ApplicationAttemptId expectedAttemptOnQueueC =
+        ApplicationAttemptId.newInstance(
+            appB.getApplicationId(), appB.getAttemptId());
+    ApplicationAttemptId expectedAttemptOnQueueE =
+        ApplicationAttemptId.newInstance(
+            appC.getApplicationId(), appC.getAttemptId());
+    // Now none of queueB's (appA) over-capacity use is preemptable, so neither
+    // is queueA's. Verify that capacity is taken from queueE (appC).
+    assertTrue("appB should be running on queueC",
+        mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+    assertTrue("appC should be running on queueE",
+        mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+    // Resources should have come from queueE (appC) and from neither of
+    // queueA's children.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionBroadHierarchical() {
+    int[][] qData = new int[][] {
+        //  /    A              D              G
+        //       B    C         E    F         H    I
+      {1000, 345, 145, 200, 405, 205, 200, 250, 100, 150 },  // abs
+      {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+      {1000, 395, 195, 200, 405, 255, 150, 200, 150,  50 },  // used
+      {  50,   0,   0,   0,  50,   0,  50,   0,   0,   0 },  // pending
+      {   0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //      appA appB      appC appD      appE appF
+      {   6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // queueF (appD) wants resources. Verify that resources come from
+    // queueE (appC), because it is a sibling, and from queueB (appA),
+    // because queueA is over capacity.
+    verify(mDisp, times(27)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB (appA) and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    // Now that queueB (appA) is not preemptable, verify that resources come
+    // from queueE (appC)
+    verify(mDisp, times(49)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setup();
+    // Disable preemption for two of the 3 over-capacity queues.
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION +
+        "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+    policy3.editSchedule();
+
+    // Verify that the request was starved out even though queueH (appE) is
+    // over capacity. This is because queueG (queueH's parent) is NOT
+    // over capacity.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -642,6 +791,10 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      String parentPathName = p.getQueuePath();
+      parentPathName = (parentPathName == null) ? "root" : parentPathName;
"root" : parentPathName; + String queuePathName = (parentPathName+"."+queueName).replace("/","root"); + when(q.getQueuePath()).thenReturn(queuePathName); } assert 0 == pqs.size(); return root; @@ -664,6 +817,8 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues, LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); + List appAttemptIdList = + new ArrayList(); when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); // consider moving where CapacityScheduler::comparator accessible @@ -681,9 +836,14 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { int aPending = pending[i] / apps[i]; int aReserve = reserved[i] / apps[i]; for (int a = 0; a < apps[i]; ++a) { - qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i])); + FiCaSchedulerApp mockFiCaApp = + mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]); + qApps.add(mockFiCaApp); ++appAlloc; + appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId()); } + when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1))) + .thenReturn(appAttemptIdList); } when(lq.getApplications()).thenReturn(qApps); if(setAMResourcePercent != 0.0f){