diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..21cc396 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -111,6 +111,9 @@
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -125,6 +128,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Configuration conf;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -164,6 +168,7 @@ public void init(Configuration config,
         config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    conf = config;
   }
 
   @VisibleForTesting
@@ -370,11 +375,38 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
   private void computeFixpointAllocation(ResourceCalculator rc,
       Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
       boolean ignoreGuarantee) {
+    UnpreemptablePendingQueues unpreemptablePending =
+        new UnpreemptablePendingQueues(rc, tot_guarant);
+    // If any over-capacity queue at this level is using non-preemptable
+    // resources, set that queue's idealAssigned to its guaranteed capacity
+    // plus its untouchable extra, subtract those resources from the
+    // unassigned pool, and remove the queue from consideration when
+    // distributing the rest of the unassigned resources.
+    // If a non-preemptable, over-capacity queue is also asking for more
+    // resources, place it in the UnpreemptablePendingQueues list because,
+    // once the other queues have had first pick at the remaining resources,
+    // we may need to put it back into consideration.
+    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueue q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant,
+          q.untouchableExtra, Resources.none())) {
+        q.idealAssigned = Resources.add(q.untouchableExtra, q.guaranteed);
+        Resources.subtractFrom(unassigned, q.idealAssigned);
+        i.remove();
+        // If q is also asking for resources, put it in the
+        // unpreemptablePending list in case we need to bring it back.
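+        // Illustrative example (hypothetical numbers): a non-preemptable
+        // queue guaranteed 30 that is currently using 40 has
+        // untouchableExtra = 10, so its idealAssigned becomes 30 + 10 = 40,
+        // the unassigned pool shrinks by 40, and the queue leaves qAlloc.
+        // If it also has pending demand, it is parked in unpreemptablePending
+        // until a sibling reaches the same level of over-capacity.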
+        if (Resources.greaterThan(rc, tot_guarant,
+            q.pending, Resources.none())) {
+          unpreemptablePending.add(q);
+        }
+      }
+    }
+
     //assign all cluster resources until no more demand, or no resources are left
     while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
         unassigned, Resources.none())) {
       Resource wQassigned = Resource.newInstance(0, 0);
-
+      boolean addBackUnpreemptablePending = false;
       // we compute normalizedGuarantees capacity based on currently active
       // queues
       resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
@@ -385,8 +417,10 @@ private void computeFixpointAllocation(ResourceCalculator rc,
         TempQueue sub = i.next();
         Resource wQavail =
             Resources.multiply(unassigned, sub.normalizedGuarantee);
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
+            unpreemptablePending);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
+
         // if the queue returned a value > 0 it means it is fully satisfied
         // and it is removed from the list of active queues qAlloc
         if (!Resources.greaterThan(rc, tot_guarant,
@@ -394,8 +428,17 @@ private void computeFixpointAllocation(ResourceCalculator rc,
           i.remove();
         }
         Resources.addTo(wQassigned, wQdone);
+        // If the current queue has now reached the same (or greater) level of
+        // over-capacity as the least over-capacity queue that was taken out
+        // of rotation above, set the flag to add that queue back in.
+        if (unpreemptablePending.greaterThanOrEqual(sub)) {
+          addBackUnpreemptablePending = true;
+        }
       }
       Resources.subtractFrom(unassigned, wQassigned);
+      if (addBackUnpreemptablePending) {
+        qAlloc.add(unpreemptablePending.remove());
+      }
     }
   }
 
@@ -640,20 +683,43 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
     Resource current = Resources.multiply(clusterResources, absUsed);
     Resource guaranteed = Resources.multiply(clusterResources, absCap);
     Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+    // Only read the per-queue preemption property from the conf if the queue
+    // is over capacity, so that a queue still under its capacity remains
+    // part of the calculations for ideal resource distribution.
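+    // For example, with the constants defined above, preemption for a queue
+    // whose path is root.queueA.queueB (an illustrative name) is disabled by
+    // setting yarn.scheduler.capacity.root.queueA.queueB.disable_preemption
+    // to true.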
+    boolean queueDisablePreemption = false;
+    Resource extra = Resource.newInstance(0, 0);
+    if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+      String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+          + SUFFIX_DISABLE_PREEMPTION;
+      queueDisablePreemption = this.conf.getBoolean(queuePropName, false);
+      extra = Resources.subtract(current, guaranteed);
+    }
     if (root instanceof LeafQueue) {
       LeafQueue l = (LeafQueue) root;
       Resource pending = l.getTotalResourcePending();
       ret = new TempQueue(queueName, current, pending, guaranteed,
           maxCapacity);
-
+      if (queueDisablePreemption) {
+        ret.untouchableExtra = extra;
+      }
       ret.setLeafQueue(l);
     } else {
       Resource pending = Resource.newInstance(0, 0);
       ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
           maxCapacity);
+      Resource childrensExtra = Resource.newInstance(0, 0);
       for (CSQueue c : root.getChildQueues()) {
-        ret.addChild(cloneQueues(c, clusterResources));
+        TempQueue subq = cloneQueues(c, clusterResources);
+        Resources.addTo(childrensExtra, subq.untouchableExtra);
+        ret.addChild(subq);
       }
+      // A parent's untouchable extra can exceed neither its children's
+      // combined untouchable extra nor its own extra.
+      ret.untouchableExtra =
+          Resources.min(rc, clusterResources, childrensExtra, extra);
     }
     return ret;
   }
@@ -691,6 +757,7 @@ public int compare(TempQueue o1, TempQueue o2) {
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
+    Resource untouchableExtra;
     double normalizedGuarantee;
 
@@ -709,6 +776,7 @@ public int compare(TempQueue o1, TempQueue o2) {
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.untouchableExtra = Resource.newInstance(0, 0);
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -739,13 +807,21 @@ public void addChildren(ArrayList<TempQueue> queues) {
     // This function "accepts" all the resources it can (pending) and return
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
-        Resource clusterResource) {
-      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
-      Resource accepted =
-          Resources.min(rc, clusterResource,
-              Resources.subtract(maxCapacity, idealAssigned),
-              Resources.min(rc, clusterResource, avail, Resources.subtract(
-                  Resources.add(current, pending), idealAssigned)));
+        Resource clusterResource,
+        UnpreemptablePendingQueues unpreemptablePending) {
+      // Calculate the amount of resources that would bring this queue up to
+      // the same percentage of over-capacity as the first item in the
+      // UnpreemptablePendingQueues list.
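+      // Illustrative numbers: if the head of that list is 50% over its
+      // guarantee, and this queue has guaranteed = 100 and idealAssigned =
+      // 120, then levelize = 100 * (1 + 0.5) - 120 = 30.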
+      Resource levelize = unpreemptablePending.amountToLevelize(this);
+      // remain = avail - min(avail, (max - assigned),
+      //                      (current + pending - assigned), levelize)
+      Resource accepted =
+          Resources.min(rc, clusterResource,
+              Resources.min(rc, clusterResource,
+                  Resources.subtract(maxCapacity, idealAssigned),
+                  Resources.min(rc, clusterResource, avail, Resources.subtract(
+                      Resources.add(current, pending), idealAssigned))),
+              levelize);
+
       Resource remain = Resources.subtract(avail, accepted);
       Resources.addTo(idealAssigned, accepted);
       return remain;
@@ -762,10 +838,19 @@ public String toString() {
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
         .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
         .append("\n");
       return sb.toString();
     }
+
+    public void printAll() {
+      LOG.info(this.toString());
+      for (TempQueue sub : this.getChildren()) {
+        sub.printAll();
+      }
+    }
+
     public void assignPreemption(float scalingFactor,
         ResourceCalculator rc, Resource clusterResource) {
       if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
@@ -794,4 +879,90 @@ void appendLogString(StringBuilder sb) {
   }
 
+  /**
+   * Holds a list of queues at a particular level that are:
+   * 1) unpreemptable,
+   * 2) over capacity,
+   * 3) asking for more resources, and
+   * 4) already assigned resources up to their current usage.
+   *
+   * During the assignment of available resources at a given level, these
+   * queues are given their resources first and then taken out of
+   * consideration. The other queues at this level get first priority for the
+   * remaining resources. If any remaining queue reaches the same level of
+   * over-capacity as these queues, these queues are added back into
+   * consideration for assignment of the remaining resources.
+   */
+  static class UnpreemptablePendingQueues {
+    private List<TempQueue> queues;
+    private ResourceCalculator rc;
+    private Resource clusterResources;
+
+    UnpreemptablePendingQueues(ResourceCalculator rc,
+        Resource clusterResources) {
+      this.rc = rc;
+      this.clusterResources = clusterResources;
+      this.queues = new ArrayList<TempQueue>();
+    }
+
+    /**
+     * Add queues in ascending order of the percentage over their guaranteed
+     * capacity.
+     * ASSUMPTION: Only queues that are over their guaranteed capacity are put
+     * into this object.
+     */
+    public void add(TempQueue qToAdd) {
+      int index = 0;
+      double qToAddPctOver = calculatePctOver(qToAdd);
+      // Walk past every queue whose percentage overage is not larger, then
+      // insert before the first larger entry to keep the ascending order.
+      while (index < queues.size()
+          && calculatePctOver(queues.get(index)) <= qToAddPctOver) {
+        index++;
+      }
+      queues.add(index, qToAdd);
+    }
+
+    public TempQueue remove() {
+      return queues.remove(0);
+    }
+
+    // Is tq's percentage overage greater than or equal to that of the first
+    // item in the UnpreemptablePendingQueues list?
+    public boolean greaterThanOrEqual(TempQueue tq) {
+      boolean isGreaterThanOrEqual = false;
+      if (!queues.isEmpty()) {
+        isGreaterThanOrEqual =
+            (calculatePctOver(tq) >= calculatePctOver(queues.get(0)));
+      }
+      return isGreaterThanOrEqual;
+    }
+
+    /**
+     * Calculates the amount it would take to bring the specified queue to the
+     * same percentage of over-capacity as the first element in the
+     * UnpreemptablePendingQueues list.
+     * The return value is used in a "min" calculation, so it needs to be
+     * initialized to Integer.MAX_VALUE.
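+     * For example (illustrative numbers): if the first element is 25% over
+     * its guarantee and tq has guaranteed = 80 and idealAssigned = 90, the
+     * result is 80 * (1 + 0.25) - 90 = 10. When the list is empty, the
+     * Integer.MAX_VALUE default leaves the caller's "min" unchanged.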
+     */
+    public Resource amountToLevelize(TempQueue tq) {
+      Resource levelize =
+          Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
+      if (!queues.isEmpty()) {
+        levelize =
+            Resources.subtract(
+                Resources.multiply(tq.guaranteed,
+                    1 + calculatePctOver(queues.get(0))),
+                tq.idealAssigned);
+      }
+      return levelize;
+    }
+
+    // Could be negative if the queue's idealAssigned is under its guaranteed
+    // capacity.
+    private double calculatePctOver(TempQueue q) {
+      double pctOver =
+          Resources.divide(rc, clusterResources, q.idealAssigned, q.guaranteed);
+      return (pctOver - 1);
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..412d408 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
@@ -98,6 +101,8 @@
       ApplicationId.newInstance(TS, 3), 0);
   final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
       ApplicationId.newInstance(TS, 4), 0);
+  final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 5), 0);
 
   final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
       ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@@ -259,6 +264,179 @@ public void testDeadzone() {
   }
 
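+  // The qData matrices in the new tests below follow this file's existing
+  // convention: one column per queue (root first), with rows for absolute
+  // capacity, max capacity, used, pending, reserved, app count, request
+  // granularity, and number of subqueues.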
   @Test
+  public void testPerQueueDisablePreemption() {
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        { 100,  55,  25,  20 },  // abs
+        { 100, 100, 100, 100 },  // maxCap
+        { 100,   0,  54,  46 },  // used
+        {  10,  10,   0,   0 },  // pending
+        {   0,   0,   0,   0 },  // reserved
+        //      appA appB appC
+        {   3,   1,   1,   1 },  // apps
+        {  -1,   1,   1,   1 },  // req granularity
+        {   3,   0,   0,   0 },  // subqueues
+    };
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With disable_preemption set for queueB, resources are preempted only
+    // from queueC.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // With disable_preemption unset for queueB, resources will be preempted
+    // from both queueB and queueC. The test must be reset so that the mDisp
+    // event handler counts only events from the following run and not the
+    // previous one.
+    setup();
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy2.editSchedule();
+
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testDisablePreemptionOverCapPlusPending() {
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        {  90,  30,  30,  30 },  // abs
+        {  90,  90,  90,  90 },  // maxCap
+        {  90,  40,  50,   0 },  // used
+        {  10,  50,   0,   0 },  // pending
+        {   0,   0,   0,   0 },  // reserved
+        //      appA appB appC
+        {   3,   1,   1,   1 },  // apps
+        {  -1,   1,   1,   1 },  // req granularity
+        {   3,   0,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setup();
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    // queueA is both over capacity and asking for more, so disabling its
+    // preemption must not change the outcome: preemption still takes from
+    // queueB on queueA's behalf.
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionHierarchical() {
+    int[][] qData = new int[][] {
+        //  /    A              D
+        //       B    C         E    F
+        { 200, 100,  50,  50, 100,  10,  90 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 200, 109,  60,  49,  91,  91,   0 },  // used
+        {  10,   0,   0,   0,  10,   0,  10 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //      appA appB      appC appD
+        {   4,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+        {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // Verify that capacity is taken from queueB (appA), not queueE (appC),
+    // despite queueE being far over its absolute capacity, because queueA
+    // (queueB's parent) is over capacity while queueD (queueE's parent) is
+    // not.
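+    // With these (test-defined) numbers: queueE uses 91 against a guarantee
+    // of 10, but its parent queueD uses 91 against 100 and is under
+    // capacity, while queueA uses 109 against 100 and is over.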
+    ApplicationAttemptId expectedAttemptOnQueueB =
+        ApplicationAttemptId.newInstance(
+            appA.getApplicationId(), appA.getAttemptId());
+    assertTrue("appA should be running on queueB",
+        mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    ApplicationAttemptId expectedAttemptOnQueueC =
+        ApplicationAttemptId.newInstance(
+            appB.getApplicationId(), appB.getAttemptId());
+    ApplicationAttemptId expectedAttemptOnQueueE =
+        ApplicationAttemptId.newInstance(
+            appC.getApplicationId(), appC.getAttemptId());
+    // Now none of queueB's (appA) over-capacity usage is preemptable, so
+    // neither is queueA's. Verify that capacity is taken from queueE (appC).
+    assertTrue("appB should be running on queueC",
+        mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+    assertTrue("appC should be running on queueE",
+        mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+    // Resources should have come from queueE (appC) and from neither of
+    // queueA's children.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionBroadHierarchical() {
+    int[][] qData = new int[][] {
+        //   /    A                   D                   G
+        //        B    C              E    F              H    I
+        {1000, 345, 145, 200, 405, 205, 200, 250, 100, 150 },  // abs
+        {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+        {1000, 395, 195, 200, 405, 255, 150, 200, 150,  50 },  // used
+        {  50,   0,   0,   0,  50,   0,  50,   0,   0,   0 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //       appA appB      appC appD      appE appF
+        {   6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+        {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // queueF(appD) wants resources. Verify that resources come from
+    // queueE(appC), its sibling, and from queueB(appA), because queueA is
+    // over capacity.
+    verify(mDisp, times(27)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB(appA) and its children
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    // Now that queueB(appA) is not preemptable, verify that resources come
+    // from queueE(appC).
+    verify(mDisp, times(49)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setup();
+    // Disable preemption for two of the three over-capacity queues.
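+    // Per the qData above, the over-capacity leaf queues are queueB (195
+    // used vs 145 guaranteed), queueE (255 vs 205), and queueH (150 vs 100);
+    // queueB and queueE are made non-preemptable here, leaving only queueH.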
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+    conf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+    policy3.editSchedule();
+
+    // Verify that the request is starved even though queueH(appE) is over
+    // capacity. This is because queueG (queueH's parent) is NOT over
+    // capacity: it uses 200 against a guarantee of 250.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -642,6 +820,10 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      String parentPathName = p.getQueuePath();
+      parentPathName = (parentPathName == null) ? "root" : parentPathName;
+      String queuePathName =
+          (parentPathName + "." + queueName).replace("/", "root");
+      when(q.getQueuePath()).thenReturn(queuePathName);
     }
     assert 0 == pqs.size();
     return root;
@@ -664,6 +846,8 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues,
   LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
+    List<ApplicationAttemptId> appAttemptIdList =
+        new ArrayList<ApplicationAttemptId>();
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
     // consider moving where CapacityScheduler::comparator accessible
@@ -681,9 +865,14 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
       int aPending = pending[i] / apps[i];
       int aReserve = reserved[i] / apps[i];
       for (int a = 0; a < apps[i]; ++a) {
-        qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
+        FiCaSchedulerApp mockFiCaApp =
+            mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
+        qApps.add(mockFiCaApp);
         ++appAlloc;
+        appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
       }
+      when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
+          .thenReturn(appAttemptIdList);
     }
     when(lq.getApplications()).thenReturn(qApps);
     if(setAMResourcePercent != 0.0f){