diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 568c5ab..2f97589 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -110,8 +111,14 @@ * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - - //the dispatcher to send preempt and kill events + /** + * If true, all application master containers will be given least priority + * while considering for preemption among other types of containers of + * multiple applications. + */ + public static final String SKIP_AM_CONTAINER_FROM_PREEMPTION = + "yarn.resourcemanager.monitor.capacity.preemption.skip_am_container"; + // the dispatcher to send preempt and kill events public EventHandler dispatcher; private final Clock clock; @@ -125,6 +132,7 @@ private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; + private boolean skipAMContainer; public ProportionalCapacityPreemptionPolicy() { clock = new SystemClock(); @@ -163,6 +171,8 @@ public void init(Configuration config, percentageClusterPreemptionAllowed = config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); observeOnly = config.getBoolean(OBSERVE_ONLY, false); + skipAMContainer = config.getBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, + true); rc = scheduler.getResourceCalculator(); } @@ -439,6 +449,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, Map> list = new HashMap>(); + List skippedAMContainerlist = new ArrayList(); for (TempQueue qT : queues) { // we act only if we are violating balance by more than @@ -449,23 +460,110 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, // accounts for natural termination of containers Resource resToObtain = Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + Resource skippedAMSize = Resource.newInstance(0, 0); + Map> userLimitContainers = null; // lock the leafqueue while we scan applications and unreserve synchronized(qT.leafQueue) { NavigableSet ns = (NavigableSet) qT.leafQueue.getApplications(); + Iterator desc = ns.descendingIterator(); qT.actuallyPreempted = Resources.clone(resToObtain); + // First try to balance the User Limits within queue + userLimitContainers = + balanceUserLimitsinQueueForPreemption(qT, clusterResource, rc, resToObtain); while (desc.hasNext()) { FiCaSchedulerApp fc = desc.next(); if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, Resources.none())) { break; } - list.put(fc.getApplicationAttemptId(), - preemptFrom(fc, clusterResource, resToObtain)); + list.put( + fc.getApplicationAttemptId(), + preemptFrom(fc, clusterResource, resToObtain, + skippedAMContainerlist, skippedAMSize,userLimitContainers)); } + Resource maxAMCapacity = Resources.multiply( + Resources.multiply(clusterResource, + qT.leafQueue.getAbsoluteMaximumCapacity()), + qT.leafQueue.getMaxAMResourcePerQueuePercent()); + // If skipAMContainer is disabled, skippedAMContainerlist will be + // empty. + for (RMContainer c : skippedAMContainerlist) { + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { + break; + } + if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, + maxAMCapacity)) { + break; + } + Set contToPrempt = list.get(c + .getApplicationAttemptId()); + if (null == contToPrempt) { + contToPrempt = new HashSet(); + list.put(c.getApplicationAttemptId(), contToPrempt); + } + contToPrempt.add(c); + Resources.subtractFrom(resToObtain, c.getContainer().getResource()); + Resources.subtractFrom(skippedAMSize, c.getContainer() + .getResource()); + } + skippedAMContainerlist.clear(); } + if(userLimitContainers != null){ + list.putAll(userLimitContainers); + } + } + } + return list; + } + + + private Map> balanceUserLimitsinQueueForPreemption( + TempQueue qT, Resource clusterResource, ResourceCalculator rc, + Resource resToObtain) { + Map> list = + new HashMap>(); + NavigableSet ns = (NavigableSet) qT.leafQueue + .getApplications(); + List skippedAMContainerlist = new ArrayList(); + Resource skippedAMSize = Resource.newInstance(0, 0); + + Iterator desc = ns.descendingIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + ApplicationAttemptId appId = fc.getApplicationAttemptId(); + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { + break; + } + Resource userLimitforQueue = qT.leafQueue.computeUserLimit(fc, + clusterResource, Resources.none()); + LOG.info("MAYANK queue NAME "+ qT.leafQueue.getQueueName()); + if(userLimitforQueue == null){ + LOG.info("MAYANK USER LIMIT IS NULL"); + } + if (Resources.lessThan(rc, clusterResource, userLimitforQueue, + qT.leafQueue.getUser(fc.getUser()).getConsumedResources())) { + + // As we have used more resources the user limit, + // we need to claim back the resources equivalent to + // consumed resources by user - user limit + Resource resourcesToClaimBackFromUser = Resources.subtract(qT.leafQueue + .getUser(fc.getUser()).getConsumedResources(), userLimitforQueue); + Resource initialRes = Resources.clone(resourcesToClaimBackFromUser); + + Set containers = preemptFrom(fc, clusterResource, + resourcesToClaimBackFromUser, skippedAMContainerlist, + skippedAMSize, null); + + Resources.subtract(resToObtain, + Resources.subtract(initialRes, resourcesToClaimBackFromUser)); + list.put(appId, containers); + } else { + continue; } } return list; @@ -480,8 +578,10 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, * @param rsrcPreempt * @return */ - private Set preemptFrom( - FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) { + private Set preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Resource rsrcPreempt, + List skippedAMContainerlist, Resource skippedAMSize, + Map> userLimitContainers) { Set ret = new HashSet(); ApplicationAttemptId appId = app.getApplicationAttemptId(); @@ -513,6 +613,19 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, rsrcPreempt, Resources.none())) { return ret; } + if (userLimitContainers != null) { + Set userContainers = userLimitContainers.get(app + .getApplicationAttemptId()); + if (userContainers.contains(c)) { + continue; + } + } + if (skipAMContainer && c.isMasterContainer()) { + // Skip AM Container (which has priority of 0) for now. + skippedAMContainerlist.add(c); + Resources.addTo(skippedAMSize, c.getContainer().getResource()); + continue; + } ret.add(c); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2a1170d..37c02c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -840,7 +841,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers() - .get(0)); + .get(0)); + RMContainer rmMasterContainer = appAttempt.scheduler + .getRMContainer(appAttempt.getMasterContainer().getId()); + if (rmMasterContainer != null) { + rmMasterContainer.setMasterContainer(true); + } // The node set in NMTokenSecrentManager is used for marking whether the // NMToken has been issued for this node to the AM. // When AM container was allocated to RM itself, the node which allocates diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 045e44a..ab94f2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -71,5 +71,9 @@ ContainerState getContainerState(); ContainerReport createContainerReport(); + + boolean isMasterContainer(); + + void setMasterContainer(boolean masterContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 01db215..1adb7ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -155,6 +155,7 @@ private long startTime; private long finishTime; private ContainerStatus finishedStatus; + private boolean masterContainer; @@ -172,6 +173,7 @@ public RMContainerImpl(Container container, this.rmContext = rmContext; this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); + this.masterContainer = false; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -487,4 +489,13 @@ public ContainerReport createContainerReport() { return containerReport; } + @Override + public boolean isMasterContainer() { + return masterContainer; + } + + @Override + public void setMasterContainer(boolean masterContainer) { + this.masterContainer = masterContainer; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65938aa..09f7eed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -467,7 +467,7 @@ synchronized void setMaxCapacity(float maximumCapacity) { * Set user limit - used only for testing. * @param userLimit new user limit */ - synchronized void setUserLimit(int userLimit) { + public synchronized void setUserLimit(int userLimit) { this.userLimit = userLimit; } @@ -475,7 +475,7 @@ synchronized void setUserLimit(int userLimit) { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(int userLimitFactor) { + public synchronized void setUserLimitFactor(int userLimitFactor) { this.userLimitFactor = userLimitFactor; } @@ -1007,7 +1007,7 @@ private Resource computeUserLimitAndSetHeadroom( } @Lock(NoLock.class) - private Resource computeUserLimit(FiCaSchedulerApp application, + public Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, Resource required) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index d0a80eb..8fb9c8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SKIP_AM_CONTAINER_FROM_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; @@ -35,6 +36,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; import java.util.ArrayList; import java.util.Comparator; @@ -63,23 +65,29 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mortbay.log.Log; public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; int appAlloc = 0; + boolean setAMContainer = false; + boolean setUserLimit = false; Random rand = null; Clock mClock = null; Configuration conf = null; @@ -466,7 +474,110 @@ public void testPolicyInitializeAfterSchedulerInitialized() { fail("Failed to find SchedulingMonitor service, please check what happened"); } + + @Test + public void testSkipAMContainer() { + int[][] qData = new int[][] { + // / A B + { 100, 50, 50 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 50 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 1, 1 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + conf.setBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, true); + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // By skipping AM Container, all other 24 containers of appD will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // By skipping AM Container, all other 24 containers of appC will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // Since AM containers of appC and appD are saved, 2 containers from appB + // has to be preempted. + verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); + setAMContainer = false; + } + + @Test + public void testPreemptSkippedAMContainers() { + int[][] qData = new int[][] { + // / A B + { 100, 10, 90 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 90 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 5, 5 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + conf.setBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, true); + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // All 5 containers of appD will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); + // All 5 containers of appB will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); + setAMContainer = false; + } + +// @Test +// public void testPreemptionUserLimit() { +// int[][] qData = new int[][] { +// // / A B +// { 100, 10, 90 }, // abs +// { 100, 100, 100 }, // maxcap +// { 100, 100, 0 }, // used +// { 70, 20, 90 }, // pending +// { 0, 0, 0 }, // reserved +// { 5, 4, 1 }, // apps +// { -1, 5, 5 }, // req granularity +// { 2, 0, 0 }, // subqueues +// }; +// conf.setBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, true); +// setAMContainer = true; +// setUserLimit = true; +// ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); +// policy.editSchedule(); +// +// // All 5 containers of appD will be preempted including AM container. +// verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); +// +// // All 5 containers of appB will be preempted including AM container. +// verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); +// +// // By skipping AM Container, all other 4 containers of appB will be +// // preempted +// verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); +// +// // By skipping AM Container, all other 4 containers of appB will be +// // preempted +// verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); +// setAMContainer = false; +// setUserLimit = false; +// } + + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; @@ -563,6 +674,17 @@ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, LeafQueue lq = mock(LeafQueue.class); when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); + + //if(setUserLimit){ + when( + lq.computeUserLimit(any(FiCaSchedulerApp.class), any(Resource.class), + any(Resource.class))).thenReturn(Resource.newInstance(1, 0)); + User usr1 = mock(User.class); + when(lq.getUser(any(String.class))).thenReturn(usr1); + when(usr1.getConsumedResources()).thenReturn(Resource.newInstance(2, 0)); +// when(lq.getUserLimit()).thenReturn(50); + // when(lq.getUserLimitFactor()).thenReturn(2F); + // } // consider moving where CapacityScheduler::comparator accessible NavigableSet qApps = new TreeSet( new Comparator() { @@ -584,6 +706,10 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { } when(lq.getApplications()).thenReturn(qApps); p.getChildQueues().add(lq); + if(setUserLimit){ + lq.setUserLimit(50); + lq.setUserLimitFactor(2); + } return lq; } @@ -595,7 +721,12 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); when(app.getApplicationId()).thenReturn(appId); when(app.getApplicationAttemptId()).thenReturn(appAttId); - + if((id % 2) == 0){ + when(app.getUser()).thenReturn("user1"); + }else{ + when(app.getUser()).thenReturn("user2"); + } + int cAlloc = 0; Resource unit = Resource.newInstance(gran, 0); List cReserved = new ArrayList(); @@ -607,7 +738,11 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, List cLive = new ArrayList(); for (int i = 0; i < used; i += gran) { - cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + if(setAMContainer && i == 0){ + cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); + }else{ + cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + } ++cAlloc; } when(app.getLiveContainers()).thenReturn(cLive); @@ -623,6 +758,10 @@ RMContainer mockContainer(ApplicationAttemptId appAttId, int id, RMContainer mC = mock(RMContainer.class); when(mC.getContainerId()).thenReturn(cId); when(mC.getContainer()).thenReturn(c); + when(mC.getApplicationAttemptId()).thenReturn(appAttId); + if(0 == priority){ + when(mC.isMasterContainer()).thenReturn(true); + } return mC; }