diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemption.java
new file mode 100644
index 0000000..d0d260d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemption.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class FSPreemption {
+  private static final Log LOG = LogFactory.getLog(FSPreemption.class);
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
+
+  private FairScheduler scheduler;
+
+  // ms to wait before force killing stuff (must be longer than a couple
+  // of heartbeats to give task-kill commands a chance to act).
+  private long waitTimeBeforeKill;
+  // Time we last ran preemptTasksIfNecessary
+  private long lastPreemptCheckTime;
+  // How often tasks are preempted
+  protected long preemptionInterval;
+  // Whether preemption is enabled
+  private boolean preemptionEnabled;
+
+  // Running containers comparator
+  private static final RMContainerComparator comparator =
+      new RMContainerComparator();
+
+  // Containers whose AMs have been warned that they will be preempted soon
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+
+  public FSPreemption(FairScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  public void initialize(FairSchedulerConfiguration conf) {
+    preemptionEnabled = conf.getPreemptionEnabled();
+    preemptionInterval = conf.getPreemptionInterval();
+    waitTimeBeforeKill = conf.getWaitTimeBeforeKill();
+  }
+
+  /**
+   * Check for queues that need tasks preempted, either because they have been
+   * below their guaranteed share for minSharePreemptionTimeout or they have
+   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * such queues exist, compute how many tasks of each type need to be
+   * preempted and then select the right ones using preemptResources.
+   */
+  protected synchronized void preemptTasksIfNecessary() {
+    if (!preemptionEnabled) {
+      return;
+    }
+
+    long curTime = scheduler.getClock().getTime();
+    if (curTime - lastPreemptCheckTime < preemptionInterval) {
+      return;
+    }
+    lastPreemptCheckTime = curTime;
+
+    Resource resToPreempt = Resources.none();
+
+    for (FSLeafQueue sched : scheduler.getQueueManager().getLeafQueues()) {
+      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+    }
+    if (Resources.greaterThan(RESOURCE_CALCULATOR,
+        scheduler.getClusterCapacity(), resToPreempt, Resources.none())) {
+      preemptResources(resToPreempt);
+    }
+  }
+
+  /**
+   * Update the preemption fields for all QueueSchedulables, i.e. the times
+   * since each queue last was at its guaranteed share and at > 1/2 of its
+   * fair share for each type of task.
+   */
+  protected void updatePreemptionVariables() {
+    long now = scheduler.getClock().getTime();
+    for (FSLeafQueue sched : scheduler.getQueueManager().getLeafQueues()) {
+      if (!scheduler.isStarvedForMinShare(sched)) {
+        sched.setLastTimeAtMinShare(now);
+      }
+      if (!scheduler.isStarvedForFairShare(sched)) {
+        sched.setLastTimeAtHalfFairShare(now);
+      }
+    }
+  }
+
+  /**
+   * Preempt a quantity of resources from over-scheduled queues. The policy
+   * is to pick the leaf queues that are farthest over their fair share. The
+   * policy for picking an app within a queue is either the fair-share policy
+   * or FIFO, according to the queue's configuration. We further prioritize
+   * preemption by choosing the containers with the lowest priority.
+   * @param toPreempt the amount of resources to preempt
+   */
+  @VisibleForTesting
+  void preemptResources(Resource toPreempt) {
+    if (scheduler.getQueueManager().getLeafQueues().isEmpty() ||
+        Resources.equals(toPreempt, Resources.none())) {
+      return;
+    }
+    Resource clusterCapacity = scheduler.getClusterCapacity();
+
+    // Collect running containers from over-scheduled queues
+    Map<RMContainer, FSSchedulerApp> apps =
+        new HashMap<RMContainer, FSSchedulerApp>();
+    Map<RMContainer, FSLeafQueue> queues =
+        new HashMap<RMContainer, FSLeafQueue>();
+    for (FSLeafQueue sched : scheduler.getQueueManager().getLeafQueues()) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
+        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
+          for (RMContainer c : as.getApp().getLiveContainers()) {
+            apps.put(c, as.getApp());
+            queues.put(c, sched);
+          }
+        }
+      }
+    }
+
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to. Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if ((container.getState() == RMContainerState.RUNNING ||
+          container.getState() == RMContainerState.ALLOCATED)
+          && Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container),
+            queues.get(container));
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
+
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt,
+        Resources.none())) {
+      // Snapshot the queue resource usage for all queues and their apps
+      TempSchedulable tRoot;
+      synchronized (scheduler) {
+        tRoot = cloneQueueApps(scheduler.getQueueManager().getRootQueue());
+        tRoot.setRootQueue();
+      }
+      // Iteratively preempt containers from over-scheduled queues
+      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          toPreempt, Resources.none())) {
+        RMContainer container = tRoot.preemptContainer();
+        if (container == null) {
+          // Break if we cannot find a candidate container
+          break;
+        }
+        warnOrKillContainer(container, apps.get(container),
+            queues.get(container));
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      }
+    }
+  }
+
+  /**
+   * Snapshot the resource usage of the given schedulable and, recursively,
+   * of its children.
+   * @param root the schedulable to snapshot
+   * @return a TempSchedulable mirroring the hierarchy under root
+   */
+  private TempSchedulable cloneQueueApps(Schedulable root) {
+    TempSchedulable sche;
+    synchronized (root) {
+      sche = new TempSchedulable(root, scheduler.getClusterCapacity());
+      if (root instanceof FSParentQueue) {
+        for (FSQueue child : ((FSParentQueue) root).getChildQueues()) {
+          sche.addChild(cloneQueueApps(child));
+        }
+      } else if (root instanceof FSLeafQueue) {
+        for (AppSchedulable child :
+            ((FSLeafQueue) root).getRunnableAppSchedulables()) {
+          sche.addChild(cloneQueueApps(child));
+        }
+      }
+    }
+
+    return sche;
+  }
+
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority()
+        + ", res=" + container.getContainer().getResource()
+        + ") from queue " + queue.getName());
+
+    Long time = app.getContainerPreemptionTime(container);
+
+    if (time != null) {
+      // if we asked for preemption more than waitTimeBeforeKill ms ago,
+      // proceed with the kill
+      if (time + waitTimeBeforeKill < scheduler.getClock().getTime()) {
+        ContainerStatus status =
+            SchedulerUtils.createPreemptedContainerStatus(
+                container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+        // TODO: Not sure if this ever actually adds this to the list of cleanup
+        // containers on the RMNode (see SchedulerNode.releaseContainer()).
+        scheduler.completedContainer(container, status, RMContainerEventType.KILL);
+        LOG.info("Killing container " + container +
+            " (after waiting for preemption for " +
+            (scheduler.getClock().getTime() - time) + "ms)");
+      }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, scheduler.getClock().getTime());
+    }
+  }
+
+  /**
+   * Return the resource amount that this queue is allowed to preempt, if any.
+   * If the queue has been below its min share for at least its preemption
+   * timeout, it should preempt the difference between its current share and
+   * this min share. If it has been below half its fair share for at least the
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
+   * its full fair share. If both conditions hold, we preempt the max of the
+   * two amounts (this shouldn't happen unless someone sets the timeouts to be
+   * identical for some reason).
+   */
+  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    Resource clusterCapacity = scheduler.getClusterCapacity();
+    String queue = sched.getName();
+    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
+    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
+    Resource resDueToMinShare = Resources.none();
+    Resource resDueToFairShare = Resources.none();
+    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getMinShare(), sched.getDemand());
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+    }
+    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getFairShare(), sched.getDemand());
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+    }
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+        resDueToMinShare, resDueToFairShare);
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        resToPreempt, Resources.none())) {
+      String message = "Should preempt " + resToPreempt + " res for queue "
+          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+          + ", resDueToFairShare = " + resDueToFairShare;
+      LOG.info(message);
+    }
+    return resToPreempt;
+  }
+
+  /**
+   * Temporary data structure used to clone a {@link Schedulable} hierarchy.
+   */
+  static class TempSchedulable extends Schedulable {
+    Resource clusterCapacity;
+    final ArrayList<TempSchedulable> children;
+    Schedulable sche;
+    boolean isRootQueue;
+
+    public TempSchedulable(Schedulable s, Resource clusterCapacity) {
+      this.sche = s;
+      this.clusterCapacity = clusterCapacity;
+      this.children = new ArrayList<TempSchedulable>();
+      this.isRootQueue = false;
+    }
+
+    public void addChild(TempSchedulable q) {
+      children.add(q);
+    }
+
+    public void setRootQueue() {
+      this.isRootQueue = true;
+    }
+
+    /**
+     * Preemption rules:
+     * For an application: preempt the container with the lowest priority.
+     * For a queue: walk its children in the reverse order of its scheduling
+     * policy.
+     * @return the container to preempt, or null if none qualifies
+     */
+    public RMContainer preemptContainer() {
+      // Pre-check: only check non-root queues
+      if (!isRootQueue && !(sche instanceof AppSchedulable) &&
+          Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
+              sche.getResourceUsage(), sche.getFairShare())) {
+        return null;
+      }
+
+      if (sche instanceof AppSchedulable) {
+        AppSchedulable app = (AppSchedulable) sche;
+        List<RMContainer> liveContainers = new ArrayList<RMContainer>(
+            app.getApp().getLiveContainersMap().values());
+        Collections.sort(liveContainers, comparator);
+
+        RMContainer toBePreempted = liveContainers.isEmpty() ? null :
+            liveContainers.get(0);
+        return toBePreempted;
+      } else {
+        if (children.isEmpty()) {
+          return null;
+        } else {
+          Comparator<Schedulable> comparator =
+              ((FSQueue) sche).getPolicy().getComparator();
+          Collections.sort(children, comparator);
+          // Preempt from the last one
+          return children.get(children.size() - 1).preemptContainer();
+        }
+      }
+    }
+
+    @Override
+    public Resource getResourceUsage() {
+      Resource usage = Resources.createResource(0);
+      if (sche instanceof AppSchedulable) {
+        // All running resources minus those already marked for preemption
+        AppSchedulable app = (AppSchedulable) sche;
+        for (RMContainer container : app.getApp().getLiveContainers()) {
+          if (!app.getApp().getPreemptionContainers().contains(container)) {
+            Resources.addTo(usage, container.getAllocatedResource());
+          }
+        }
+      } else {
+        for (TempSchedulable tSche : children) {
+          Resources.addTo(usage, tSche.getResourceUsage());
+        }
+      }
+      return usage;
+    }
+
+    @Override
+    public Resource getMinShare() {
+      assert sche != null;
+      return sche.getMinShare();
+    }
+
+    @Override
+    public Resource getMaxShare() {
+      assert sche != null;
+      return sche.getMaxShare();
+    }
+
+    @Override
+    public ResourceWeights getWeights() {
+      assert sche != null;
+      return sche.getWeights();
+    }
+
+    @Override
+    public long getStartTime() {
+      assert sche != null;
+      return sche.getStartTime();
+    }
+
+    @Override
+    public Priority getPriority() {
+      assert sche != null;
+      return sche.getPriority();
+    }
+
+    @Override
+    public void updateDemand() {
+    }
+
+    @Override
+    public Resource assignContainer(FSSchedulerNode node) {
+      return null;
+    }
+
+    @Override
+    public String getName() {
+      assert sche != null;
+      return sche.getName();
+    }
+
+    @Override
+    public Resource getDemand() {
+      assert sche != null;
+      return sche.getDemand();
+    }
+
+  }
+
+  static class RMContainerComparator implements Comparator<RMContainer> {
+    public int compare(RMContainer c1, RMContainer c2) {
+      int ret = c1.getContainer().getPriority().compareTo(
+          c2.getContainer().getPriority());
+      if (ret == 0) {
+        return c2.getContainerId().compareTo(c1.getContainerId());
+      }
+      return ret;
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3cdff7f..54b0038 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -147,10 +147,9 @@
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
 
-  // Time when we last updated preemption vars
-  protected long lastPreemptionUpdateTime;
-  // Time we last ran preemptTasksIfNecessary
-  private long lastPreemptCheckTime;
+  // Preemption policy
+  @VisibleForTesting
+  FSPreemption preemptor;
 
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes =
@@ -160,17 +159,6 @@
   private Resource clusterCapacity =
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
-  // How often tasks are preempted
-  protected long preemptionInterval;
-
-  // ms to wait before force killing stuff (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long waitTimeBeforeKill;
-
-  // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
-  protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -198,6 +186,7 @@ public FairScheduler() {
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
+    preemptor = new FSPreemption(this);
   }
 
   private void validateConf(Configuration conf) {
@@ -273,7 +262,7 @@ public void run() {
       try {
         Thread.sleep(UPDATE_INTERVAL);
         update();
-        preemptTasksIfNecessary();
+        preemptor.preemptTasksIfNecessary();
       } catch (Exception e) {
         LOG.error("Exception in fair scheduler UpdateThread", e);
       }
@@ -287,7 +276,8 @@ public void run() {
    * required resources per job.
    */
   protected synchronized void update() {
-    updatePreemptionVariables(); // Determine if any queues merit preemption
+    // Determine if any queues merit preemption
+    preemptor.updatePreemptionVariables();
 
     FSQueue rootQueue = queueMgr.getRootQueue();
@@ -301,24 +291,6 @@ protected synchronized void update() {
   }
 
   /**
-   * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and at > 1/2 of its fair share
-   * for each type of task.
-   */
-  private void updatePreemptionVariables() {
-    long now = clock.getTime();
-    lastPreemptionUpdateTime = now;
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      if (!isStarvedForMinShare(sched)) {
-        sched.setLastTimeAtMinShare(now);
-      }
-      if (!isStarvedForFairShare(sched)) {
-        sched.setLastTimeAtHalfFairShare(now);
-      }
-    }
-  }
-
-  /**
    * Is a queue below its min share for the given task type?
    */
   boolean isStarvedForMinShare(FSLeafQueue sched) {
@@ -339,186 +311,6 @@ boolean isStarvedForFairShare(FSLeafQueue sched) {
         sched.getResourceUsage(), desiredFairShare);
   }
 
-  /**
-   * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below half their fair share for the fairSharePreemptionTimeout. If
-   * such queues exist, compute how many tasks of each type need to be preempted
-   * and then select the right ones using preemptTasks.
-   */
-  protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
-      return;
-    }
-
-    long curTime = clock.getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval) {
-      return;
-    }
-    lastPreemptCheckTime = curTime;
-
-    Resource resToPreempt = Resources.none();
-
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
-    }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
-        Resources.none())) {
-      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
-    }
-  }
-
-  /**
-   * Preempt a quantity of resources from a list of QueueSchedulables. The
-   * policy for this is to pick apps from queues that are over their fair share,
-   * but make sure that no queue is placed below its fair share in the process.
-   * We further prioritize preemption by choosing containers with lowest
-   * priority to preempt.
-   */
-  protected void preemptResources(Collection<FSLeafQueue> scheds,
-      Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
-      return;
-    }
-
-    Map<RMContainer, FSSchedulerApp> apps =
-        new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSLeafQueue> queues =
-        new HashMap<RMContainer, FSLeafQueue>();
-
-    // Collect running containers from over-scheduled queues
-    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
-          for (RMContainer c : as.getApp().getLiveContainers()) {
-            runningContainers.add(c);
-            apps.put(c, as.getApp());
-            queues.put(c, sched);
-          }
-        }
-      }
-    }
-
-    // Sort containers into reverse order of priority
-    Collections.sort(runningContainers, new Comparator<RMContainer>() {
-      public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c1.getContainer().getPriority().compareTo(
-            c2.getContainer().getPriority());
-        if (ret == 0) {
-          return c2.getContainerId().compareTo(c1.getContainerId());
-        }
-        return ret;
-      }
-    });
-
-    // Scan down the list of containers we've already warned and kill them
-    // if we need to. Remove any containers from the list that we don't need
-    // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
-    while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
-      if (container.getState() == RMContainerState.RUNNING &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              toPreempt, Resources.none())) {
-        warnOrKillContainer(container, apps.get(container), queues.get(container));
-        preemptedThisRound.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      } else {
-        warnedIter.remove();
-      }
-    }
-
-    // Scan down the rest of the containers until we've preempted enough, making
-    // sure we don't preempt too many from any queue
-    Iterator<RMContainer> runningIter = runningContainers.iterator();
-    while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-      RMContainer container = runningIter.next();
-      FSLeafQueue sched = queues.get(container);
-      if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              sched.getResourceUsage(), sched.getFairShare())) {
-        warnOrKillContainer(container, apps.get(container), sched);
-
-        warnedContainers.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      }
-    }
-  }
-
-  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
-      FSLeafQueue queue) {
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-
-    Long time = app.getContainerPreemptionTime(container);
-
-    if (time != null) {
-      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
-      // proceed with kill
-      if (time + waitTimeBeforeKill < clock.getTime()) {
-        ContainerStatus status =
-            SchedulerUtils.createPreemptedContainerStatus(
-                container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        completedContainer(container, status, RMContainerEventType.KILL);
-        LOG.info("Killing container" + container +
-            " (after waiting for premption for " +
-            (clock.getTime() - time) + "ms)");
-      }
-    } else {
-      // track the request in the FSSchedulerApp itself
-      app.addPreemption(container, clock.getTime());
-    }
-  }
-
-  /**
-   * Return the resource amount that this queue is allowed to preempt, if any.
-   * If the queue has been below its min share for at least its preemption
-   * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
-   * full fair share. If both conditions hold, we preempt the max of the two
-   * amounts (this shouldn't happen unless someone sets the timeouts to be
-   * identical for some reason).
-   */
-  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
-    String queue = sched.getName();
-    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
-    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
-    Resource resDueToMinShare = Resources.none();
-    Resource resDueToFairShare = Resources.none();
-    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-        resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-        resToPreempt, Resources.none())) {
-      String message = "Should preempt " + resToPreempt + " res for queue "
-          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
-          + ", resDueToFairShare = " + resDueToFairShare;
-      LOG.info(message);
-    }
-    return resToPreempt;
-  }
-
   public RMContainerTokenSecretManager getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
@@ -785,7 +577,7 @@ private synchronized void removeApplicationAttempt(
   /**
    * Clean up a completed container.
    */
-  private synchronized void completedContainer(RMContainer rmContainer,
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -1244,12 +1036,9 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
       rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
       assignMultiple = this.conf.getAssignMultiple();
       maxAssign = this.conf.getMaxAssign();
       sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
@@ -1269,6 +1058,8 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
         throw new IOException("Failed to start FairScheduler", e);
       }
 
+      preemptor.initialize(this.conf);
+
       Thread updateThread = new Thread(new UpdateThread());
       updateThread.setName("FairSchedulerUpdateThread");
       updateThread.setDaemon(true);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 50121c3..3412ee4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -1075,14 +1075,14 @@ else if (p.getName().equals("root.queueB")) {
 
   @Test (timeout = 5000)
   /**
-   * Make sure containers are chosen to be preempted in the correct order. Right
-   * now this means decreasing order of priority.
+   * Make sure containers are chosen to be preempted in the correct order.
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
     conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file",
         ALLOC_FILE);
-
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
@@ -1098,7 +1098,7 @@
     out.println("");
     out.println(".25");
     out.println("");
-    out.println("");
+    out.println("");
     out.println(".25");
     out.println("");
     out.println("");
@@ -1106,133 +1106,152 @@
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    // Create four nodes
+    // Create two nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
            "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
-    RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
-            "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-
-    // Queue A and B each request three containers
+    // Queue A and B each submit two applications.
+    // Each application requests two containers, at priority 1 and priority 2.
     ApplicationAttemptId app1 =
         createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 2, app1);
     ApplicationAttemptId app2 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
-    ApplicationAttemptId app3 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 2, app2);
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 2, app3);
     ApplicationAttemptId app4 =
         createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
-    ApplicationAttemptId app5 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
-    ApplicationAttemptId app6 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 2, app4);
     scheduler.update();
 
+    scheduler.getQueueManager().getLeafQueue("queueA", true)
+        .setPolicy(SchedulingPolicy.parse("fifo"));
+    scheduler.getQueueManager().getLeafQueue("queueB", true)
+        .setPolicy(SchedulingPolicy.parse("fair"));
+
     // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 2; i++) {
+    for (int i = 0; i < 4; i++) {
       NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
 
       NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
       scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
     }
 
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
 
     // Now new requests arrive from queues C and D
-    ApplicationAttemptId app7 =
+    ApplicationAttemptId app5 =
         createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    ApplicationAttemptId app6 =
+        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+
+    ApplicationAttemptId app7 =
+        createSchedulingRequest(1 * 1024, "default", "user1", 1, 1);
     ApplicationAttemptId app8 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    ApplicationAttemptId app9 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    ApplicationAttemptId app10 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
-    ApplicationAttemptId app11 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
-    ApplicationAttemptId app12 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, "default", "user1", 1, 1);
 
     scheduler.update();
 
     // We should be able to claw back one container from A and B each.
     // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-
-    // First verify we are adding containers to preemption list for the application
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
-        scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
-        scheduler.getSchedulerApp(app6).getPreemptionContainers()));
+    scheduler.preemptor.preemptResources(Resources.createResource(2 * 1024));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // First verify we are adding containers to preemption list for the app.
+    // For queueA (fifo), app2 is selected.
+    // For queueB (fair), app4 is selected.
+    assertTrue("App2 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App4 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app4).getLiveContainers(),
+            scheduler.getSchedulerApp(app4).getPreemptionContainers()));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptor.preemptResources(Resources.createResource(2 * 1024));
 
     // At this point the containers should have been killed (since we are not simulating AM)
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    // Inside each app, containers are sorted according to their priorities.
+    // Containers with priority=2 are preempted for app2 and app4.
+    Set<RMContainer> set = new HashSet<RMContainer>();
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app2).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 2) {
+        set.add(container);
+      }
+    }
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app4).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 2) {
+        set.add(container);
+      }
+    }
+    assertTrue("Containers with priority=2 in app2 and app4 should be "
+        + "preempted.", set.isEmpty());
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptor.preemptResources(Resources.createResource(2 * 1024));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
-    // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    // For queueA (fifo), continue to preempt from app2.
+    // For queueB (fair), preempt from app3.
+    scheduler.preemptor.preemptResources(Resources.createResource(2 * 1024));
 
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
     assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
     assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
 
     // Now A and B are below fair share, so preemption shouldn't do anything
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
+    scheduler.preemptor.preemptResources(Resources.createResource(2 * 1024));
+    assertTrue(scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+    assertTrue(scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+    assertTrue(scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+    assertTrue(scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
+    assertTrue("App1 should have no container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app1).getLiveContainers(),
+            scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+    assertTrue("App2 should have no container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App3 should have no container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app3).getLiveContainers(),
+            scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+    assertTrue("App4 should have no container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app4).getLiveContainers(),
+            scheduler.getSchedulerApp(app4).getPreemptionContainers()));
   }
 
   @Test (timeout = 5000)
@@ -1344,26 +1363,26 @@ public void testPreemptionDecision() throws Exception {
     FSLeafQueue schedD =
         scheduler.getQueueManager().getLeafQueue("queueD", true);
 
-    assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.preemptor.resToPreempt(schedC, clock.getTime())));
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.preemptor.resToPreempt(schedD, clock.getTime())));
 
     // After minSharePreemptionTime has passed, they should want to preempt min
     // share.
     clock.tick(6);
-    assertEquals(
-        1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
-    assertEquals(
-        1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
+    assertEquals(1024,
+        scheduler.preemptor.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(1024,
+        scheduler.preemptor.resToPreempt(schedD, clock.getTime()).getMemory());
 
     // After fairSharePreemptionTime has passed, they should want to preempt
     // fair share.
     scheduler.update();
     clock.tick(6);
-    assertEquals(
-        1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
-    assertEquals(
-        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
+    assertEquals(1536,
+        scheduler.preemptor.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(1536,
+        scheduler.preemptor.resToPreempt(schedD, clock.getTime()).getMemory());
   }
 
   @Test (timeout = 5000)
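
The resToPreempt() method above (and the FairScheduler version it replaces) reduces to two clamped deficits and a max: after the min-share timeout a queue asks to be raised to min(minShare, demand), after the fair-share timeout to min(fairShare, demand), and it preempts the larger gap. A standalone sketch with invented numbers, using plain ints in place of Resource and assuming both timeouts have expired:

public class ResToPreemptExample {
  public static void main(String[] args) {
    // Invented queue state, in MB.
    int usage = 1024;      // current resource usage
    int minShare = 2048;   // guaranteed share
    int fairShare = 3072;  // instantaneous fair share
    int demand = 4096;     // outstanding demand

    // Deficit against min share: raise usage to min(minShare, demand).
    int resDueToMinShare = Math.max(0, Math.min(minShare, demand) - usage);

    // Deficit against fair share: raise usage to min(fairShare, demand).
    int resDueToFairShare = Math.max(0, Math.min(fairShare, demand) - usage);

    // The queue asks for the larger of the two deficits.
    int resToPreempt = Math.max(resDueToMinShare, resDueToFairShare);

    System.out.println("Should preempt " + resToPreempt + " MB"
        + " (resDueToMinShare = " + resDueToMinShare
        + ", resDueToFairShare = " + resDueToFairShare + ")");
    // Prints: Should preempt 2048 MB (resDueToMinShare = 1024,
    // resDueToFairShare = 2048)
  }
}

The same arithmetic is what produces the 1024 (min share) and 1536 (fair share) values asserted in testPreemptionDecision.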
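
TempSchedulable.preemptContainer() is the heart of the new selection policy: at each queue level it sorts the snapshot's children with the queue's policy comparator and recurses into the child that would be served last, and at the app level it returns the least-important live container. Below is a simplified, self-contained sketch of that walk; Node, its fields, and the deficit-based comparator are invented stand-ins, not the patch's API (which uses SchedulingPolicy.getComparator() and RMContainerComparator):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class PreemptWalkExample {
  static class Node {
    final String name;
    final int usage;       // stand-in for Resource usage
    final int fairShare;   // stand-in for the fair share
    final List<Node> children = new ArrayList<Node>();
    final List<Integer> priorities = new ArrayList<Integer>(); // apps only

    Node(String name, int usage, int fairShare) {
      this.name = name;
      this.usage = usage;
      this.fairShare = fairShare;
    }
  }

  // Returns the priority of the container that would be preempted, or null.
  static Integer preemptCandidate(Node n, boolean isRoot) {
    // Pre-check: a non-root queue at or under its fair share yields nothing.
    if (!isRoot && !n.children.isEmpty() && n.usage <= n.fairShare) {
      return null;
    }
    if (n.children.isEmpty()) {
      // App level: pick the least-important container (largest priority value).
      return n.priorities.isEmpty() ? null : Collections.max(n.priorities);
    }
    // Queue level: sort children by how far over their share they are and
    // recurse into the one the scheduling policy would serve last.
    Collections.sort(n.children, new Comparator<Node>() {
      public int compare(Node a, Node b) {
        return (a.usage - a.fairShare) - (b.usage - b.fairShare);
      }
    });
    return preemptCandidate(n.children.get(n.children.size() - 1), false);
  }

  public static void main(String[] args) {
    Node app = new Node("app1", 5, 3);
    app.priorities.add(1);
    app.priorities.add(2);
    Node queueA = new Node("queueA", 5, 3); // over its fair share
    queueA.children.add(app);
    Node queueB = new Node("queueB", 3, 3); // at its fair share
    Node root = new Node("root", 8, 8);
    root.children.add(queueA);
    root.children.add(queueB);
    // Walks root -> queueA (farthest over share) -> app1 and picks the
    // priority-2 container, mirroring the behavior asserted in the test.
    System.out.println("Preempt container with priority "
        + preemptCandidate(root, true));
  }
}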
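
warnOrKillContainer() is two-phase: the first call only records a preemption time against the app (giving the AM a chance to act), and a later call kills the container once waitTimeBeforeKill ms have elapsed, which is why the test ticks the MockClock 15 seconds between preemptResources() calls. A minimal sketch of the handshake, with invented names:

import java.util.HashMap;
import java.util.Map;

public class WarnThenKillExample {
  static final long WAIT_TIME_BEFORE_KILL_MS = 15000;
  // containerId -> time the AM was first warned
  static final Map<String, Long> warned = new HashMap<String, Long>();

  static String warnOrKill(String containerId, long now) {
    Long warnedAt = warned.get(containerId);
    if (warnedAt == null) {
      warned.put(containerId, now);      // first pass: warn the AM only
      return "warned";
    }
    if (warnedAt + WAIT_TIME_BEFORE_KILL_MS < now) {
      warned.remove(containerId);        // grace period over: kill
      return "killed";
    }
    return "still waiting";
  }

  public static void main(String[] args) {
    System.out.println(warnOrKill("c1", 0L));      // warned
    System.out.println(warnOrKill("c1", 10000L));  // still waiting
    System.out.println(warnOrKill("c1", 20000L));  // killed
  }
}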