diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cea3d7c..6b9b163 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -451,6 +451,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
         Resource resToObtain =
             Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
         Resource skippedAMSize = Resource.newInstance(0, 0);
+        Map<ApplicationAttemptId, Set<RMContainer>> userLimitContainers = null;
 
         // lock the leafqueue while we scan applications and unreserve
         synchronized (qT.leafQueue) {
@@ -458,6 +459,8 @@
               (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
           Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
+          userLimitContainers = balanceUserLimitsinQueueForPreemption(qT,
+              clusterResource, rc, resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();
             if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
@@ -467,7 +470,7 @@
             preemptMap.put(
                 fc.getApplicationAttemptId(),
                 preemptFrom(fc, clusterResource, resToObtain,
-                    skippedAMContainerlist, skippedAMSize));
+                    skippedAMContainerlist, skippedAMSize, userLimitContainers));
           }
           Resource maxAMCapacityForThisQueue = Resources.multiply(
               Resources.multiply(clusterResource,
@@ -480,12 +483,97 @@
           preemptAMContainers(clusterResource, preemptMap,
               skippedAMContainerlist, resToObtain, skippedAMSize,
               maxAMCapacityForThisQueue);
+          if (userLimitContainers != null) {
+            for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> entry
+                : userLimitContainers.entrySet()) {
+              ApplicationAttemptId id = entry.getKey();
+              Set<RMContainer> containers = entry.getValue();
+              if (containers != null) {
+                if (preemptMap.containsKey(id)) {
+                  preemptMap.get(id).addAll(containers);
+                } else {
+                  preemptMap.put(id, containers);
+                }
+              }
+            }
+          }
         }
       }
     }
     return preemptMap;
   }
+
+  /**
+   * Balances user limits within a queue when preemption is needed.
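+   * Applications are scanned in reverse order; for any user whose consumed
+   * resources exceed the computed user limit, containers worth
+   * (consumed - user limit) are marked for preemption, capped by the total
+   * resource still to be obtained.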
+   *
+   * @param qT the temp queue snapshot to balance
+   * @param clusterResource the total cluster resource
+   * @param rc the resource calculator
+   * @param resToObtain resources still to be claimed back; updated in place
+   * @return the containers marked for preemption, per application attempt
+   */
+  private Map<ApplicationAttemptId, Set<RMContainer>> balanceUserLimitsinQueueForPreemption(
+      TempQueue qT, Resource clusterResource, ResourceCalculator rc,
+      Resource resToObtain) {
+    Map<ApplicationAttemptId, Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+    NavigableSet<FiCaSchedulerApp> ns =
+        (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
+    Resource skippedAMSize = Resource.newInstance(0, 0);
+    Map<String, Resource> userMarkedPreemptedResource =
+        new HashMap<String, Resource>();
+    Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+    while (desc.hasNext()) {
+      FiCaSchedulerApp fc = desc.next();
+      ApplicationAttemptId appId = fc.getApplicationAttemptId();
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      Resource userLimitforQueue = qT.leafQueue.computeUserLimit(fc,
+          clusterResource, Resources.none());
+      Resource userConsumedResource;
+      if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+        userConsumedResource = Resources.subtract(
+            qT.leafQueue.getUser(fc.getUser()).getConsumedResources(),
+            userMarkedPreemptedResource.get(fc.getUser()));
+      } else {
+        userConsumedResource = qT.leafQueue.getUser(fc.getUser())
+            .getConsumedResources();
+      }
+      if (Resources.lessThan(rc, clusterResource, userLimitforQueue,
+          userConsumedResource)) {
+        // The user has consumed more than the user limit, so we need to
+        // claim back (resources consumed by user - user limit), capped by
+        // the total still to be obtained.
+        Resource resourcesToClaimBackFromUser = Resources.subtract(
+            userConsumedResource, userLimitforQueue);
+        resourcesToClaimBackFromUser = Resources.min(rc, clusterResource,
+            resourcesToClaimBackFromUser, resToObtain);
+        Resource initialRes = Resources.clone(resourcesToClaimBackFromUser);
+        Set<RMContainer> containers = preemptFrom(fc, clusterResource,
+            resourcesToClaimBackFromUser, skippedAMContainerlist,
+            skippedAMSize, null);
+        Resources.subtractFrom(resToObtain,
+            Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        list.put(appId, containers);
+        if (userMarkedPreemptedResource.containsKey(fc.getUser())) {
+          userMarkedPreemptedResource.put(fc.getUser(), Resources.add(
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser),
+              userMarkedPreemptedResource.get(fc.getUser())));
+        } else {
+          userMarkedPreemptedResource.put(fc.getUser(),
+              Resources.subtract(initialRes, resourcesToClaimBackFromUser));
+        }
+      }
+    }
+    return list;
+  }
+
   /**
    * As more resources are needed for preemption, saved AMContainers has to be
    * rescanned. Such AMContainers can be preempted based on resToObtain, but
@@ -540,7 +628,8 @@ private void preemptAMContainers(Resource clusterResource,
    */
   private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Resource rsrcPreempt,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> userLimitContainers) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -572,6 +661,14 @@ private void preemptAMContainers(Resource clusterResource,
           rsrcPreempt, Resources.none())) {
         return ret;
       }
+      if (userLimitContainers != null) {
+        Set<RMContainer> userContainers = userLimitContainers.get(app
+            .getApplicationAttemptId());
+        if (userContainers != null && userContainers.contains(c)) {
+          // Already marked for preemption by the user-limit balancing pass.
+          continue;
+        }
+      }
+
       // Skip AM Container from preemption for now.
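+      // Skipped AM containers are rescanned by preemptAMContainers() and
+      // preempted only if resToObtain still cannot be satisfied without them.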
       if (c.isAMContainer()) {
         skippedAMContainerlist.add(c);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 65938aa..b6baba6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1007,7 +1007,7 @@ private Resource computeUserLimitAndSetHeadroom(
   }
 
   @Lock(NoLock.class)
-  private Resource computeUserLimit(FiCaSchedulerApp application,
+  public Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, Resource required) {
     // What is our current capacity?
     // * It is equal to the max(required, queue-capacity) if
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8a2840e..227fb91 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,6 +35,7 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
@@ -81,6 +83,8 @@
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setUserLimit = false;
+  boolean setdifferentUser = false;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
@@ -534,6 +538,82 @@ public void testPreemptSkippedAMContainers() {
   }
 
   @Test
+  public void testPreemptionUserLimitReached() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100,  50,  50 },   // abs
+        { 100, 100, 100 },   // maxcap
+        { 100, 100,   0 },   // used
+        {  70,  20,  50 },   // pending
+        {   0,   0,   0 },   // reserved
+        {   5,   4,   1 },   // apps
+        {  -1,   1,   1 },   // req granularity
+        {   2,   0,   0 },   // subqueues
+    };
+    setAMContainer = true;
+    setUserLimit = true;
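+    // With setUserLimit, the mocked computeUserLimit() returns 1 while each
+    // mocked user's consumption is 2, so every user is over its limit.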
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping the AM container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping the AM container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since the AM containers of appC and appD are saved, 2 containers from
+    // appB have to be preempted, as the user limit is reached and no further
+    // balancing of resources is needed.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Check that appA is not preempted.
+    verify(mDisp, times(0)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setUserLimit = false;
+  }
+
+  @Test
+  public void testPreemptionUserLimit() {
+    int[][] qData = new int[][] {
+        //  /    A    B
+        { 100,  50,  50 },   // abs
+        { 100, 100, 100 },   // maxcap
+        { 100, 100,   0 },   // used
+        {  70,  20,  50 },   // pending
+        {   0,   0,   0 },   // reserved
+        {   5,   4,   1 },   // apps
+        {  -1,   1,   1 },   // req granularity
+        {   2,   0,   0 },   // subqueues
+    };
+    setAMContainer = true;
+    setUserLimit = true;
+    setdifferentUser = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping the AM container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping the AM container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since the AM containers of appC and appD are saved, 1 container from
+    // appB has to be preempted, as 1 container from appA is also preempted
+    // for exceeding the user limit.
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Check that appA is also preempted for exceeding the user limit.
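+    // (Its user's mocked consumption of 2 exceeds the mocked limit of 1, so
+    // exactly one container is claimed back.)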
+    verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+    setUserLimit = false;
+    setdifferentUser = false;
+  }
+
+  @Test
   public void testAMResourcePercentForSkippedAMContainers() {
     int[][] qData = new int[][] {
         //  /    A    B
@@ -666,6 +746,23 @@ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
     LeafQueue lq = mock(LeafQueue.class);
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+
+    // Stub the user-limit hooks: with setUserLimit, every user's consumed
+    // resource (2) exceeds the computed user limit (1).
+    if (setUserLimit) {
+      when(
+          lq.computeUserLimit(any(FiCaSchedulerApp.class), any(Resource.class),
+              any(Resource.class))).thenReturn(Resource.newInstance(1, 0));
+      User usr1 = mock(User.class);
+      when(lq.getUser(any(String.class))).thenReturn(usr1);
+      when(usr1.getConsumedResources()).thenReturn(Resource.newInstance(2, 0));
+    } else {
+      when(
+          lq.computeUserLimit(any(FiCaSchedulerApp.class), any(Resource.class),
+              any(Resource.class))).thenReturn(Resource.newInstance(0, 0));
+      User usr1 = mock(User.class);
+      when(lq.getUser(any(String.class))).thenReturn(usr1);
+      when(usr1.getConsumedResources()).thenReturn(Resource.newInstance(0, 0));
+    }
+
     // consider moving where CapacityScheduler::comparator accessible
     NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
         new Comparator<FiCaSchedulerApp>() {
@@ -701,7 +798,15 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
     when(app.getApplicationId()).thenReturn(appId);
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
-
+    // Alternate between two users by app id, unless each app should get its
+    // own distinct user.
+    if (!setdifferentUser) {
+      if ((id % 2) == 0) {
+        when(app.getUser()).thenReturn("user1");
+      } else {
+        when(app.getUser()).thenReturn("user2");
+      }
+    } else {
+      when(app.getUser()).thenReturn("user" + id);
+    }
     int cAlloc = 0;
     Resource unit = Resource.newInstance(gran, 0);
     List<RMContainer> cReserved = new ArrayList<RMContainer>();