diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 9ed5179..67d4c00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,6 +60,8 @@ private Priority priority; private ResourceWeights resourceWeights; + private RMContainerComparator comparator = new RMContainerComparator(); + public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { this.scheduler = scheduler; this.app = app; @@ -111,7 +115,10 @@ public long getStartTime() { @Override public Resource getResourceUsage() { - return app.getCurrentConsumption(); + // Here the getPreemptedResources() always return zero, except in + // a preemption round + return Resources.subtract(app.getCurrentConsumption(), + app.getPreemptedResources()); } @@ -384,6 +391,27 @@ public Resource assignContainer(FSSchedulerNode node) { } /** + * Preempt a running container according to the priority + */ + @Override + public RMContainer preemptContainer() { + if (LOG.isDebugEnabled()) { + LOG.debug("App " + getName() + " is going to preempt a running " + + "container"); + } + + 
RMContainer toBePreempted = null; + for (RMContainer container : app.getLiveContainers()) { + if (! app.getPreemptionContainers().contains(container) && + (toBePreempted == null || + comparator.compare(toBePreempted, container) > 0)) { + toBePreempted = container; + } + } + return toBePreempted; + } + + /** * Whether this app has containers requests that could be satisfied on the * given node, if the node had full space. */ @@ -407,4 +435,17 @@ public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, anyRequest.getCapability(), node.getRMNode().getTotalCapability()); } + + static class RMContainerComparator implements Comparator<RMContainer>, + Serializable { + @Override + public int compare(RMContainer c1, RMContainer c2) { + int ret = c1.getContainer().getPriority().compareTo( + c2.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index e842a6a..1974f86 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -209,6 +210,36 @@ public Resource assignContainer(FSSchedulerNode node) { } @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getName() + " is going to preempt a container " + + "from its applications."); + } + + // If this queue is not over its fair share, reject + if (!preemptContainerPreCheck()) { + return toBePreempted; + } + + // Choose the app that is most over fair share + Comparator comparator = policy.getComparator(); + AppSchedulable candidateSched = null; + for (AppSchedulable sched : runnableAppScheds) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } + } + + // Preempt from the selected app + if (candidateSched != null) { + toBePreempted = candidateSched.preemptContainer(); + } + return toBePreempted; + } + + @Override public List getChildQueues() { return new ArrayList(1); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 427cb86..48db414 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import org.apache.commons.logging.Log; @@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -157,6 +159,32 @@ public Resource assignContainer(FSSchedulerNode node) { } @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + + // If this queue is not over its fair share, reject + if (!preemptContainerPreCheck()) { + return toBePreempted; + } + + // Find the childQueue which is most over fair share + FSQueue candidateQueue = null; + Comparator comparator = policy.getComparator(); + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } + } + + // Let the selected queue choose which of its container to preempt + if (candidateQueue != null) { + toBePreempted = candidateQueue.preemptContainer(); + } + return toBePreempted; + } + + @Override public List getChildQueues() { return childQueues; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c265fcf..ce35bb4 
100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -35,6 +35,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -49,6 +53,8 @@ RecordFactoryProvider.getRecordFactory(null); protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -187,4 +193,30 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) { } return true; } + + /** + * Helper method to check if the queue should preempt containers + * + * @return true if check passes (can preempt) or false otherwise + */ + protected boolean preemptContainerPreCheck() { + if (this == scheduler.getQueueManager().getRootQueue()) { + return true; + } + + if (policy instanceof DominantResourceFairnessPolicy) { + if (Resources.fitsIn(getResourceUsage(), getFairShare())) { + return false; + } else { + return true; + } + } else { + if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, + scheduler.getClusterCapacity(), 
getResourceUsage(), getFairShare())) { + return false; + } else { + return true; + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index adabfef..e515348 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -59,6 +59,8 @@ private AppSchedulable appSchedulable; final Map preemptionMap = new HashMap(); + + private Resource preemptedResources = Resources.createResource(0); public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager, @@ -316,6 +318,7 @@ public synchronized void resetAllowedLocalityLevel(Priority priority, public void addPreemption(RMContainer container, long time) { assert preemptionMap.get(container) == null; preemptionMap.put(container, time); + Resources.addTo(preemptedResources, container.getAllocatedResource()); } public Long getContainerPreemptionTime(RMContainer container) { @@ -330,4 +333,20 @@ public Long getContainerPreemptionTime(RMContainer container) { public FSLeafQueue getQueue() { return (FSLeafQueue)super.getQueue(); } + + public Resource getPreemptedResources() { + return preemptedResources; + } + + public void resetPremptedResources() { + preemptedResources = Resources.createResource(0); + for (RMContainer container : getPreemptionContainers()) { + Resources.addTo(preemptedResources, 
container.getAllocatedResource()); + } + } + + public void clearPreemptedResources() { + preemptedResources.setMemory(0); + preemptedResources.setVirtualCores(0); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..c3abd87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -364,7 +364,7 @@ protected synchronized void preemptTasksIfNecessary() { } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, Resources.none())) { - preemptResources(queueMgr.getLeafQueues(), resToPreempt); + preemptResources(resToPreempt); } } @@ -375,83 +375,63 @@ protected synchronized void preemptTasksIfNecessary() { * We further prioritize preemption by choosing containers with lowest * priority to preempt. 
*/ - protected void preemptResources(Collection scheds, - Resource toPreempt) { - if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { + protected void preemptResources(Resource toPreempt) { + if (Resources.equals(toPreempt, Resources.none())) { return; } - Map apps = - new HashMap(); - Map queues = - new HashMap(); - - // Collect running containers from over-scheduled queues - List runningContainers = new ArrayList(); - for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - for (AppSchedulable as : sched.getRunnableAppSchedulables()) { - for (RMContainer c : as.getApp().getLiveContainers()) { - runningContainers.add(c); - apps.put(c, as.getApp()); - queues.put(c, sched); - } - } - } - } - - // Sort containers into reverse order of priority - Collections.sort(runningContainers, new Comparator() { - public int compare(RMContainer c1, RMContainer c2) { - int ret = c1.getContainer().getPriority().compareTo( - c2.getContainer().getPriority()); - if (ret == 0) { - return c2.getContainerId().compareTo(c1.getContainerId()); - } - return ret; - } - }); - // Scan down the list of containers we've already warned and kill them // if we need to. Remove any containers from the list that we don't need // or that are no longer running. 
Iterator warnedIter = warnedContainers.iterator(); - Set preemptedThisRound = new HashSet(); while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); - if (container.getState() == RMContainerState.RUNNING && + if ((container.getState() == RMContainerState.RUNNING || + container.getState() == RMContainerState.ALLOCATED) && Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt, Resources.none())) { - warnOrKillContainer(container, apps.get(container), queues.get(container)); - preemptedThisRound.add(container); + warnOrKillContainer(container); Resources.subtractFrom(toPreempt, container.getContainer().getResource()); } else { warnedIter.remove(); } } - // Scan down the rest of the containers until we've preempted enough, making - // sure we don't preempt too many from any queue - Iterator runningIter = runningContainers.iterator(); - while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - RMContainer container = runningIter.next(); - FSLeafQueue sched = queues.get(container); - if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - warnOrKillContainer(container, apps.get(container), sched); - - warnedContainers.add(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + try { + // Set preemptedResources for each app + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (AppSchedulable app : queue.getRunnableAppSchedulables()) { + app.getApp().resetPremptedResources(); + } + } + + while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + RMContainer container = getQueueManager().getRootQueue() + .preemptContainer(); + if (container == null) { + break; + } else { + warnOrKillContainer(container); + warnedContainers.add(container); + 
Resources.subtractFrom(toPreempt, container.getContainer() + .getResource()); + } + } + } finally { + // Clear preemptedResources for each app (to 0) + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (AppSchedulable app : queue.getRunnableAppSchedulables()) { + app.getApp().clearPreemptedResources(); + } } } } - private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, - FSLeafQueue queue) { + private void warnOrKillContainer(RMContainer container) { + ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); + FSSchedulerApp app = getSchedulerApp(appAttemptId); + FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + queue.getName()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 92b6d3e..4f8ac1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -100,6 +101,11 @@ */ public abstract Resource 
assignContainer(FSSchedulerNode node); + /** + * Preempt a container from this Schedulable if possible. + */ + public abstract RMContainer preemptContainer(); + /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { this.fairShare = fairShare; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index d0ba0d8..dcfc2d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; @@ -84,6 +85,11 @@ public Resource assignContainer(FSSchedulerNode node) { } @Override + public RMContainer preemptContainer() { + return null; + } + + @Override public Resource getDemand() { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2524763..5812c20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1161,14 +1161,15 @@ else if (p.getName().equals("root.queueB")) { @Test (timeout = 5000) /** - * Make sure containers are chosen to be preempted in the correct order. Right - * now this means decreasing order of priority. + * Make sure containers are chosen to be preempted in the correct order. */ public void testChoiceOfPreemptedContainers() throws Exception { conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); - + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", + ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + MockClock clock = new MockClock(); scheduler.setClock(clock); @@ -1184,7 +1185,7 @@ public void testChoiceOfPreemptedContainers() throws Exception { out.println(""); out.println(".25"); out.println(""); - out.println(""); + out.println(""); out.println(".25"); out.println(""); out.println(""); @@ -1192,133 +1193,133 @@ public void testChoiceOfPreemptedContainers() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); - // Create four nodes + // Create two nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 
2), 1, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - - // Queue A and B each request three containers + // Queue A and B each request two applications. + // Each application requests two containers with different priorities. ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 2, app1); ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 3, 1); + createSchedulingRequestExistingApplication(1 * 1024, 4, app2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 2, app3); ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); - ApplicationAttemptId app5 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); - ApplicationAttemptId app6 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 3, 1); + createSchedulingRequestExistingApplication(1 * 1024, 4, app4); scheduler.update(); + 
scheduler.getQueueManager().getLeafQueue("queueA", true) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.getQueueManager().getLeafQueue("queueB", true) + .setPolicy(SchedulingPolicy.parse("fair")); + // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + for (int i = 0; i < 4; i++) { scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - - // Now new requests arrive from queues C and D - ApplicationAttemptId app7 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - ApplicationAttemptId app8 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - ApplicationAttemptId app9 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - ApplicationAttemptId app10 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); - ApplicationAttemptId app11 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); - ApplicationAttemptId app12 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + 
assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Now new requests arrive from queueC and default + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); scheduler.update(); - // We should be able to claw back one container from A and B each. - // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - - // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(), - scheduler.getSchedulerApp(app6).getPreemptionContainers())); + // We should be able to claw back one container from queueA and queueB each. + scheduler.preemptResources(Resources.createResource(2 * 1024, 2)); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the app. + // For queueA (fifo), app2 is selected. + // For queueB (fair), app4 is selected. 
+ assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App4 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app4).getLiveContainers(), + scheduler.getSchedulerApp(app4).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024, 2)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Inside each app, containers are sorted according to their priorities. + // Containers with priority 4 are preempted for app2 and app4. 
+ Set<RMContainer> set = new HashSet<RMContainer>(); + for (RMContainer container : + scheduler.getSchedulerApp(app2).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + for (RMContainer container : + scheduler.getSchedulerApp(app4).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + assertTrue("Containers with priority=4 in app2 and app4 should be " + + "preempted.", set.isEmpty()); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024, 2)); // Pretend 15 seconds have passed clock.tick(15); // We should be able to claw back another container from A and B each. - // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + // For queueA (fifo), continue preempting from app2. + // For queueB (fair), even though app4 has a lowest priority container with p=4, it + // still preempts from app3 as app3 is most over fair share. 
+ scheduler.preemptResources(Resources.createResource(2 * 1024, 2)); + + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + scheduler.preemptResources(Resources.createResource(2 * 1024, 2)); + assertTrue("App1 should have no container to be preempted", + scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); + assertTrue("App2 should have no container to be preempted", + scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty()); + assertTrue("App3 should have no container to be preempted", + scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); + assertTrue("App4 should have no container to be preempted", + scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); } @Test (timeout = 5000)