diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 0734a4a..cdd332d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -21,8 +21,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,7 +85,9 @@ final Map> reservedContainers = new HashMap>(); - + + final Map preemptionMap = new HashMap(); + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. Each time the scheduler @@ -106,6 +110,7 @@ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager); + this.queue = queue; } @@ -230,6 +235,9 @@ synchronized public void containerCompleted(RMContainer rmContainer, Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); Resources.subtractFrom(currentConsumption, containerResource); + + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); } synchronized public List pullNewlyAllocatedContainers() { @@ -572,4 +580,27 @@ public synchronized void resetAllowedLocalityLevel(Priority priority, " priority " + priority); allowedLocalityLevel.put(priority, level); } + + // related methods + public void addPreemption(RMContainer container, long time) { + assert preemptionMap.get(container) == null; + preemptionMap.put(container, time); + } + + public Long getPreemptionTimeForContainer(RMContainer container) { + return preemptionMap.get(container); + } + + public Set getPreemptionContainers() { + return preemptionMap.keySet(); + } + + public Set getPreemptionContainersById() { + Set ret = new HashSet(); + for (RMContainer r : preemptionMap.keySet()) { + ret.add(r.getContainerId()); + } + return ret; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 15f3eba..3466f0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -155,10 +155,13 @@ private Resource clusterCapacity = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - // How often tasks are preempted (must be longer than a couple + // How often tasks are preempted + protected long preemptionInterval = 5000; + + // preeemption give it 30sec before force killing stuff (must be longer than a couple // of heartbeats to give task-kill commands a chance to act). - protected long preemptionInterval = 15000; - + protected long maxWaitTimeBeforeKill = 15000; + protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -349,13 +352,27 @@ public int compare(RMContainer c1, RMContainer c2) { LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + sched.getName()); - ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); - + // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). - completedContainer(container, status, RMContainerEventType.KILL); - + Long time = apps.get(container).getPreemptionTimeForContainer(container); + + if (time != null) { + // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, + // proceed with kill + ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + if (time + maxWaitTimeBeforeKill < clock.getTime()) { + completedContainer(container, status, RMContainerEventType.KILL); + LOG.info("Killing container" + container + + " (after waiting for premption for " + + (clock.getTime() - time) + "ms)"); + } + } else { + // track in the FSSchedApp itself about the preemption requeust (and + // the time of request) + apps.get(container).addPreemption(container, clock.getTime()); + } toPreempt = Resources.subtract(toPreempt, container.getContainer().getResource()); if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity, @@ -748,8 +765,13 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, " #ask=" + ask.size()); } + if (LOG.isDebugEnabled()) { + LOG.debug("Preempting " + + application.getPreemptionContainersById().size() + " container(s)"); + } + return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + application.getHeadroom(),application.getPreemptionContainersById()); } } @@ -950,7 +972,12 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); - + + preemptionInterval = + this.conf.getLong("yarn.scheduler.fair.preemptionInterval", 5000); + maxWaitTimeBeforeKill = + this.conf.getLong("yarn.scheduler.fair.maxWaitTimeBeforeKill", 15000); + if (!initialized) { rootMetrics = QueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e6761b9..fa204da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -30,8 +30,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import javax.xml.parsers.ParserConfigurationException; @@ -891,9 +894,18 @@ else if (p.getName().equals("root.queueB")) { */ public void testChoiceOfPreemptedContainers() throws Exception { Configuration conf = createConfiguration(); + + //setting known time outs of 5 and 10 seconds + conf.setLong("yarn.scheduler.fair.preemptionInterval",5000); + conf.setLong("yarn.scheduler.fair.maxWaitTimeBeforeKill",10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); scheduler.reinitialize(conf, resourceManager.getRMContext()); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); @@ -988,15 +1000,40 @@ public void testChoiceOfPreemptedContainers() throws Exception { Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + + //first verify we are adding containers to preemption list for the application + assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(), + scheduler.applications.get(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(), + scheduler.applications.get(app6).getPreemptionContainers())); + + //pretend 15 seconds are passed + clock.tick(15); + + //trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + //at this point the containers should have been killed (since we are not simulating AM) assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + + + //trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + //pretend 15 seconds are passed + clock.tick(15); // We should be able to claw back another container from A and B each. // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); + + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());