diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..335ff93 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -425,20 +425,22 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
   }
 
   /**
-   * Based a resource preemption target drop reservations of containers and
-   * if necessary select containers for preemption from applications in each
-   * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
-   * account for containers that will naturally complete.
-   *
-   * @param queues set of leaf queues to preempt from
-   * @param clusterResource total amount of cluster resources
+   * Based on a resource preemption target, drop reservations of containers
+   * and, if necessary, select containers for preemption from applications in
+   * each over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
+   * account for containers that will naturally complete.
+   *
+   * @param queues
+   *          set of leaf queues to preempt from
+   * @param clusterResource
+   *          total amount of cluster resources
    * @return a map of applicationID to set of containers to preempt
    */
-  private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
+  private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt(
       List<TempQueue> queues, Resource clusterResource) {
-    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
-        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
     List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
@@ -448,32 +450,32 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
           Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
         // we introduce a dampening factor naturalTerminationFactor that
         // accounts for natural termination of containers
-        Resource resToObtain =
-          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        Resource resToObtain = Resources.multiply(qT.toBePreempted,
+            naturalTerminationFactor);
         Resource skippedAMSize = Resource.newInstance(0, 0);
-
+        Map<ApplicationAttemptId, Set<RMContainer>> userLimitContainers = null;
+        Map<ApplicationAttemptId, Set<RMContainer>> distributedContainers = null;
         // lock the leafqueue while we scan applications and unreserve
         synchronized (qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns =
-              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
-          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          qT.actuallyPreempted = Resources.clone(resToObtain);
-          while (desc.hasNext()) {
-            FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-                Resources.none())) {
-              break;
-            }
-            preemptMap.put(
-                fc.getApplicationAttemptId(),
-                preemptFrom(fc, clusterResource, resToObtain,
-                    skippedAMContainerlist, skippedAMSize));
+          userLimitContainers = balanceUserLimitsinQueueForPreemption(qT,
+              clusterResource, rc, resToObtain, skippedAMContainerlist,
+              skippedAMSize);
+          if (Resources.greaterThan(rc, clusterResource, resToObtain,
+              Resources.none())) {
+            distributedContainers = distributePreemptionforUsers(qT,
+                clusterResource, rc, resToObtain, skippedAMContainerlist,
+                skippedAMSize, userLimitContainers);
           }
           Resource maxAMCapacityForThisQueue = Resources.multiply(
               Resources.multiply(clusterResource,
                   qT.leafQueue.getAbsoluteCapacity()),
               qT.leafQueue.getMaxAMResourcePerQueuePercent());
+          fillContainers(preemptMap, userLimitContainers);
+          fillContainers(preemptMap, distributedContainers);
+
           // Can try preempting AMContainers (still saving at most
           // maxAMCapacityForThisQueue AMResource's) if more resources are
           // required to be preempted from this Queue.
@@ -485,6 +487,170 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
     }
     return preemptMap;
   }
+
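+  /**
+   * Merges the containers selected during one preemption pass into the
+   * overall preemption map, unioning container sets for application attempts
+   * that already have an entry.
+   */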
+  private void fillContainers(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> containersMap) {
+    if (containersMap != null) {
+      for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> entry : containersMap
+          .entrySet()) {
+        ApplicationAttemptId id = entry.getKey();
+        Set<RMContainer> containers = containersMap.get(id);
+        if (containers != null) {
+          if (preemptMap.containsKey(id)) {
+            preemptMap.get(id).addAll(containers);
+          } else {
+            preemptMap.put(id, containers);
+          }
+        }
+      }
+    }
+  }
+
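+  /**
+   * Second preemption pass: splits the remaining preemption target evenly
+   * across the active users of the queue (via divideAndCeil) and walks the
+   * queue's applications in reverse order, claiming resources back per user
+   * until the target is met. For illustration (hypothetical numbers): a
+   * remaining target of <6144 MB, 6 vcores> with 3 active users yields a
+   * per-user claim of <2048 MB, 2 vcores>.
+   */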
+  private Map<ApplicationAttemptId, Set<RMContainer>> distributePreemptionforUsers(
+      TempQueue qT, Resource clusterResource, ResourceCalculator rc,
+      Resource resToObtain, List<RMContainer> skippedAMContainerlist,
+      Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> prevList) {
+
+    Map<ApplicationAttemptId, Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+    NavigableSet<FiCaSchedulerApp> ns =
+        (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+
+    int numUsers = qT.getLeafQueue().getActiveUsersManager()
+        .getNumActiveUsers();
+    Resource resourceToClaimFromEachUser = Resources.divideAndCeil(rc,
+        resToObtain, numUsers);
+
+    Map<String, Resource> userMarkedPreemptedResource =
+        new HashMap<String, Resource>();
+
+    Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+
+    while (desc.hasNext()) {
+      FiCaSchedulerApp fc = desc.next();
+      ApplicationAttemptId appId = fc.getApplicationAttemptId();
+
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+
+      Resource userPreemptedResource = Resources.none();
+      if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+        userPreemptedResource = userMarkedPreemptedResource.get(fc.getUser());
+      }
+
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          Resources.subtract(resourceToClaimFromEachUser,
+              userPreemptedResource), Resources.none())) {
+        break;
+      }
+
+      Resource resourcesToClaimBackFromUser = Resources
+          .clone(resourceToClaimFromEachUser);
+      Resource initialRes = Resources.clone(resourcesToClaimBackFromUser);
+
+      Set<RMContainer> containers = preemptFrom(fc, clusterResource,
+          resourcesToClaimBackFromUser, skippedAMContainerlist, skippedAMSize,
+          prevList);
+
+      Resources.subtractFrom(resToObtain,
+          Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+      list.put(appId, containers);
+
+      if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+        userMarkedPreemptedResource.put(fc.getUser(), Resources.add(
+            Resources.subtract(initialRes, resourcesToClaimBackFromUser),
+            userMarkedPreemptedResource.get(fc.getUser())));
+      } else {
+        userMarkedPreemptedResource.put(fc.getUser(),
+            Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+      }
+    }
+    return list;
+  }
+
+  /**
+   * This will balance the queue against user limits when preemption is
+   * needed: any user found above the targeted user limit has the excess
+   * (consumed resources - user limit) claimed back, capped by the remaining
+   * preemption target. For example (hypothetical numbers): with a targeted
+   * user limit of <10240 MB>, a user consuming <16384 MB> can lose up to
+   * <6144 MB>.
+   *
+   * @param qT
+   *          the queue to balance
+   * @param clusterResource
+   *          total amount of cluster resources
+   * @param rc
+   *          the resource calculator in use
+   * @param resToObtain
+   *          remaining amount of resources to preempt
+   * @param skippedAMContainerlist
+   *          running list of AM containers skipped so far
+   * @param skippedAMSize
+   *          total size of the skipped AM containers
+   * @return a map of application attempt IDs to containers selected for
+   *         preemption
+   */
+  private Map<ApplicationAttemptId, Set<RMContainer>> balanceUserLimitsinQueueForPreemption(
+      TempQueue qT, Resource clusterResource, ResourceCalculator rc,
+      Resource resToObtain, List<RMContainer> skippedAMContainerlist,
+      Resource skippedAMSize) {
+    Map<ApplicationAttemptId, Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+    NavigableSet<FiCaSchedulerApp> ns =
+        (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+    Resource userLimitforQueue = qT.getLeafQueue().computeTargetedUserLimit(rc,
+        clusterResource, resToObtain);
+
+    Map<String, Resource> userMarkedPreemptedResource =
+        new HashMap<String, Resource>();
+    Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+    while (desc.hasNext()) {
+      FiCaSchedulerApp fc = desc.next();
+      ApplicationAttemptId appId = fc.getApplicationAttemptId();
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      Resource userConsumedResource = Resources.none();
+      if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+        userConsumedResource = Resources.subtract(
+            qT.leafQueue.getUser(fc.getUser()).getTotalConsumedResources(),
+            userMarkedPreemptedResource.get(fc.getUser()));
+      } else {
+        userConsumedResource = qT.leafQueue.getUser(fc.getUser())
+            .getTotalConsumedResources();
+      }
+
+      if (Resources.lessThan(rc, clusterResource, userLimitforQueue,
+          userConsumedResource)) {
+        // As the user has consumed more resources than the user limit
+        // allows, we need to claim back the resources equivalent to
+        // (resources consumed by user - user limit)
+        Resource resourcesToClaimBackFromUser = Resources.subtract(
+            userConsumedResource, userLimitforQueue);
+
+        resourcesToClaimBackFromUser = Resources.lessThan(rc, clusterResource,
+            resourcesToClaimBackFromUser, resToObtain)
+            ? resourcesToClaimBackFromUser : Resources.clone(resToObtain);
+
+        Resource initialRes = Resources.clone(resourcesToClaimBackFromUser);
+        Set<RMContainer> containers = preemptFrom(fc, clusterResource,
+            resourcesToClaimBackFromUser, skippedAMContainerlist,
+            skippedAMSize, null);
+        Resources.subtractFrom(resToObtain,
+            Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        list.put(appId, containers);
+        if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+          userMarkedPreemptedResource.put(fc.getUser(), Resources.add(
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser),
+              userMarkedPreemptedResource.get(fc.getUser())));
+        } else {
+          userMarkedPreemptedResource.put(fc.getUser(),
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        }
+      }
+    }
+    return list;
+  }
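+
+  // Preemption in each over-capacity queue is thus a two-pass process: first
+  // bring every user down to the targeted user limit
+  // (balanceUserLimitsinQueueForPreemption), then spread any remaining target
+  // evenly across the active users (distributePreemptionforUsers).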
 
   /**
    * As more resources are needed for preemption, saved AMContainers has to be
@@ -530,9 +696,9 @@ private void preemptAMContainers(Resource clusterResource,
   }
 
   /**
-   * Given a target preemption for a specific application, select containers
-   * to preempt (after unreserving all reservation for that app).
-   *
+   * Given a target preemption for a specific application, select containers to
+   * preempt (after unreserving all reservation for that app).
+   *
    * @param app
    * @param clusterResource
    * @param rsrcPreempt
@@ -540,16 +706,15 @@ private void preemptAMContainers(Resource clusterResource,
    */
   private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Resource rsrcPreempt,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> prevList) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
-    // first drop reserved containers towards rsrcPreempt
-    List<RMContainer> reservations =
-        new ArrayList<RMContainer>(app.getReservedContainers());
+    // first drop reserved containers towards rsrcPreempt
+    List<RMContainer> reservations = new ArrayList<RMContainer>(
+        app.getReservedContainers());
     for (RMContainer c : reservations) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt,
+          Resources.none())) {
         return ret;
       }
       if (!observeOnly) {
@@ -558,33 +723,75 @@ private void preemptAMContainers(Resource clusterResource,
       }
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
-
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
-    List<RMContainer> containers =
-        new ArrayList<RMContainer>(app.getLiveContainers());
+    List<RMContainer> containers = new ArrayList<RMContainer>(
+        app.getLiveContainers());
     sortContainers(containers);
     for (RMContainer c : containers) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt,
+          Resources.none())) {
         return ret;
       }
+
+      if (isContainerAlreadyPreempted(prevList, c, appId)) {
+        continue;
+      }
+
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
-        skippedAMContainerlist.add(c);
-        Resources.addTo(skippedAMSize, c.getContainer().getResource());
+        // Check if container is already in skip list
+        if (!containerInList(skippedAMContainerlist, c)) {
+          skippedAMContainerlist.add(c);
+          Resources.addTo(skippedAMSize, c.getContainer().getResource());
+        }
         continue;
       }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+
     }
     return ret;
   }
 
+  boolean isContainerAlreadyPreempted(
+      Map<ApplicationAttemptId, Set<RMContainer>> prevList,
+      RMContainer container, ApplicationAttemptId appId) {
+    if (prevList != null) {
+      for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : prevList
+          .entrySet()) {
+        ApplicationAttemptId id = e.getKey();
+        Set<RMContainer> containers = prevList.get(id);
+        if (containers != null) {
+          Iterator<RMContainer> iter = containers.iterator();
+          while (iter.hasNext()) {
+            RMContainer c = iter.next();
+            if (c.getContainerId().toString()
+                .equalsIgnoreCase(container.getContainerId().toString())) {
+              return true;
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+
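+  // The two selection passes share one AM skip list and may visit the same
+  // application twice, so duplicate work is filtered out: containerInList
+  // (below) keeps an AM container from being skip-listed twice, just as
+  // isContainerAlreadyPreempted (above) skips containers already chosen in
+  // an earlier pass.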
+  boolean containerInList(List<RMContainer> skippedAMContainerlist,
+      RMContainer c) {
+    for (RMContainer amc : skippedAMContainerlist) {
+      if (amc.getContainerId().toString()
+          .equalsIgnoreCase(c.getContainerId().toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Compare by reversed priority order first, and then reversed containerId
    * order
@@ -791,6 +998,9 @@ void appendLogString(StringBuilder sb) {
         .append(actuallyPreempted.getVirtualCores());
     }
 
-  }
+    public LeafQueue getLeafQueue() {
+      return leafQueue;
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index eddf30f..9a4e7c9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -1151,6 +1152,70 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
     return limit;
   }
 
+  @Lock(NoLock.class)
+  public Resource computeTargetedUserLimit(
+      ResourceCalculator resourceCalculator, Resource clusterResource,
+      Resource resToObtain) {
+
+    // What is our current capacity?
+    // * It is equal to the max(required, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with minuscule capacity (< 1 slot) make progress
+    // * If we're running over capacity, then it's
+    //   (usedResources + required) (which extra resources we are allocating)
+
+    // Allow progress for queues with minuscule capacity
+    final Resource queueCapacity = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, clusterResource, getAbsoluteCapacity(),
+        getMinimumAllocation());
+
+    Resource currentCapacity = Resources.lessThan(resourceCalculator,
+        clusterResource, getUsedResources(), queueCapacity) ? queueCapacity
+        : getUsedResources();
+
+    Resource targetedCapacity = Resources
+        .subtract(currentCapacity, resToObtain);
+
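+    // For example (illustrative numbers only): if the queue's current
+    // capacity is <81920 MB, 80 vcores> and resToObtain is
+    // <20480 MB, 20 vcores>, users are balanced against a targeted capacity
+    // of <61440 MB, 60 vcores>.
+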
+    // Never allow a single user to take more than the
+    // queue's configured capacity * user-limit-factor.
+    // Also, the queue's configured capacity should be higher than
+    // queue-hard-limit * ulMin
+
+    final int activeUsers = getActiveUsersManager().getNumActiveUsers();
+
+    Resource limit =
+        Resources.roundUp(
+            resourceCalculator,
+            Resources.min(
+                resourceCalculator, clusterResource,
+                Resources.max(
+                    resourceCalculator, clusterResource,
+                    Resources.divideAndCeil(
+                        resourceCalculator,
+                        targetedCapacity, activeUsers),
+                    Resources.divideAndCeil(
+                        resourceCalculator,
+                        Resources.multiplyAndRoundDown(
+                            targetedCapacity, getUserLimit()), 100)
+                    ),
+                Resources.multiplyAndRoundDown(queueCapacity,
+                    getUserLimitFactor())
+                ),
+            getMinimumAllocation());
+
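+    // In words: limit = roundUp(min(max(targetedCapacity / activeUsers,
+    // targetedCapacity * userLimit / 100), queueCapacity * userLimitFactor)).
+    // E.g. (hypothetical): targetedCapacity <61440 MB>, 3 active users and a
+    // 25% user limit give max(20480, 15360) = 20480 MB, subject to the
+    // user-limit-factor cap.
+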
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computation in queue " + getQueueName()
+          + " userLimit=" + getUserLimit() + " userLimitFactor="
+          + getUserLimitFactor() + " required: " + resToObtain + " limit: "
+          + limit + " queueCapacity: " + queueCapacity + " qconsumed: "
+          + getUsedResources() + " currentCapacity: " + currentCapacity
+          + " targetedCapacity: " + targetedCapacity + " activeUsers: "
+          + activeUsers + " clusterCapacity: " + clusterResource);
+    }
+
+    return limit;
+  }
+
+
   @Private
   protected synchronized boolean assignToUser(Resource clusterResource,
     String userName, Resource limit, FiCaSchedulerApp application,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 24e70bb..5093e97 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,6 +35,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -45,6 +47,8 @@ import java.util.Random;
 import java.util.TreeSet;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -58,11 +62,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
@@ -81,6 +87,11 @@
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setUserLimit = false;
+  int userLimit = 1;
+  boolean setdifferentUser = false;
+  boolean setConsumedResources = false;
+  ConcurrentHashMap<String, Resource> userConsumedResources = null;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
@@ -129,6 +140,7 @@ public void setup() {
     System.out.println(name.getMethodName() + " SEED: " + seed);
     rand.setSeed(seed);
     appAlloc = 0;
+    userConsumedResources = new ConcurrentHashMap<String, Resource>();
   }
 
   @Test
@@ -163,9 +175,13 @@ public void testProportionalPreemption() {
       { -1, 1, 1, 1, 1 }, // req granularity
       { 4, 0, 0, 0, 0 }, // subqueues
     };
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(16, 0));
+    userConsumedResources.put("user2", Resource.newInstance(3, 0));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setConsumedResources = false;
   }
 
   @Test
@@ -318,10 +334,15 @@ public void testObserveOnly() {
       { 3, 0, 0, 0 }, // subqueues
     };
     conf.setBoolean(OBSERVE_ONLY, true);
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(45, 0));
+    userConsumedResources.put("user2", Resource.newInstance(45, 0));
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // verify even severe imbalance not affected
     verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
+    setConsumedResources = false;
   }
 
   @Test
@@ -377,12 +398,16 @@ public void testZeroGuarOverCap() {
       { -1, -1, 1, 1, 1, -1, 1 }, // req granularity
       { 2, 3, 0, 0, 0, 1, 0 }, // subqueues
     };
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(14, 0));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // we verify both that C has priority on B and D (as it has >0 guarantees)
     // and that B and D are forced to share their over capacity fairly (as they
     // are both zero-guarantees) hence D sees some of its containers preempted
     verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    setConsumedResources = false;
+    userConsumedResources = null;
   }
 
@@ -486,6 +511,7 @@ public void testSkipAMContainer() {
     };
     setAMContainer = true;
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+
     policy.editSchedule();
 
     // By skipping AM Container, all other 24 containers of appD will be
@@ -516,6 +542,10 @@ public void testPreemptSkippedAMContainers() {
       { 2, 0, 0 }, // subqueues
     };
     setAMContainer = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(45, 0));
+    userConsumedResources.put("user2", Resource.newInstance(45, 0));
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
 
@@ -533,6 +563,138 @@ public void testPreemptSkippedAMContainers() {
     // preempted
     verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
     setAMContainer = false;
+    setConsumedResources = false;
   }
+
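+  // Reminder on the qData convention used throughout these tests: rows are
+  // absolute capacity, max capacity, used, pending, reserved, apps, request
+  // granularity and subqueue count; column 0 is the root queue, followed by
+  // one column per leaf queue.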
+  @Test
+  public void testPreemptionUserLimitReached() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(25, 0));
+    userConsumedResources.put("user2", Resource.newInstance(26, 0));
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 2 containers from appB
+    // have to be preempted as the user limit is reached and there is no more
+    // balancing needed in terms of resources.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Checking that appA is not preempted, as its user stays within the
+    // user limit.
+    verify(mDisp, times(0)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setConsumedResources = false;
+  }
+
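+  // Note on users in these tests: with setdifferentUser set, mockApp (below)
+  // gives every application its own user ("user" + (id + 1)); otherwise apps
+  // alternate between user1 and user2 by application id parity.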
+  @Test
+  public void testPreemptionUserLimit() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setUserLimit = true;
+    setdifferentUser = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(2, 0));
+    userConsumedResources.put("user2", Resource.newInstance(2, 0));
+    userConsumedResources.put("user3", Resource.newInstance(25, 0));
+    userConsumedResources.put("user4", Resource.newInstance(25, 0));
+    userConsumedResources.put("user5", Resource.newInstance(25, 0));
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 1 container from appB
+    // has to be preempted, as 1 container from appA is also preempted due
+    // to exceeding the user limit.
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Checking appA is also preempted due to the user limit.
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setUserLimit = false;
+    setdifferentUser = false;
+    setConsumedResources = false;
+  }
+
+  @Test
+  public void testPreemptionWithNoUserLimit() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100, 60, 40 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 40 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setUserLimit = true;
+    userLimit = 50;
+    setdifferentUser = true;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(25, 0));
+    userConsumedResources.put("user2", Resource.newInstance(25, 0));
+    userConsumedResources.put("user3", Resource.newInstance(25, 0));
+    userConsumedResources.put("user4", Resource.newInstance(25, 0));
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // Load is distributed among the active users in the queue:
+    // 10 containers will be preempted from each user.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setAMContainer = false;
+    userLimit = 1;
+    setUserLimit = false;
+    setdifferentUser = false;
+    setConsumedResources = false;
+  }
 
   @Test
@@ -550,6 +712,10 @@ public void testAMResourcePercentForSkippedAMContainers() {
     };
     setAMContainer = true;
     setAMResourcePercent = 0.5f;
+    setConsumedResources = true;
+    userConsumedResources.put("user1", Resource.newInstance(45, 0));
+    userConsumedResources.put("user2", Resource.newInstance(45, 0));
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
 
@@ -570,6 +736,7 @@ public void testAMResourcePercentForSkippedAMContainers() {
     // preempted
     verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
     setAMContainer = false;
+    setConsumedResources = false;
   }
 
   static class IsPreemptionRequestFor
@@ -597,13 +764,14 @@ public String toString() {
   }
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+        spy(new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock));
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
     Resource clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
+
     return policy;
   }
 
@@ -663,23 +831,75 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues,
     return pq;
   }
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+
+    if (setUserLimit) {
+      when(
+          lq.computeTargetedUserLimit(any(ResourceCalculator.class),
+              any(Resource.class), any(Resource.class))).thenReturn(
+          Resource.newInstance(userLimit, 0));
+    } else {
+      when(
+          lq.computeTargetedUserLimit(any(ResourceCalculator.class),
+              any(Resource.class), any(Resource.class))).thenReturn(
+          Resource.newInstance(0, 0));
+    }
+
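+    // The targeted user limit is stubbed above rather than computed, so
+    // these tests control the per-user target directly through the
+    // userLimit field (memory only; vcores are left at zero).
+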
+    if (!setConsumedResources) {
+      User usr1 = mock(User.class);
+      when(lq.getUser(any(String.class))).thenReturn(usr1);
+      when(usr1.getTotalConsumedResources()).thenReturn(
+          Resource.newInstance(abs[i], 0));
+    } else {
+      User usr1 = mock(User.class);
+      User usr2 = mock(User.class);
+      User usr3 = mock(User.class);
+      User usr4 = mock(User.class);
+      User usr5 = mock(User.class);
+      when(lq.getUser("user1")).thenReturn(usr1);
+      when(lq.getUser("user2")).thenReturn(usr2);
+      when(lq.getUser("user3")).thenReturn(usr3);
+      when(lq.getUser("user4")).thenReturn(usr4);
+      when(lq.getUser("user5")).thenReturn(usr5);
+      for (String usr : userConsumedResources.keySet()) {
+        if (usr.equalsIgnoreCase("user1")) {
+          when(usr1.getTotalConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user2")) {
+          when(usr2.getTotalConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user3")) {
+          when(usr3.getTotalConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user4")) {
+          when(usr4.getTotalConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        } else if (usr.equalsIgnoreCase("user5")) {
+          when(usr5.getTotalConsumedResources()).thenReturn(
+              userConsumedResources.get(usr));
+        }
+      }
+      ActiveUsersManager am = mock(ActiveUsersManager.class);
+      when(lq.getActiveUsersManager()).thenReturn(am);
+      when(lq.getActiveUsersManager().getNumActiveUsers()).thenReturn(
+          userConsumedResources.size());
+    }
     // consider moving where CapacityScheduler::comparator accessible
     NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
-      new Comparator<FiCaSchedulerApp>() {
-        @Override
-        public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-          return a1.getApplicationAttemptId()
-              .compareTo(a2.getApplicationAttemptId());
-        }
-      });
+        new Comparator<FiCaSchedulerApp>() {
+          @Override
+          public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+            return a1.getApplicationAttemptId().compareTo(
+                a2.getApplicationAttemptId());
+          }
+        });
     // applications are added in global L->R order in queues
     if (apps[i] != 0) {
-      int aUsed  = used[i] / apps[i];
+      int aUsed = used[i] / apps[i];
       int aPending = pending[i] / apps[i];
       int aReserve = reserved[i] / apps[i];
       for (int a = 0; a < apps[i]; ++a) {
@@ -688,8 +908,9 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
       }
     }
     when(lq.getApplications()).thenReturn(qApps);
-    if(setAMResourcePercent != 0.0f){
-      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    if (setAMResourcePercent != 0.0f) {
+      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(
+          setAMResourcePercent);
     }
     p.getChildQueues().add(lq);
     return lq;
@@ -702,8 +923,18 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
     ApplicationId appId = ApplicationId.newInstance(TS, id);
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
     when(app.getApplicationId()).thenReturn(appId);
+
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
-
+    if (!setdifferentUser) {
+      if ((id % 2) == 0) {
+        when(app.getUser()).thenReturn("user1");
+      } else {
+        when(app.getUser()).thenReturn("user2");
+      }
+    } else {
+      int newId = id + 1;
+      when(app.getUser()).thenReturn("user" + newId);
+    }
     int cAlloc = 0;
     Resource unit = Resource.newInstance(gran, 0);
     List<RMContainer> cReserved = new ArrayList<RMContainer>();