diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f0d1ed1..c7a9375 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -382,6 +382,12 @@ public void addPreemption(RMContainer container, long time) {
     Resources.addTo(preemptedResources, container.getAllocatedResource());
   }
 
+  public void removePreemption(RMContainer container) {
+    assert preemptionMap.get(container) != null;
+    preemptionMap.remove(container);
+    Resources.subtractFrom(preemptedResources, container.getAllocatedResource());
+  }
+
   public Long getContainerPreemptionTime(RMContainer container) {
     return preemptionMap.get(container);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 04dbd2f..4d77fbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -27,7 +27,6 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -347,6 +346,12 @@ public Resource assignContainer(FSSchedulerNode node) {
   }
 
   @Override
+  public void preemptResource() {
+    // Process the preemption request recorded at this queue level
+    preemptResourceBetweenChildren();
+  }
+
+  @Override
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
 
@@ -542,28 +547,4 @@ private boolean preemptContainerPreCheck() {
     return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
         getFairShare());
   }
-
-  /**
-   * Is a queue being starved for its min share.
-   */
-  @VisibleForTesting
-  boolean isStarvedForMinShare() {
-    return isStarved(getMinShare());
-  }
-
-  /**
-   * Is a queue being starved for its fair share threshold.
-   */
-  @VisibleForTesting
-  boolean isStarvedForFairShare() {
-    return isStarved(
-        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
-  }
-
-  private boolean isStarved(Resource share) {
-    Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
-        scheduler.getClusterResource(), share, getDemand());
-    return Resources.lessThan(scheduler.getResourceCalculator(),
-        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index f74106a..63b98dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -22,6 +22,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -179,6 +181,28 @@ public Resource assignContainer(FSSchedulerNode node) {
   }
 
   @Override
+  public void clearResourceToPreempt() {
+    preemptionRequestFromChildren.setMemory(0);
+    preemptionRequestFromChildren.setVirtualCores(0);
+    resourceToPreemptBetweenChildren.setMemory(0);
+    resourceToPreemptBetweenChildren.setVirtualCores(0);
+
+    for (FSQueue queue : childQueues) {
+      queue.clearResourceToPreempt();
+    }
+  }
+
+  @Override
+  public void preemptResource() {
+    // Process the preemption request at this level first
+    preemptResourceBetweenChildren();
+
+    for (FSQueue queue : childQueues) {
+      queue.preemptResource();
+    }
+  }
+
+  @Override
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index ade2880..9f68117 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -20,9 +20,11 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -38,6 +40,8 @@
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -63,6 +67,11 @@
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
   private float fairSharePreemptionThreshold = 0.5f;
 
+  protected Resource preemptionRequestFromChildren = Resources.createResource(0, 0);
+  protected Resource resourceToPreemptBetweenChildren = Resources.createResource(0, 0);
+
+  protected List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
     this.scheduler = scheduler;
@@ -98,6 +107,11 @@ protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
   public abstract void setPolicy(SchedulingPolicy policy)
       throws AllocationConfigurationException;
 
+  /**
+   * Preempt resources down the queue hierarchy.
+   */
+  public abstract void preemptResource();
+
   @Override
   public ResourceWeights getWeights() {
     return scheduler.getAllocationConfiguration().getQueueWeight(getName());
@@ -330,4 +344,115 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) {
   @Override
   public void decPendingResource(String nodeLabel, Resource resourceToDec) {
   }
+
+  public void updateResourceToPreempt(Resource addedResource) {
+    if (getParent() == null || !isStarvedForFairShare()) {
+      // For the root queue, or when this queue is not starved, the whole
+      // request should be preempted from this queue's own children
+      Resources.addTo(resourceToPreemptBetweenChildren, addedResource);
+    } else {
+      // For a non-root queue, split the preemption request into two parts:
+      // preemption from children, and preemption from siblings
+      Resource usageResource = getResourceUsage();
+
+      Resource oldResourceAfterPreemption = Resources.add(
+          preemptionRequestFromChildren, usageResource);
+      Resources.addTo(preemptionRequestFromChildren, addedResource);
+      Resource newResourceAfterPreemption = Resources.add(
+          preemptionRequestFromChildren, usageResource);
+
+      // The part of the request above fair share is preempted from children
+      resourceToPreemptBetweenChildren = Resources.subtract(
+          newResourceAfterPreemption, getFairShare());
+
+      // The part of the request below fair share is preempted from siblings
+      Resource newResourceToPreemptFromSibling =
+          Resources.subtract(
+              Resources.componentwiseMin(newResourceAfterPreemption,
                  getFairShare()),
+              Resources.componentwiseMin(oldResourceAfterPreemption,
+                  getFairShare())
+          );
+
+      // This queue is starved here, so escalate the sibling part to the parent
+      parent.updateResourceToPreempt(newResourceToPreemptFromSibling);
+    }
+    LOG.info("Update resource to preempt, queue: " + getName() + ", " +
+        "preemption between children: " + resourceToPreemptBetweenChildren);
+  }
+
+  public void clearResourceToPreempt() {
+    preemptionRequestFromChildren.setMemory(0);
+    preemptionRequestFromChildren.setVirtualCores(0);
+    resourceToPreemptBetweenChildren.setMemory(0);
+    resourceToPreemptBetweenChildren.setVirtualCores(0);
+  }
+
+  /**
+   * Is a queue being starved for its min share.
+   */
+  @VisibleForTesting
+  boolean isStarvedForMinShare() {
+    return isStarved(getMinShare());
+  }
+
+  /**
+   * Is a queue being starved for its fair share threshold.
+   */
+  @VisibleForTesting
+  boolean isStarvedForFairShare() {
+    return isStarved(
+        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
+  }
+
+  private boolean isStarved(Resource share) {
+    Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), share, getDemand());
+    return Resources.lessThan(scheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+  }
+
+  protected void preemptResourceBetweenChildren() {
+    // Warn or kill containers that have already been chosen for preemption
+    LOG.info("Trying to preempt resource under queue " + getName() +
+        " for resource " +
+        resourceToPreemptBetweenChildren);
+
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Resource toPreempt = Resources.clone(resourceToPreemptBetweenChildren);
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if ((container.getState() == RMContainerState.RUNNING ||
+          container.getState() == RMContainerState.ALLOCATED) &&
+          Resources.greaterThan(scheduler.getResourceCalculator(),
+              scheduler.getClusterResource(),
+              toPreempt, Resources.none())) {
+        scheduler.warnOrKillContainer(container);
+        Resources
+            .subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        // The container finished, or the preemption request is gone
+        warnedIter.remove();
+        // Remove the container from its application's preempted resources
+        scheduler.removePreemption(container);
+      }
+    }
+
+    // Preempt from children to satisfy the remaining preemption request
+    while (Resources.greaterThan(scheduler.getResourceCalculator(),
+        scheduler.getClusterResource(),
+        toPreempt, Resources.none())) {
+      RMContainer container = preemptContainer();
+      if (container == null) {
+        break;
+      } else {
+        scheduler.warnOrKillContainer(container);
+        warnedContainers.add(container); // remember the warned container here
+        Resources.subtractFrom(
+            toPreempt, container.getContainer().getResource());
+        LOG.info("Chose container for preemption under queue " + getName() +
+            ": " + container);
+      }
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f481de5..952e374 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -169,7 +169,10 @@
 
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
+  // Containers warned for preemption, tracked per queue
+  private HashMap<FSQueue, List<RMContainer>> mapQueueToWarnedContainers =
+      new HashMap<FSQueue, List<RMContainer>>();
+
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -358,16 +361,38 @@ protected synchronized void preemptTasksIfNecessary() {
     }
     lastPreemptCheckTime = curTime;
 
+    // Clear the queue-level preemption requests from the previous round
+    queueMgr.getRootQueue().clearResourceToPreempt();
+
+    // Calculate resToPreempt for each leaf queue
     Resource resToPreempt = Resources.clone(Resources.none());
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
     }
+
+    // Do the preemption
     if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
        resToPreempt, Resources.none())) {
-      preemptResources(resToPreempt);
+      preemptResource();
     }
   }
 
+  private void preemptResource() {
+    long start = getClock().getTime();
+    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+      queue.resetPreemptedResources();
+    }
+
+    getQueueManager().getRootQueue().preemptResource();
+    // Clear preemptedResources for each app
+    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+      queue.clearPreemptedResources();
+    }
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addPreemptCallDuration(duration);
+  }
+
   /**
    * Preempt a quantity of resources. Each round, we start from the root queue,
    * level-by-level, until choosing a candidate application.
@@ -432,12 +457,17 @@ protected void preemptResources(Resource toPreempt) {
     fsOpDurations.addPreemptCallDuration(duration);
   }
 
-  protected void warnOrKillContainer(RMContainer container) {
+  public void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSAppAttempt app = getSchedulerApp(appAttemptId);
+    if (app == null) {
+      return;
+    }
+
     FSLeafQueue queue = app.getQueue();
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
+    LOG.info("Preempting container " + container + " (prio=" +
+        container.getContainer().getPriority() +
+        " res=" + container.getContainer().getResource() +
         ") from queue " + queue.getName());
 
     Long time = app.getContainerPreemptionTime(container);
@@ -454,7 +484,7 @@ protected void warnOrKillContainer(RMContainer container) {
       // TODO: Not sure if this ever actually adds this to the list of cleanup
       // containers on the RMNode (see SchedulerNode.releaseContainer()).
       completedContainer(container, status, RMContainerEventType.KILL);
-      LOG.info("Killing container" + container +
+      LOG.info("Killing container " + container +
          " (after waiting for premption for " +
          (getClock().getTime() - time) + "ms)");
     }
@@ -464,6 +494,16 @@ protected void warnOrKillContainer(RMContainer container) {
     }
   }
 
+  public void removePreemption(RMContainer container) {
+    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+    FSAppAttempt app = getSchedulerApp(appAttemptId);
+    if (app == null) {
+      return;
+    }
+
+    app.removePreemption(container);
+  }
+
   /**
    * Return the resource amount that this queue is allowed to preempt, if any.
    * If the queue has been below its min share for at least its preemption
@@ -499,6 +539,7 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
           + ", resDueToFairShare = " + resDueToFairShare;
       LOG.info(message);
+      sched.getParent().updateResourceToPreempt(resToPreempt);
     }
     return resToPreempt;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 458b06d..bbaab9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,10 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -35,6 +36,8 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -44,17 +47,18 @@
       TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
 
   private MockClock clock;
+  private RMNode node;
 
   private static class StubbedFairScheduler extends FairScheduler {
-    public int lastPreemptMemory = -1;
-
+    Set<RMContainer> preemptedContainers = new HashSet<RMContainer>();
     @Override
-    protected void preemptResources(Resource toPreempt) {
-      lastPreemptMemory = toPreempt.getMemory();
+    public void warnOrKillContainer(RMContainer container) {
+      preemptedContainers.add(container);
+      super.warnOrKillContainer(container);
     }
 
     public void resetLastPreemptResources() {
-      lastPreemptMemory = -1;
+      preemptedContainers.clear();
     }
   }
@@ -85,6 +89,11 @@ public void teardown() {
   private void startResourceManager(float utilizationThreshold) {
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
         utilizationThreshold);
+
+    // Let the second call to preemptTasksIfNecessary() actually kill containers
+    conf.setInt(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 1);
+    conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+
     resourceManager = new MockRM(conf);
     resourceManager.start();
@@ -96,32 +105,39 @@ private void startResourceManager(float utilizationThreshold) {
     scheduler.updateInterval = 60 * 1000;
   }
 
-  private void registerNodeAndSubmitApp(
-      int memory, int vcores, int appContainers, int appMemory) {
-    RMNode node1 = MockNodes.newNodeInfo(
+  private void initClusterResource(int memory, int vcores) {
+    node = MockNodes.newNodeInfo(
         1, Resources.createResource(memory, vcores), 1, "node1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node);
     scheduler.handle(nodeEvent1);
 
     assertEquals("Incorrect amount of resources in the cluster",
         memory, scheduler.rootMetrics.getAvailableMB());
     assertEquals("Incorrect amount of resources in the cluster",
         vcores, scheduler.rootMetrics.getAvailableVirtualCores());
+  }
 
-    createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
+  private ApplicationAttemptId createSchedulingRequestAndSchedule(
+      int memory, String queueId, String userId,
+      int numContainers, int priority) {
+    ApplicationAttemptId applicationAttemptId =
+        createSchedulingRequest(memory, queueId, userId, numContainers,
+            priority);
     scheduler.update();
+    nodemanagerHeartbeat();
+    return applicationAttemptId;
+  }
+
+  private void nodemanagerHeartbeat() {
     // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < node.getTotalCapability().getMemory() / 1024; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node);
       scheduler.handle(nodeUpdate1);
     }
-
-    assertEquals("app1's request is not met",
-        memory - appContainers * appMemory,
-        scheduler.rootMetrics.getAvailableMB());
   }
 
   @Test
-  public void testPreemptionWithFreeResources() throws Exception {
+  public void testPreemptionForDepth1() throws Exception {
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -136,55 +152,393 @@
     out.println("<weight>1</weight>");
     out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
     out.println("</allocations>");
     out.close();
 
-    startResourceManager(0f);
+    startResourceManager(0.8f);
     // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
+    initClusterResource(4 * 1024, 4);
+
+    createSchedulingRequestAndSchedule(1024, "queueA", "user1", 3, 1);
 
-    // Verify submitting another request triggers preemption
+    // Verify submitting another request doesn't trigger preemption
     createSchedulingRequest(1024, "queueB", "user1", 1, 1);
     scheduler.update();
     clock.tick(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
     scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+    assertTrue("No container should have been preempted",
+        ((StubbedFairScheduler) scheduler).preemptedContainers.isEmpty());
 
     resourceManager.stop();
 
-    startResourceManager(0.8f);
+    startResourceManager(0.7f);
     // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+    initClusterResource(4 * 1024, 4);
+    ApplicationAttemptId applicationAttemptId =
+        createSchedulingRequestAndSchedule(1024, "queueA", "user1", 4, 1);
 
-    // Verify submitting another request doesn't trigger preemption
     createSchedulingRequest(1024, "queueB", "user1", 1, 1);
     scheduler.update();
     clock.tick(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
     scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should not have been called", -1,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 1,
+        preemptedContainers.size());
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      assertEquals(applicationAttemptId,
+          preemptedContainer.getApplicationAttemptId());
+    }
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    nodemanagerHeartbeat();
 
-    resourceManager.stop();
+    FSQueue queueB = scheduler.getQueueManager().getQueue("queueB");
+    assertEquals(1024, queueB.getResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testPreemptionForDepth2Case1() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println(" <queue name=\"1\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"2\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>0.8</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
 
     startResourceManager(0.7f);
     // Create node with 4GB memory and 4 vcores
-    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+    initClusterResource(4 * 1024, 4);
+    ApplicationAttemptId applicationAttemptId =
+        createSchedulingRequestAndSchedule(1024, "queueA.1", "user1", 4, 1);
 
-    // Verify submitting another request triggers preemption
-    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    createSchedulingRequest(1024, "queueA.2", "user1", 3, 1);
     scheduler.update();
+    nodemanagerHeartbeat();
     clock.tick(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
     scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 2,
+        preemptedContainers.size());
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      assertEquals(applicationAttemptId,
+          preemptedContainer.getApplicationAttemptId());
+    }
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+
+    scheduler.update();
+    nodemanagerHeartbeat();
+
+    // verify final usage
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queueA.1")
+        .getResourceUsage().getMemory());
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queueA.2")
+        .getResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testPreemptionForDepth2Case2() throws Exception {
+    // Case: A2 should preempt only from its sibling A1, when the other
+    // queue B is under its fair share
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println(" <queue name=\"1\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"2\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>0.8</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0.7f);
+    // Create node with 4GB memory and 4 vcores
+    initClusterResource(4 * 1024, 4);
+
+    ApplicationAttemptId applicationAttemptId =
+        createSchedulingRequestAndSchedule(1024, "queueA.1", "user1", 2, 1);
+    createSchedulingRequestAndSchedule(1024, "queueB", "user1", 2, 1);
+
+    createSchedulingRequest(1024, "queueA.2", "user1", 4, 1);
+    scheduler.update();
+    nodemanagerHeartbeat();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 1,
+        preemptedContainers.size());
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      assertEquals("Should only preempt from sibling", applicationAttemptId,
+          preemptedContainer.getApplicationAttemptId());
+    }
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+
+    scheduler.update();
+    nodemanagerHeartbeat();
+
+    // verify final usage
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queueA.1")
+        .getResourceUsage().getMemory());
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queueA.2")
+        .getResourceUsage().getMemory());
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queueB")
+        .getResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testPreemptionForDepth2Case3() throws Exception {
+    // Case: A2 should preempt from both A1 and B
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println(" <queue name=\"1\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"2\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>0.8</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0.7f);
+    initClusterResource(100 * 1024, 100);
+
+    ApplicationAttemptId applicationAttemptIdA1 =
+        createSchedulingRequestAndSchedule(1024, "queueA.1", "user1", 30, 1);
+    ApplicationAttemptId applicationAttemptIdB =
+        createSchedulingRequestAndSchedule(1024, "queueB", "user1", 70, 1);
+
+    createSchedulingRequest(1024, "queueA.2", "user1", 50, 1);
     scheduler.update();
+    nodemanagerHeartbeat();
     clock.tick(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
     scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 25,
+        preemptedContainers.size());
+    int numPreemptFromQueueA1 = 0;
+    int numPreemptFromQueueB = 0;
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      if (applicationAttemptIdA1.equals(
+          preemptedContainer.getApplicationAttemptId())) {
+        ++numPreemptFromQueueA1;
+      }
+      if (applicationAttemptIdB.equals(
+          preemptedContainer.getApplicationAttemptId())) {
+        ++numPreemptFromQueueB;
+      }
+    }
+
+    assertEquals("Should preempt from queueA.1", 5, numPreemptFromQueueA1);
+    assertEquals("Should preempt from queueB", 20, numPreemptFromQueueB);
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+
+    scheduler.update();
+    nodemanagerHeartbeat();
+
+    // verify final usage
+    assertEquals(1024 * 25, scheduler.getQueueManager().getQueue("queueA.1")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 25, scheduler.getQueueManager().getQueue("queueA.2")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 50, scheduler.getQueueManager().getQueue("queueB")
+        .getResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testPreemptionForDepth2Case4() throws Exception {
+    // Case: A2 should preempt only from A1 when its parent queue A has reached
+    // its max limit, even when queue A's siblings are over their fair share
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("<maxResources>20480mb,20vcores</maxResources>");
+    out.println(" <queue name=\"1\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"2\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>0.8</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0.7f);
+    initClusterResource(100 * 1024, 100);
+
+    ApplicationAttemptId applicationAttemptIdA1 =
+        createSchedulingRequestAndSchedule(1024, "queueA.1", "user1", 20, 1);
+    ApplicationAttemptId applicationAttemptIdB =
+        createSchedulingRequestAndSchedule(1024, "queueB", "user1", 20, 1);
+    ApplicationAttemptId applicationAttemptIdC =
+        createSchedulingRequestAndSchedule(1024, "queueC", "user1", 60, 1);
+
+    createSchedulingRequest(1024, "queueA.2", "user1", 50, 1);
+    scheduler.update();
+    nodemanagerHeartbeat();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 10,
+        preemptedContainers.size());
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      assertEquals("Should only preempt from A1", applicationAttemptIdA1,
+          preemptedContainer.getApplicationAttemptId());
+    }
+
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+
+    scheduler.update();
+    nodemanagerHeartbeat();
+
+    // verify final usage
+    assertEquals(1024 * 10, scheduler.getQueueManager().getQueue("queueA.1")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 10, scheduler.getQueueManager().getQueue("queueA.2")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 20, scheduler.getQueueManager().getQueue("queueB")
+        .getResourceUsage().getMemory());
+
+    // queueC is over its fair share, but will not be preempted
+    assertEquals(1024 * 40, scheduler.getQueueManager().getQueue("queueC")
+        .getFairShare().getMemory());
+    assertEquals(1024 * 60, scheduler.getQueueManager().getQueue("queueC")
+        .getResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testPreemptionForDepth2Case5() throws Exception {
+    // Case: a starved A2 should preempt only from its sibling A1 when the
+    // parent queue A is not starved
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println(" <queue name=\"1\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"2\">");
+    out.println(" <weight>1</weight>");
+    out.println(" </queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>0.8</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0.8f);
+    // Create node with 10GB memory and 10 vcores
+    initClusterResource(10 * 1024, 10);
+
+    ApplicationAttemptId applicationAttemptId =
+        createSchedulingRequestAndSchedule(1024, "queueA.1", "user1", 4, 1);
+    createSchedulingRequestAndSchedule(1024, "queueB", "user1", 6, 1);
+
+    createSchedulingRequest(1024, "queueA.2", "user1", 1, 1);
+    scheduler.update();
+    nodemanagerHeartbeat();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    Set<RMContainer> preemptedContainers =
+        ((StubbedFairScheduler) scheduler).preemptedContainers;
+    assertEquals("Container(s) should have been preempted", 1,
+        preemptedContainers.size());
+    for (RMContainer preemptedContainer : preemptedContainers) {
+      assertEquals("Should only preempt from sibling", applicationAttemptId,
+          preemptedContainer.getApplicationAttemptId());
+    }
+    clock.tick(2);
+    scheduler.preemptTasksIfNecessary();
+
+    scheduler.update();
+    nodemanagerHeartbeat();
+
+    // verify final usage
+    assertEquals(1024 * 3, scheduler.getQueueManager().getQueue("queueA.1")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 1, scheduler.getQueueManager().getQueue("queueA.2")
+        .getResourceUsage().getMemory());
+    assertEquals(1024 * 6, scheduler.getQueueManager().getQueue("queueB")
+        .getResourceUsage().getMemory());
   }
 }