diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index d4ba88f..c26ad51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -73,6 +73,12 @@ // below half its fair share for this long, it is allowed to preempt tasks. private final long fairSharePreemptionTimeout; + // The fair share preemption threshold for each queue. It a queue waits + // fairSharePreemptionTimeout without receiving + // fairshare * fairSharePreemptionThreshold resources, it is allowed to + // preempt other queues' tasks. + private final Map fairSharePreemptionThresholds; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -96,6 +102,7 @@ public AllocationConfiguration(Map minQueueResources, Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, + Map fairSharePreemptionThresholds, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { this.minQueueResources = minQueueResources; @@ -113,6 +120,7 @@ public AllocationConfiguration(Map minQueueResources, this.queueAcls = queueAcls; this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; + this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -131,6 +139,7 @@ public AllocationConfiguration(Configuration conf) { minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; + fairSharePreemptionThresholds = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap>(); @@ -177,7 +186,18 @@ public long getMinSharePreemptionTimeout(String queueName) { public long getFairSharePreemptionTimeout() { return fairSharePreemptionTimeout; } - + + /** + * Get the fair share preemption for the given queue in the allocation file. + * Return -1f if the queue doesn't have this parameter configured. + */ + public float getFairSharePreemptionThreshold(String queueName) { + Float fairSharePreemptionThreshold = + fairSharePreemptionThresholds.get(queueName); + return (fairSharePreemptionThreshold == null) ? + -1f : fairSharePreemptionThreshold; + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 4cc88c1..d117c5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -217,6 +217,7 @@ public synchronized void reloadAllocations() throws IOException, Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); + Map fairSharePremptionThresholds = new HashMap(); Map> queueAcls = new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; @@ -224,6 +225,7 @@ public synchronized void reloadAllocations() throws IOException, float queueMaxAMShareDefault = -1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float rootFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; QueuePlacementPolicy newPlacementPolicy = null; @@ -284,6 +286,11 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultMinSharePreemptionTimeout = val; + } else if ("rootFairSharePreemptionThreshold".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + rootFairSharePreemptionThreshold = val; } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -318,8 +325,8 @@ public synchronized void reloadAllocations() throws IOException, } loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, fairSharePremptionThresholds, + queueAcls, configuredQueues); } // Load placement policy and pass it configured queues @@ -331,12 +338,18 @@ public synchronized void reloadAllocations() throws IOException, newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); } - + + // Set the fair share preemption threshold for the root queue + if (!fairSharePremptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) { + fairSharePremptionThresholds.put(QueueManager.ROOT_QUEUE, + rootFairSharePreemptionThreshold); + } + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, - newPlacementPolicy, configuredQueues); + fairSharePremptionThresholds, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -353,6 +366,7 @@ private void loadQueue(String parentName, Element element, Map Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues) throws AllocationConfigurationException { @@ -395,6 +409,11 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; minSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + fairSharePreemptionThresholds.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -410,8 +429,8 @@ private void loadQueue(String parentName, Element element, Map "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49e8ef0..4a5ec39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -54,7 +54,7 @@ // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtHalfFairShare; + private long lastTimeAtFairShareThreshold; // Track the AM resource usage for this queue private Resource amResourceUsage; @@ -65,7 +65,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -279,12 +279,12 @@ public void setLastTimeAtMinShare(long lastTimeAtMinShare) { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; + public long getLastTimeAtFairShareThreshold() { + return lastTimeAtFairShareThreshold; } - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; + public void setLastTimeAtFairShareThreshold(long lastTimeAtFairShareThreshold) { + this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c071c73..20dc284 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -51,6 +51,8 @@ protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; + private float fairSharePreemptionThreshold = 0.5f; + public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; this.scheduler = scheduler; @@ -155,7 +157,15 @@ public void setFairShare(Resource fairShare) { public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); } - + + public float getFairSharePreemptionThreshold() { + return fairSharePreemptionThreshold; + } + + public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) { + this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index aa3f824..84a6589 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -340,7 +340,26 @@ private void updatePreemptionVariables() { sched.setLastTimeAtMinShare(now); } if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); + sched.setLastTimeAtFairShareThreshold(now); + } + } + } + + /** + * Update the fair share thresholds for all queue recursively. + */ + private void updateFairSharePreemptionThresholds(FSQueue queue) { + float fairSharePreemptionThreshold = + allocConf.getFairSharePreemptionThreshold(queue.getQueueName()); + if (fairSharePreemptionThreshold > 0) { + queue.setFairSharePreemptionThreshold(fairSharePreemptionThreshold); + } else if (queue.parent != null) { + queue.setFairSharePreemptionThreshold( + queue.parent.getFairSharePreemptionThreshold()); + } + if (queue instanceof FSParentQueue) { + for (FSQueue child : queue.getChildQueues()) { + updateFairSharePreemptionThresholds(child); } } } @@ -357,12 +376,12 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { /** * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. + * defined as being below its fair share threshold. */ boolean isStarvedForFairShare(FSLeafQueue sched) { Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, - clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); + clusterResource, Resources.multiply(sched.getFairShare(), + sched.getFairSharePreemptionThreshold()), sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -499,7 +518,7 @@ protected void warnOrKillContainer(RMContainer container) { * Return the resource amount that this queue is allowed to preempt, if any. * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and - * this min share. If it has been below half its fair share for at least the + * this min share. If it has been below its fair share threshold for at least the * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its * full fair share. If both conditions hold, we preempt the max of the two * amounts (this shouldn't happen unless someone sets the timeouts to be @@ -517,7 +536,7 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, @@ -1299,6 +1318,8 @@ private synchronized void initScheduler(Configuration conf) // will just result in leaving things as they are. try { allocsLoader.reloadAllocations(); + // Update the fair share preemption thresholds for all queue recursively + updateFairSharePreemptionThresholds(queueMgr.getRootQueue()); } catch (Exception e) { throw new IOException("Failed to initialize FairScheduler", e); } @@ -1353,6 +1374,8 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { try { allocsLoader.reloadAllocations(); + // Update the fair share preemption thresholds for all queue recursively + updateFairSharePreemptionThresholds(queueMgr.getRootQueue()); } catch (Exception e) { LOG.error("Failed to reload allocations file", e); } @@ -1414,6 +1437,8 @@ public void onReload(AllocationConfiguration queueInfo) { allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); + // Update the fair share preemption thresholds for all queues recursively + updateFairSharePreemptionThresholds(queueMgr.getRootQueue()); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 4f8735b..839dfec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -176,6 +176,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); + setFairSharePreemptionThreshold(leafQueue, parent, queueConf); return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); @@ -187,6 +188,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); + setFairSharePreemptionThreshold(newParent, parent, queueConf); parent = newParent; } } @@ -195,6 +197,21 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } /** + * Set the fair share preemption threshold for the given queue. + * If the threshold is configured in the allocation file, the queue will use + * that value; otherwise, the queue inherits the value from its parent queue. + */ + private void setFairSharePreemptionThreshold(FSQueue queue, + FSParentQueue parentQueue, AllocationConfiguration queueConf) { + float fairSharePreemptionThreshold = + queueConf.getFairSharePreemptionThreshold(queue.getQueueName()); + if (fairSharePreemptionThreshold < 0) { + fairSharePreemptionThreshold = parentQueue.getFairSharePreemptionThreshold(); + } + queue.setFairSharePreemptionThreshold(fairSharePreemptionThreshold); + } + + /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to * (1) queueToCreate being currently a parent but needs to change to leaf diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a4992c..69728f25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -186,9 +186,12 @@ public void testAllocationFileParsing() throws Exception { //Make queue F a parent queue without configured leaf queues using the 'type' attribute out.println(""); out.println(""); - //Create hierarchical queues G,H + // Create hierarchical queues G,H, with different fair share preemption + // thresholds out.println(""); + out.println("0.6"); out.println(" "); + out.println(" 0.7"); out.println(" "); out.println(""); // Set default limit of apps per queue to 15 @@ -206,6 +209,8 @@ public void testAllocationFileParsing() throws Exception { + ""); // Set fair share preemption timeout to 5 minutes out.println("300"); + // Set root queue's fair share threshold to 0.4 + out.println("0.4"); // Set default scheduling policy to DRF out.println("drf"); out.println(""); @@ -279,7 +284,27 @@ public void testAllocationFileParsing() throws Exception { assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); - + + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01); + assertEquals(.6f, + queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01); + assertEquals(.7f, + queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01); + assertTrue(queueConf.getConfiguredQueues() .get(FSQueueType.PARENT) .contains("root.queueF")); @@ -327,9 +352,10 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.println(""); out.println("3"); out.println(""); - // Give queue E a preemption timeout of one minute + // Give queue E a preemption timeout of one minute and a 0.3 threshold out.println(""); out.println("60"); + out.println("0.3"); out.println(""); // Set default limit of apps per queue to 15 out.println("15"); @@ -344,6 +370,8 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { + ""); // Set fair share preemption timeout to 5 minutes out.println("300"); + // Set root queue's fair share preemption threshold to 0.6f + out.println("0.6"); out.println(""); out.close(); @@ -403,6 +431,20 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); + + assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(.3f, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a7b1738..dab612b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -926,7 +926,9 @@ public void testConfigureRootQueue() throws Exception { out.println(" "); out.println(" 1024mb,4vcores"); out.println(" "); + out.println(" 0.5"); out.println(""); + out.println("0.6"); out.println(""); out.close(); @@ -940,6 +942,8 @@ public void testConfigureRootQueue() throws Exception { assertNotNull(queueManager.getLeafQueue("child1", false)); assertNotNull(queueManager.getLeafQueue("child2", false)); + + assertEquals(.5f, root.getFairSharePreemptionThreshold(), 0.01); } @Test (timeout = 5000) @@ -1009,11 +1013,18 @@ public void testIsStarvedForFairShare() throws Exception { out.println(""); out.println(""); out.println(""); - out.println(".25"); + out.println(".2"); out.println(""); out.println(""); - out.println(".75"); + out.println("0.4"); + out.println(".8"); + out.println(""); + out.println(""); + out.println(""); + out.println("0.6"); + out.println(""); out.println(""); + out.println("0.5"); out.println(""); out.close(); @@ -1023,42 +1034,53 @@ public void testIsStarvedForFairShare() throws Exception { // Add one big node (only care about aggregate capacity) RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); + // Queue A wants 4 * 1024. Node update gives this all to A + createSchedulingRequest(1 * 1024, "queueA", "user1", 4); scheduler.update(); NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); + for (int i = 0; i < 4; i++) { + scheduler.handle(nodeEvent2); + } - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(4 * 1024, queueA.getResourceUsage().getMemory()); - // Queue A should be above fair share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p)); - } + // Both queue B1 and queue B2 want 3 * 1024 + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); + scheduler.update(); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); } - // Node checks in again, should allocate for B + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); + assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory()); + + // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share + // threshold is 1.6 * 1024 + assertEquals(false, scheduler.isStarvedForFairShare(queueB1)); + + // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share + // threshold is 2.4 * 1024 + assertEquals(true, scheduler.isStarvedForFairShare(queueB2)); + + // Node checks in again scheduler.handle(nodeEvent2); - // B should not be starved for fair share, since entire demand is - // satisfied. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - } + scheduler.handle(nodeEvent2); + assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 usages go to 3 * 1024 + assertEquals(false, scheduler.isStarvedForFairShare(queueB1)); + assertEquals(false, scheduler.isStarvedForFairShare(queueB2)); } @Test (timeout = 5000) @@ -1245,7 +1267,8 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { out.println(""); out.println("2"); out.println(""); - out.print("10"); + out.println("10"); + out.println("0.5"); out.println(""); out.close(); @@ -1328,8 +1351,9 @@ public void testPreemptionDecision() throws Exception { out.println(".25"); out.println("1024mb,0vcores"); out.println(""); - out.print("5"); - out.print("10"); + out.println("5"); + out.println("10"); + out.println("0.5"); out.println(""); out.close();