diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..85e2d71 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -434,11 +434,11 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
    * @param clusterResource total amount of cluster resources
    * @return a map of applicationID to set of containers to preempt
    */
-  private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
+  private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt(
       List<TempQueue> queues, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
-        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
     List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
@@ -448,27 +448,19 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
           Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
         // we introduce a dampening factor naturalTerminationFactor that
         // accounts for natural termination of containers
-        Resource resToObtain =
-          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        Resource resToObtain = Resources.multiply(qT.toBePreempted,
+            naturalTerminationFactor);
         Resource skippedAMSize = Resource.newInstance(0, 0);
+        Map<ApplicationAttemptId, Set<RMContainer>> userLimitContainers = null;
 
         // lock the leafqueue while we scan applications and unreserve
         synchronized (qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns =
-              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
-          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          NavigableSet<FiCaSchedulerApp> ns = (NavigableSet<FiCaSchedulerApp>) qT.leafQueue
+              .getApplications();
           qT.actuallyPreempted = Resources.clone(resToObtain);
-          while (desc.hasNext()) {
-            FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-                Resources.none())) {
-              break;
-            }
-            preemptMap.put(
-                fc.getApplicationAttemptId(),
-                preemptFrom(fc, clusterResource, resToObtain,
-                    skippedAMContainerlist, skippedAMSize));
-          }
+          userLimitContainers = balanceUserLimitsinQueueForPreemption(qT,
+              clusterResource, rc, resToObtain, skippedAMContainerlist,
+              skippedAMSize);
           Resource maxAMCapacityForThisQueue = Resources.multiply(
               Resources.multiply(clusterResource,
                   qT.leafQueue.getAbsoluteCapacity()),
@@ -480,12 +472,104 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
           preemptAMContainers(clusterResource, preemptMap,
               skippedAMContainerlist, resToObtain, skippedAMSize,
               maxAMCapacityForThisQueue);
+          if (userLimitContainers != null) {
+            for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> entry : userLimitContainers
+                .entrySet()) {
+              ApplicationAttemptId id = entry.getKey();
+              Set<RMContainer> containers = userLimitContainers.get(id);
+              if (containers != null) {
+                if (preemptMap.containsKey(id)) {
+                  preemptMap.get(id).addAll(containers);
+                } else {
+                  preemptMap.put(id, containers);
+                }
+              }
+            }
+          }
         }
       }
     }
     return preemptMap;
   }
 
+  /**
+   * This will balance the queue with user limits when preemption is needed.
+   *
+   * @param qT the queue to rebalance
+   * @param clusterResource total amount of cluster resources
+   * @param rc the resource calculator in use
+   * @param resToObtain the amount of resources to claim back from this queue
+   * @param skippedAMContainerlist list collecting the AM containers skipped so far
+   * @param skippedAMSize running total of skipped AM resources
+   * @return a map of application attempt to the containers marked for preemption
+   */
+  private Map<ApplicationAttemptId, Set<RMContainer>> balanceUserLimitsinQueueForPreemption(
+      TempQueue qT, Resource clusterResource, ResourceCalculator rc,
+      Resource resToObtain, List<RMContainer> skippedAMContainerlist,
+      Resource skippedAMSize) {
+    Map<ApplicationAttemptId, Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+    NavigableSet<FiCaSchedulerApp> ns =
+        (NavigableSet<FiCaSchedulerApp>) qT.leafQueue
+            .getApplications();
+    Resource userLimitforQueue = qT.getLeafQueue().computeTargetedUserLimit(rc,
+        clusterResource, resToObtain);
+    Map<String, Resource> userMarkedPreemptedResource =
+        new HashMap<String, Resource>();
+    Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+    while (desc.hasNext()) {
+      FiCaSchedulerApp fc = desc.next();
+      ApplicationAttemptId appId = fc.getApplicationAttemptId();
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      Resource userConsumedResource = Resources.none();
+      if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+        // discount what we have already marked for preemption from this user
+        userConsumedResource = Resources.subtract(
+            qT.leafQueue.getUser(fc.getUser()).getConsumedResources(),
+            userMarkedPreemptedResource.get(fc.getUser()));
+      } else {
+        userConsumedResource = qT.leafQueue.getUser(fc.getUser())
+            .getConsumedResources();
+      }
+
+      if (Resources.lessThan(rc, clusterResource, userLimitforQueue,
+          userConsumedResource)) {
+        // As the user has consumed more resources than the user limit,
+        // we need to claim back the equivalent of
+        // (resources consumed by user - user limit)
+        Resource resourcesToClaimBackFromUser = Resources.subtract(
+            userConsumedResource, userLimitforQueue);
+        resourcesToClaimBackFromUser = Resources.min(rc, clusterResource,
+            resourcesToClaimBackFromUser, resToObtain);
+        // clone when capped at resToObtain so we never alias the running
+        // target, which preemptFrom mutates in place
+        resourcesToClaimBackFromUser = Resources.lessThan(rc, clusterResource,
+            resourcesToClaimBackFromUser, resToObtain)
+            ? resourcesToClaimBackFromUser : Resources.clone(resToObtain);
+        Resource initialRes = Resources.clone(resourcesToClaimBackFromUser);
+        Set<RMContainer> containers = preemptFrom(fc, clusterResource,
+            resourcesToClaimBackFromUser, skippedAMContainerlist,
+            skippedAMSize);
+        Resources.subtractFrom(resToObtain,
+            Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        list.put(appId, containers);
+        if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+          userMarkedPreemptedResource.put(fc.getUser(), Resources.add(
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser),
+              userMarkedPreemptedResource.get(fc.getUser())));
+        } else {
+          userMarkedPreemptedResource.put(fc.getUser(),
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        }
+      }
+    }
+    return list;
+  }
+
   /**
    * As more resources are needed for preemption, saved AMContainers have to be
    * rescanned. Such AMContainers can be preempted based on resToObtain, but
@@ -530,9 +614,9 @@ private void preemptAMContainers(Resource clusterResource,
   }
 
   /**
-   * Given a target preemption for a specific application, select containers
-   * to preempt (after unreserving all reservation for that app).
-   *
+   * Given a target preemption for a specific application, select containers to
+   * preempt (after unreserving all reservation for that app).
+   *
    * @param app
    * @param clusterResource
    * @param rsrcPreempt
@@ -545,11 +629,11 @@ private void preemptAMContainers(Resource clusterResource,
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
     // first drop reserved containers towards rsrcPreempt
-    List<RMContainer> reservations =
-        new ArrayList<RMContainer>(app.getReservedContainers());
+    List<RMContainer> reservations = new ArrayList<RMContainer>(
+        app.getReservedContainers());
     for (RMContainer c : reservations) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt,
+          Resources.none())) {
         return ret;
       }
       if (!observeOnly) {
@@ -562,16 +646,17 @@ private void preemptAMContainers(Resource clusterResource,
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
-    List<RMContainer> containers =
-        new ArrayList<RMContainer>(app.getLiveContainers());
+    List<RMContainer> containers = new ArrayList<RMContainer>(
+        app.getLiveContainers());
     sortContainers(containers);
 
     for (RMContainer c : containers) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt,
+          Resources.none())) {
         return ret;
       }
+
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
         skippedAMContainerlist.add(c);
@@ -792,6 +877,9 @@ void appendLogString(StringBuilder sb) {
         .append(actuallyPreempted.getVirtualCores());
     }
 
-  }
+    public LeafQueue getLeafQueue() {
+      return leafQueue;
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 65938aa..e3d6f64 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1078,6 +1078,59 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
     return limit;
   }
 
+  @Lock(NoLock.class)
+  public Resource computeTargetedUserLimit(
+      ResourceCalculator resourceCalculator, Resource clusterResource,
+      Resource resToObtain) {
+
+    // What is our current capacity?
+    // * It is equal to the max(required, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with minuscule capacity (< 1 slot) make progress
+    // * If we're running over capacity, then it's
+    //   (usedResources + required) (which extra resources we are allocating)
+
+    // Allow progress for queues with minuscule capacity
+    final Resource queueCapacity = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, clusterResource, getAbsoluteCapacity(),
+        getMinimumAllocation());
+
+    Resource currentCapacity = Resources.lessThan(resourceCalculator,
+        clusterResource, getUsedResources(), queueCapacity) ? queueCapacity
+        : getUsedResources();
+
+    // the capacity the queue should shrink to once resToObtain is preempted
+    Resource targetedCapacity = Resources
+        .subtract(currentCapacity, resToObtain);
+
+    // Never allow a single user to take more than the
+    // queue's configured capacity * user-limit-factor.
+    // Also, the queue's configured capacity should be higher than
+    // queue-hard-limit * ulMin
+
+    final int activeUsers = getActiveUsersManager().getNumActiveUsers();
+
+    Resource limit = Resources.roundUp(resourceCalculator, Resources.min(
+        resourceCalculator, clusterResource, Resources.max(resourceCalculator,
+            clusterResource, Resources.divideAndCeil(resourceCalculator,
+                targetedCapacity, activeUsers), Resources.divideAndCeil(
+                resourceCalculator, Resources.multiplyAndRoundDown(
+                    targetedCapacity, getUserLimit()), 100)), Resources
+            .multiplyAndRoundDown(queueCapacity, getUserLimitFactor())),
+        getMinimumAllocation());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computation in queue " + getQueueName()
+          + " userLimit=" + getUserLimit() + " userLimitFactor="
+          + getUserLimitFactor() + " required: " + resToObtain + " limit: "
+          + limit + " queueCapacity: " + queueCapacity + " qconsumed: "
+          + getUsedResources() + " currentCapacity: " + currentCapacity
+          + " targetedCapacity: " + targetedCapacity + " activeUsers: "
+          + activeUsers + " clusterCapacity: " + clusterResource);
+    }
+
+    return limit;
+  }
+
   private synchronized boolean assignToUser(Resource clusterResource,
       String userName, Resource limit) {
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..b99a097 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,6 +35,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -45,6 +47,8 @@ import java.util.Random;
 import java.util.TreeSet;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
@@ -81,6 +86,10 @@
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setUserLimit = false;
+  boolean setdifferentUser = false;
+  boolean setConsumedResources = false;
+  ConcurrentHashMap<String, Resource> userConsumedResources = null;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
@@ -122,6 +131,7 @@ public void setup() {
     System.out.println(name.getMethodName() + " SEED: " + seed);
     rand.setSeed(seed);
     appAlloc = 0;
+    userConsumedResources = new ConcurrentHashMap<String, Resource>();
   }
 
   @Test
@@ -156,9 +166,13 @@ public void testProportionalPreemption() {
       { -1, 1, 1, 1, 1 },  // req granularity
       { 4, 0, 0, 0, 0 },   // subqueues
     };
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(16, 0));
+    userConsumedResources.put("user2", Resource.newInstance(3, 0));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setConsumedResources = false;
   }
 
   @Test
@@ -370,12 +384,16 @@ public void testZeroGuarOverCap() {
       { -1, -1, 1, 1, 1, -1, 1 },  // req granularity
       { 2, 3, 0, 0, 0, 1, 0 },     // subqueues
     };
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(14, 0));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // we verify both that C has priority on B and D (as it has >0 guarantees)
     // and that B and D are forced to share their over capacity fairly (as they
     // are both zero-guarantees) hence D sees some of its containers preempted
     verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    setConsumedResources = false;
+    userConsumedResources = null;
   }
 
@@ -484,6 +502,7 @@ public void testSkipAMContainer() {
     };
     setAMContainer = true;
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+
     policy.editSchedule();
 
     // By skipping AM Container, all other 24 containers of appD will be
@@ -514,6 +533,10 @@ public void testPreemptSkippedAMContainers() {
       { 2, 0, 0 },      // subqueues
     };
     setAMContainer = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(45, 0));
+    userConsumedResources.put("user2", Resource.newInstance(45, 0));
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
 
@@ -531,6 +554,94 @@ public void testPreemptSkippedAMContainers() {
     // preempted
     verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
     setAMContainer = false;
+    setConsumedResources = false;
   }
 
+  @Test
+  public void testPreemptionUserLimitReached() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100,  50,  50 },  // abs
+        { 100, 100, 100 },  // maxcap
+        { 100, 100,   0 },  // used
+        {  70,  20,  50 },  // pending
+        {   0,   0,   0 },  // reserved
+        {   5,   4,   1 },  // apps
+        {  -1,   1,   1 },  // req granularity
+        {   2,   0,   0 },  // subqueues
+    };
+    setAMContainer = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(25, 0));
+    userConsumedResources.put("user2", Resource.newInstance(26, 0));
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping the AM container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping the AM container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since the AM containers of appC and appD are saved, 2 containers from
+    // appB have to be preempted, as the user limit is reached and no more
+    // balancing is needed in terms of resources.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Check that appA is not preempted, as its user stays within the user
+    // limit.
+    verify(mDisp, times(0)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setConsumedResources = false;
+  }
+
+  @Test
+  public void testPreemptionUserLimit() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100,  50,  50 },  // abs
+        { 100, 100, 100 },  // maxcap
+        { 100, 100,   0 },  // used
+        {  70,  20,  50 },  // pending
+        {   0,   0,   0 },  // reserved
+        {   5,   4,   1 },  // apps
+        {  -1,   1,   1 },  // req granularity
+        {   2,   0,   0 },  // subqueues
+    };
+    setAMContainer = true;
+    setUserLimit = true;
+    setdifferentUser = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(2, 0));
+    userConsumedResources.put("user2", Resource.newInstance(2, 0));
+    userConsumedResources.put("user3", Resource.newInstance(25, 0));
+    userConsumedResources.put("user4", Resource.newInstance(25, 0));
+    userConsumedResources.put("user5", Resource.newInstance(25, 0));
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping the AM container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping the AM container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since the AM containers of appC and appD are saved, 1 container from
+    // appB has to be preempted, as 1 container from appA is also preempted
+    // for exceeding the user limit.
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Check that appA is also preempted due to the user limit.
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setUserLimit = false;
+    setdifferentUser = false;
+    setConsumedResources = false;
   }
 
   @Test
@@ -548,6 +659,10 @@ public void testAMResourcePercentForSkippedAMContainers() {
     };
     setAMContainer = true;
     setAMResourcePercent = 0.5f;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(45, 0));
+    userConsumedResources.put("user2", Resource.newInstance(45, 0));
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
 
@@ -568,6 +683,7 @@ public void testAMResourcePercentForSkippedAMContainers() {
     // preempted
     verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
     setAMContainer = false;
+    setConsumedResources = false;
   }
 
   static class IsPreemptionRequestFor
@@ -595,13 +711,14 @@ public String toString() {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+        spy(new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock));
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
     Resource clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
+
     return policy;
   }
 
@@ -661,23 +778,72 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues,
     return pq;
   }
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+
+    if (setUserLimit) {
+      when(
+          lq.computeTargetedUserLimit(any(ResourceCalculator.class),
+              any(Resource.class), any(Resource.class))).thenReturn(
+          Resource.newInstance(1, 0));
+    } else {
+      when(
+          lq.computeTargetedUserLimit(any(ResourceCalculator.class),
+              any(Resource.class), any(Resource.class))).thenReturn(
+          Resource.newInstance(0, 0));
+    }
+
+    if (!setConsumedResources) {
+      User usr1 = mock(User.class);
+      when(lq.getUser(any(String.class))).thenReturn(usr1);
+      when(usr1.getConsumedResources()).thenReturn(
+          Resource.newInstance(abs[i], 0));
+    } else {
+      User usr1 = mock(User.class);
+      User usr2 = mock(User.class);
+      User usr3 = mock(User.class);
+      User usr4 = mock(User.class);
+      User usr5 = mock(User.class);
+      when(lq.getUser("user1")).thenReturn(usr1);
+      when(lq.getUser("user2")).thenReturn(usr2);
+      when(lq.getUser("user3")).thenReturn(usr3);
+      when(lq.getUser("user4")).thenReturn(usr4);
+      when(lq.getUser("user5")).thenReturn(usr5);
+      for (String usr : userConsumedResources.keySet()) {
+        if (usr.equalsIgnoreCase("user1")) {
+          when(usr1.getConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user2")) {
+          when(usr2.getConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user3")) {
+          when(usr3.getConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user4")) {
+          when(usr4.getConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user5")) {
+          when(usr5.getConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        }
+      }
+    }
     // consider moving where CapacityScheduler::comparator accessible
     NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
-      new Comparator<FiCaSchedulerApp>() {
-        @Override
-        public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-          return a1.getApplicationAttemptId()
-              .compareTo(a2.getApplicationAttemptId());
-        }
-      });
+        new Comparator<FiCaSchedulerApp>() {
+          @Override
+          public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+            return a1.getApplicationAttemptId().compareTo(
+                a2.getApplicationAttemptId());
+          }
+        });
     // applications are added in global L->R order in queues
     if (apps[i] != 0) {
-      int aUsed = used[i] / apps[i];
+      int aUsed = used[i] / apps[i];
       int aPending = pending[i] / apps[i];
       int aReserve = reserved[i] / apps[i];
       for (int a = 0; a < apps[i]; ++a) {
@@ -686,8 +852,9 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
       }
     }
     when(lq.getApplications()).thenReturn(qApps);
-    if(setAMResourcePercent != 0.0f){
-      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    if (setAMResourcePercent != 0.0f) {
+      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(
+          setAMResourcePercent);
     }
     p.getChildQueues().add(lq);
     return lq;
@@ -700,8 +867,18 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
     ApplicationId appId = ApplicationId.newInstance(TS, id);
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
     when(app.getApplicationId()).thenReturn(appId);
+
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
-
+    if (!setdifferentUser) {
+      if ((id % 2) == 0) {
+        when(app.getUser()).thenReturn("user1");
+      } else {
+        when(app.getUser()).thenReturn("user2");
+      }
+    } else {
+      int newId = id + 1;
+      when(app.getUser()).thenReturn("user" + newId);
+    }
     int cAlloc = 0;
     Resource unit = Resource.newInstance(gran, 0);
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
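
Reviewer note (not part of the patch): the targeted user-limit that computeTargetedUserLimit above produces reduces to limit = min(max(targetedCapacity / activeUsers, targetedCapacity * minimum-user-limit-percent / 100), queueCapacity * user-limit-factor), with targetedCapacity = currentCapacity - resToObtain. The following is a minimal sketch of that arithmetic with plain ints instead of the Resource/ResourceCalculator API; the class, method, and parameter names are illustrative only, and the final roundUp to the minimum allocation is omitted.

    // TargetedUserLimitSketch.java -- illustrative only; mirrors the
    // arithmetic of LeafQueue#computeTargetedUserLimit with plain ints.
    public class TargetedUserLimitSketch {

      static int computeTargetedUserLimit(int currentCapacity, int queueCapacity,
          int resToObtain, int activeUsers, int userLimitPercent,
          float userLimitFactor) {
        // capacity the queue is steered towards once the preemption target
        // (resToObtain) has been claimed back
        int targetedCapacity = currentCapacity - resToObtain;
        // larger of the even share per active user and the configured
        // minimum-user-limit-percent share of the targeted capacity
        int share = Math.max(
            (int) Math.ceil((double) targetedCapacity / activeUsers),
            targetedCapacity * userLimitPercent / 100);
        // capped by the queue capacity scaled with user-limit-factor
        return Math.min(share, (int) (queueCapacity * userLimitFactor));
      }

      public static void main(String[] args) {
        // queue guaranteed 50, currently using 100 (over capacity), preemption
        // target 30, 2 active users, minimum-user-limit-percent 25, factor 1.0:
        // targeted = 70, share = max(35, 17) = 35, capped at 50 -> 35
        int limit = computeTargetedUserLimit(100, 50, 30, 2, 25, 1.0f);
        System.out.println("targeted user limit = " + limit); // prints 35
      }
    }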
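Similarly, the per-user claim-back in balanceUserLimitsinQueueForPreemption reduces to: walk the applications in reverse order, and for each app whose user sits above the targeted limit (net of what was already marked), mark min(overshoot, remaining queue target) for preemption and charge it against that user. A hedged sketch with ints and made-up app/user names, not the patch's actual data structures:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // UserLimitClaimBackSketch.java -- illustrative only; the marking logic
    // of balanceUserLimitsinQueueForPreemption with ints instead of Resources.
    public class UserLimitClaimBackSketch {
      public static void main(String[] args) {
        int userLimit = 35;   // targeted user limit for the queue
        int resToObtain = 30; // remaining preemption target for the queue

        // applications in reverse (preempt-last-first) order -> owning user
        Map<String, String> apps = new LinkedHashMap<>();
        apps.put("app4", "user2");
        apps.put("app3", "user1");

        Map<String, Integer> consumed = new HashMap<>();
        consumed.put("user1", 60);
        consumed.put("user2", 40);

        // resources already marked for preemption, per user
        Map<String, Integer> marked = new HashMap<>();

        for (Map.Entry<String, String> e : apps.entrySet()) {
          if (resToObtain <= 0) {
            break; // queue target met
          }
          String user = e.getValue();
          // consumption net of what we already marked from this user
          int effective = consumed.get(user) - marked.getOrDefault(user, 0);
          if (effective > userLimit) {
            // claim back the overshoot, capped by the remaining queue target
            int claim = Math.min(effective - userLimit, resToObtain);
            resToObtain -= claim;
            marked.merge(user, claim, Integer::sum);
            System.out.println("mark " + claim + " from " + e.getKey());
          }
        }
        // prints: mark 5 from app4, then mark 25 from app3
      }
    }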