diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 228a761..d5dba1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -70,6 +70,12 @@ // allowed to preempt other jobs' tasks. private final Map fairSharePreemptionTimeouts; + // The fair share preemption threshold for each queue. It a queue waits + // fairSharePreemptionTimeout without receiving + // fairshare * fairSharePreemptionThreshold resources, it is allowed to + // preempt other queues' tasks. + private final Map fairSharePreemptionThresholds; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -92,6 +98,7 @@ public AllocationConfiguration(Map minQueueResources, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { @@ -108,6 +115,7 @@ public AllocationConfiguration(Map minQueueResources, this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; + this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.queueAcls = queueAcls; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; @@ -126,6 +134,7 @@ public AllocationConfiguration(Configuration conf) { queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); fairSharePreemptionTimeouts = new HashMap(); + fairSharePreemptionThresholds = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap>(); @@ -171,7 +180,18 @@ public long getFairSharePreemptionTimeout(String queueName) { return (fairSharePreemptionTimeout == null) ? -1 : fairSharePreemptionTimeout; } - + + /** + * Get a queue's fair share preemption threshold in the allocation file. + * Return -1f if not set. + */ + public float getFairSharePreemptionThreshold(String queueName) { + Float fairSharePreemptionThreshold = + fairSharePreemptionThresholds.get(queueName); + return (fairSharePreemptionThreshold == null) ? + -1f : fairSharePreemptionThreshold; + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 970ee99..c2dfc84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -218,6 +218,8 @@ public synchronized void reloadAllocations() throws IOException, Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); Map fairSharePreemptionTimeouts = new HashMap(); + Map fairSharePreemptionThresholds = + new HashMap(); Map> queueAcls = new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; @@ -225,6 +227,7 @@ public synchronized void reloadAllocations() throws IOException, float queueMaxAMShareDefault = -1.0f; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; QueuePlacementPolicy newPlacementPolicy = null; @@ -277,7 +280,8 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); userMaxAppsDefault = val; - } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultFairSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; @@ -287,10 +291,17 @@ public synchronized void reloadAllocations() throws IOException, long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; } - } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultMinSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultMinSharePreemptionTimeout = val; + } else if ("defaultFairSharePreemptionThreshold" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + defaultFairSharePreemptionThreshold = val; } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -326,7 +337,7 @@ public synchronized void reloadAllocations() throws IOException, loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - queueAcls, configuredQueues); + fairSharePreemptionThresholds, queueAcls, configuredQueues); } // Load placement policy and pass it configured queues @@ -349,11 +360,18 @@ public synchronized void reloadAllocations() throws IOException, defaultFairSharePreemptionTimeout); } + // Set the fair share preemption threshold for the root queue + if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) { + fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE, + defaultFairSharePreemptionThreshold); + } + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, - minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls, + minSharePreemptionTimeouts, fairSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); @@ -365,13 +383,15 @@ public synchronized void reloadAllocations() throws IOException, /** * Loads a queue from a queue element in the configuration file */ - private void loadQueue(String parentName, Element element, Map minQueueResources, + private void loadQueue(String parentName, Element element, + Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, Map queueMaxAMShares, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues) throws AllocationConfigurationException { @@ -418,6 +438,11 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; fairSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + fairSharePreemptionThresholds.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -434,7 +459,8 @@ private void loadQueue(String parentName, Element element, Map loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, queueAcls, configuredQueues); + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, + queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } @@ -449,11 +475,15 @@ private void loadQueue(String parentName, Element element, Map } } queueAcls.put(queueName, acls); - if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) + if (maxQueueResources.containsKey(queueName) && + minQueueResources.containsKey(queueName) && !Resources.fitsIn(minQueueResources.get(queueName), maxQueueResources.get(queueName))) { - LOG.warn(String.format("Queue %s has max resources %s less than min resources %s", - queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); + LOG.warn( + String.format( + "Queue %s has max resources %s less than min resources %s", + queueName, maxQueueResources.get(queueName), + minQueueResources.get(queueName))); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49e8ef0..95406c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -54,7 +55,7 @@ // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtHalfFairShare; + private long lastTimeAtFairShareThreshold; // Track the AM resource usage for this queue private Resource amResourceUsage; @@ -65,7 +66,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -275,16 +276,17 @@ public long getLastTimeAtMinShare() { return lastTimeAtMinShare; } - public void setLastTimeAtMinShare(long lastTimeAtMinShare) { + private void setLastTimeAtMinShare(long lastTimeAtMinShare) { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; + public long getLastTimeAtFairShareThreshold() { + return lastTimeAtFairShareThreshold; } - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; + private void setLastTimeAtFairShareThreshold( + long lastTimeAtFairShareThreshold) { + this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; } @Override @@ -329,6 +331,20 @@ public void recoverContainer(Resource clusterResource, } /** + * Update the preemption fields for the queue, i.e. the times since last was + * at its guaranteed share and over its fair share threshold. + */ + public void updateStarvationStats() { + long now = scheduler.getClock().getTime(); + if (!isStarved(true)) { + setLastTimeAtMinShare(now); + } + if (!isStarved(false)) { + setLastTimeAtFairShareThreshold(now); + } + } + + /** * Helper method to check if the queue should preempt containers * * @return true if check passes (can preempt) or false otherwise @@ -337,4 +353,19 @@ private boolean preemptContainerPreCheck() { return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), getFairShare()); } + + @VisibleForTesting + /** + * Is a queue being starved for min share or fair share. The + * checkMinShare=true means checking min share, and false for fair share. + */ + boolean isStarved(boolean checkMinShare) { + Resource share = checkMinShare ? getMinShare() : + Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()); + + Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + return Resources.lessThan(scheduler.getResourceCalculator(), + scheduler.getClusterResource(), getResourceUsage(), desiredShare); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 1209970..f74106a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -78,11 +78,11 @@ public void recomputeSteadyShares() { } @Override - public void updatePreemptionTimeouts() { - super.updatePreemptionTimeouts(); + public void updatePreemptionVariables() { + super.updatePreemptionVariables(); // For child queues for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionTimeouts(); + childQueue.updatePreemptionVariables(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index b9fcc4b..d4e043d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -54,6 +54,7 @@ private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; + private float fairSharePreemptionThreshold = 0.5f; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -186,6 +187,14 @@ public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) { this.minSharePreemptionTimeout = minSharePreemptionTimeout; } + public float getFairSharePreemptionThreshold() { + return fairSharePreemptionThreshold; + } + + public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) { + this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -193,21 +202,27 @@ public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) { public abstract void recomputeShares(); /** - * Update the min/fair share preemption timeouts for this queue. + * Update the min/fair share preemption timeouts and threshold for this queue. */ - public void updatePreemptionTimeouts() { - // For min share + public void updatePreemptionVariables() { + // For min share timeout minSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getMinSharePreemptionTimeout(getName()); if (minSharePreemptionTimeout == -1 && parent != null) { minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout(); } - // For fair share + // For fair share timeout fairSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getFairSharePreemptionTimeout(getName()); if (fairSharePreemptionTimeout == -1 && parent != null) { fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout(); } + // For fair share preemption threshold + fairSharePreemptionThreshold = scheduler.getAllocationConfiguration() + .getFairSharePreemptionThreshold(getName()); + if (fairSharePreemptionThreshold < 0 && parent != null) { + fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index eceeb0d..8c6eb33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -297,7 +297,7 @@ public void run() { */ protected synchronized void update() { long start = getClock().getTime(); - updatePreemptionVariables(); // Determine if any queues merit preemption + updateStarvationStats(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -327,48 +327,20 @@ protected synchronized void update() { /** * Update the preemption fields for all QueueScheduables, i.e. the times since - * each queue last was at its guaranteed share and at > 1/2 of its fair share - * for each type of task. + * each queue last was at its guaranteed share and over its fair share + * threshold for each type of task. */ - private void updatePreemptionVariables() { - long now = getClock().getTime(); - lastPreemptionUpdateTime = now; + private void updateStarvationStats() { + lastPreemptionUpdateTime = clock.getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - if (!isStarvedForMinShare(sched)) { - sched.setLastTimeAtMinShare(now); - } - if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); - } + sched.updateStarvationStats(); } } /** - * Is a queue below its min share for the given task type? - */ - boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredShare); - } - - /** - * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. - */ - boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, - clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredFairShare); - } - - /** * Check for queues that need tasks preempted, either because they have been * below their guaranteed share for minSharePreemptionTimeout or they have - * been below half their fair share for the fairSharePreemptionTimeout. If + * been below their fair share threshold for the fairSharePreemptionTimeout. If * such queues exist, compute how many tasks of each type need to be preempted * and then select the right ones using preemptTasks. */ @@ -497,11 +469,11 @@ protected void warnOrKillContainer(RMContainer container) { * Return the resource amount that this queue is allowed to preempt, if any. * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and - * this min share. If it has been below half its fair share for at least the - * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its - * full fair share. If both conditions hold, we preempt the max of the two - * amounts (this shouldn't happen unless someone sets the timeouts to be - * identical for some reason). + * this min share. If it has been below its fair share preemption threshold + * for at least the fairSharePreemptionTimeout, it should preempt enough tasks + * to get up to its full fair share. If both conditions hold, we preempt the + * max of the two amounts (this shouldn't happen unless someone sets the + * timeouts to be identical for some reason). */ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); @@ -514,7 +486,7 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, @@ -1095,7 +1067,11 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } - + + public static ResourceCalculator getResourceCalculator() { + return RESOURCE_CALCULATOR; + } + /** * Subqueue metrics might be a little out of date because fair shares are * recalculated at the update interval, but the root queue metrics needs to diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 2444ba4..61b3b6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -181,7 +181,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); - setPreemptionTimeout(leafQueue, parent, queueConf); + leafQueue.updatePreemptionVariables(); return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); @@ -193,7 +193,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); - setPreemptionTimeout(newParent, parent, queueConf); + newParent.updatePreemptionVariables(); parent = newParent; } } @@ -202,29 +202,6 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } /** - * Set the min/fair share preemption timeouts for the given queue. - * If the timeout is configured in the allocation file, the queue will use - * that value; otherwise, the queue inherits the value from its parent queue. - */ - private void setPreemptionTimeout(FSQueue queue, - FSParentQueue parentQueue, AllocationConfiguration queueConf) { - // For min share - long minSharePreemptionTimeout = - queueConf.getMinSharePreemptionTimeout(queue.getQueueName()); - if (minSharePreemptionTimeout == -1) { - minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout(); - } - queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout); - // For fair share - long fairSharePreemptionTimeout = - queueConf.getFairSharePreemptionTimeout(queue.getQueueName()); - if (fairSharePreemptionTimeout == -1) { - fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout(); - } - queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout); - } - - /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to * (1) queueToCreate being currently a parent but needs to change to leaf @@ -409,7 +386,8 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Update steady fair shares for all queues rootQueue.recomputeSteadyShares(); - // Update the fair share preemption timeouts for all queues recursively - rootQueue.updatePreemptionTimeouts(); + // Update the fair share preemption timeouts and preemption for all queues + // recursively + rootQueue.updatePreemptionVariables(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 14b3111..656e20d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -187,13 +187,15 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println(""); // Create hierarchical queues G,H, with different min/fair share preemption - // timeouts + // timeouts and preemption thresholds out.println(""); out.println("120"); out.println("50"); + out.println("0.6"); out.println(" "); out.println(" 180"); out.println(" 40"); + out.println(" 0.7"); out.println(" "); out.println(""); // Set default limit of apps per queue to 15 @@ -211,6 +213,8 @@ public void testAllocationFileParsing() throws Exception { + ""); // Set default fair share preemption timeout to 5 minutes out.println("300"); + // Set default fair share preemption threshold to 0.4 + out.println("0.4"); // Set default scheduling policy to DRF out.println("drf"); out.println(""); @@ -299,6 +303,26 @@ public void testAllocationFileParsing() throws Exception { assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG")); assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH")); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01); + assertEquals(.6f, + queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01); + assertEquals(.7f, + queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01); + assertTrue(queueConf.getConfiguredQueues() .get(FSQueueType.PARENT) .contains("root.queueF")); @@ -346,9 +370,10 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.println(""); out.println("3"); out.println(""); - // Give queue E a preemption timeout of one minute + // Give queue E a preemption timeout of one minute and 0.3f threshold out.println(""); out.println("60"); + out.println("0.3"); out.println(""); // Set default limit of apps per queue to 15 out.println("15"); @@ -363,6 +388,8 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { + ""); // Set fair share preemption timeout to 5 minutes out.println("300"); + // Set default fair share preemption threshold to 0.6f + out.println("0.6"); out.println(""); out.close(); @@ -429,6 +456,20 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE")); + + assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(.3f, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7323b6a..c176156 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -18,50 +18,69 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -public class TestFSLeafQueue { - private FSLeafQueue schedulable = null; - private Resource maxResource = Resources.createResource(10); +public class TestFSLeafQueue extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairSchedulerFairShare.class.getName() + ".xml").getAbsolutePath(); + private Resource maxResource = Resources.createResource(1024 * 8); @Before public void setup() throws IOException { - FairScheduler scheduler = new FairScheduler(); - Configuration conf = createConfiguration(); - // All tests assume only one assignment per node update - conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - ResourceManager resourceManager = new ResourceManager(); - resourceManager.init(conf); - ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - String queueName = "root.queue1"; - scheduler.allocConf = mock(AllocationConfiguration.class); - when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); - when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + conf = createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } - schedulable = new FSLeafQueue(queueName, scheduler, null); + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; } @Test public void testUpdateDemand() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + scheduler.allocConf = mock(AllocationConfiguration.class); + + String queueName = "root.queue1"; + when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); + when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); @@ -73,11 +92,131 @@ public void testUpdateDemand() { assertTrue("Demand is greater than max allowed ", Resources.equals(schedulable.getDemand(), maxResource)); } - - private Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - return conf; + + @Test (timeout = 5000) + public void testIsStarvedForMinShare() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 3 * 1024. Node update gives this all to A + createSchedulingRequest(3 * 1024, "queueA", "user1"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + // Queue B arrives and wants 1 * 1024 + createSchedulingRequest(1 * 1024, "queueB", "user1"); + scheduler.update(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); + assertEquals(3, queues.size()); + + // Queue A should be above min share, B below. + FSLeafQueue queueA = + scheduler.getQueueManager().getLeafQueue("queueA", false); + FSLeafQueue queueB = + scheduler.getQueueManager().getLeafQueue("queueB", false); + assertFalse(queueA.isStarved(true)); + assertTrue(queueB.isStarved(true)); + + // Node checks in again, should allocate for B + scheduler.handle(nodeEvent2); + // Now B should have min share ( = demand here) + assertFalse(queueB.isStarved(true)); + } + + @Test (timeout = 5000) + public void testIsStarvedForFairShare() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(".2"); + out.println(""); + out.println(""); + out.println(".8"); + out.println(".4"); + out.println(""); + out.println(""); + out.println(""); + out.println(".6"); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 4 * 1024. Node update gives this all to A + createSchedulingRequest(1 * 1024, "queueA", "user1", 4); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(4 * 1024, queueA.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 want 3 * 1024 + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); + scheduler.update(); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); + assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory()); + + // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share + // threshold is 1.6 * 1024 + assertFalse(queueB1.isStarved(false)); + + // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share + // threshold is 2.4 * 1024 + assertTrue(queueB2.isStarved(false)); + + // Node checks in again + scheduler.handle(nodeEvent2); + scheduler.handle(nodeEvent2); + assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 usages go to 3 * 1024 + assertFalse(queueB1.isStarved(false)); + assertFalse(queueB2.isStarved(false)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 6e0127d..05b1925 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1061,9 +1061,11 @@ public void testConfigureRootQueue() throws Exception { out.println(" "); out.println(" 100"); out.println(" 120"); + out.println(" .5"); out.println(""); out.println("300"); out.println("200"); + out.println(".6"); out.println(""); out.close(); @@ -1080,125 +1082,7 @@ public void testConfigureRootQueue() throws Exception { assertEquals(100000, root.getFairSharePreemptionTimeout()); assertEquals(120000, root.getMinSharePreemptionTimeout()); - } - - @Test (timeout = 5000) - public void testIsStarvedForMinShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above min share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForMinShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - } - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.println(".75"); - out.println(""); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above fair share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // B should not be starved for fair share, since entire demand is - // satisfied. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - } + assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01); } @Test (timeout = 5000) @@ -1385,7 +1269,8 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { out.println(""); out.println("2"); out.println(""); - out.print("10"); + out.println("10"); + out.println(".5"); out.println(""); out.close(); @@ -1468,8 +1353,9 @@ public void testPreemptionDecision() throws Exception { out.println(".25"); out.println("1024mb,0vcores"); out.println(""); - out.print("5"); - out.print("10"); + out.println("5"); + out.println("10"); + out.println(".5"); out.println(""); out.close(); @@ -1753,8 +1639,6 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception { @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - MockClock clock = new MockClock(); - scheduler.setClock(clock); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1842,6 +1726,32 @@ public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { .getFairSharePreemptionTimeout()); } + @Test + public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception { + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Set preemption variables for the root queue + FSParentQueue root = scheduler.getQueueManager().getRootQueue(); + root.setMinSharePreemptionTimeout(10000); + root.setFairSharePreemptionTimeout(15000); + root.setFairSharePreemptionThreshold(.6f); + + // User1 submits one application + ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); + + // The user1 queue should inherit the configurations from the root queue + FSLeafQueue userQueue = + scheduler.getQueueManager().getLeafQueue("user1", true); + assertEquals(1, userQueue.getRunnableAppSchedulables().size()); + assertEquals(10000, userQueue.getMinSharePreemptionTimeout()); + assertEquals(15000, userQueue.getFairSharePreemptionTimeout()); + assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001); + } + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 0b2ebe7..d1ce36a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -277,6 +277,12 @@ Allocation file format threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. + * fairSharePreemptionThreshold: the fair share preemption threshold for the + queue. If the queue waits fairSharePreemptionTimeout without receiving + fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt + containers to take resources from other queues. If not set, the queue will + inherit the value from its parent queue. + * <>, which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user. @@ -292,6 +298,10 @@ Allocation file format preemption timeout for the root queue; overridden by minSharePreemptionTimeout element in root queue. + * <>, which sets the fair share + preemption threshold for the root queue; overridden by fairSharePreemptionThreshold + element in root queue. + * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.