diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index d4ba88f..e91c0b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -73,6 +73,15 @@ // below half its fair share for this long, it is allowed to preempt tasks. private final long fairSharePreemptionTimeout; + // The fair share preemption threshold for each queue. If a queue waits + // fairSharePreemptionTimeout without receiving + // fairshare * fairSharePreemptionThreshold resources, it is allowed to + // preempt other queues' tasks. + private final Map fairSharePreemptionThresholds; + + // Default fair share preemption threshold. + private final float defaultFairSahrePremptionThreshold; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -96,6 +105,8 @@ public AllocationConfiguration(Map minQueueResources, Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, + Map fairSharePreemptionThresholds, + float defaultFairSahrePremptionThreshold, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { this.minQueueResources = minQueueResources; @@ -113,6 +124,8 @@ public AllocationConfiguration(Map minQueueResources, this.queueAcls = queueAcls; this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; + this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; + this.defaultFairSahrePremptionThreshold = defaultFairSahrePremptionThreshold; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -131,6 +144,8 @@ public AllocationConfiguration(Configuration conf) { minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; + fairSharePreemptionThresholds = new HashMap(); + defaultFairSahrePremptionThreshold = 0.5f; schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap>(); @@ -177,7 +192,14 @@ public long getMinSharePreemptionTimeout(String queueName) { public long getFairSharePreemptionTimeout() { return fairSharePreemptionTimeout; } - + + public float getFairSharePreemptionThreshold(String queueName) { + Float fairSharePreemptionThreshold = + fairSharePreemptionThresholds.get(queueName); + return (fairSharePreemptionThreshold == null) ? + defaultFairSahrePremptionThreshold : fairSharePreemptionThreshold; + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 4cc88c1..0347429 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -217,6 +217,7 @@ public synchronized void reloadAllocations() throws IOException, Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); + Map fairSharePreemptionThresholds = new HashMap(); Map> queueAcls = new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; @@ -224,6 +225,7 @@ public synchronized void reloadAllocations() throws IOException, float queueMaxAMShareDefault = -1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; QueuePlacementPolicy newPlacementPolicy = null; @@ -284,6 +286,12 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultMinSharePreemptionTimeout = val; + } else if ("defaultFairSharePreemptionThreshold".equals(element.getTagName())) { + String text = ((Text) element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + val = Math.max(val, 0.0f); + defaultFairSharePreemptionThreshold = val; } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -318,8 +326,8 @@ public synchronized void reloadAllocations() throws IOException, } loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, configuredQueues); } // Load placement policy and pass it configured queues @@ -336,6 +344,7 @@ public synchronized void reloadAllocations() throws IOException, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, + fairSharePreemptionThresholds, defaultFairSharePreemptionThreshold, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); @@ -353,6 +362,7 @@ private void loadQueue(String parentName, Element element, Map Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues) throws AllocationConfigurationException { @@ -395,6 +405,12 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; minSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + val = Math.max(val, 0.0f); + fairSharePreemptionThresholds.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -410,8 +426,8 @@ private void loadQueue(String parentName, Element element, Map "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49e8ef0..4a5ec39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -54,7 +54,7 @@ // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtHalfFairShare; + private long lastTimeAtFairShareThreshold; // Track the AM resource usage for this queue private Resource amResourceUsage; @@ -65,7 +65,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -279,12 +279,12 @@ public void setLastTimeAtMinShare(long lastTimeAtMinShare) { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; + public long getLastTimeAtFairShareThreshold() { + return lastTimeAtFairShareThreshold; } - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; + public void setLastTimeAtFairShareThreshold(long lastTimeAtFairShareThreshold) { + this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index aa3f824..d19683b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -340,7 +340,7 @@ private void updatePreemptionVariables() { sched.setLastTimeAtMinShare(now); } if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); + sched.setLastTimeAtFairShareThreshold(now); } } } @@ -357,12 +357,15 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { /** * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. + * defined as being below its fair share threshold. */ boolean isStarvedForFairShare(FSLeafQueue sched) { + float fairSharePreemptionThreshold = + allocConf.getFairSharePreemptionThreshold(sched.getQueueName()); Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); + Resources.multiply(sched.getFairShare(), fairSharePreemptionThreshold), + sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -517,7 +520,7 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a4992c..80db517 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -180,8 +180,10 @@ public void testAllocationFileParsing() throws Exception { out.println("0.4"); out.println(""); // Give queue E a preemption timeout of one minute + // and 0.6f fairSharePreemptionThreshold out.println(""); out.println("60"); + out.println("0.6"); out.println(""); //Make queue F a parent queue without configured leaf queues using the 'type' attribute out.println(""); @@ -206,6 +208,9 @@ public void testAllocationFileParsing() throws Exception { + ""); // Set fair share preemption timeout to 5 minutes out.println("300"); + // Set default fair share preemption threshold to 0.4 + out.println("0.4" + + ""); // Set default scheduling policy to DRF out.println("drf"); out.println(""); @@ -250,6 +255,13 @@ public void testAllocationFileParsing() throws Exception { assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); + // Root should get * ACL assertEquals("*", queueConf.getQueueAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a7b1738..f1f81c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1009,10 +1009,14 @@ public void testIsStarvedForFairShare() throws Exception { out.println(""); out.println(""); out.println(""); - out.println(".25"); + out.println(".2"); out.println(""); out.println(""); - out.println(".75"); + out.println(".4"); + out.println(""); + out.println(""); + out.println(".4"); + out.println("0.6"); out.println(""); out.println(""); out.close(); @@ -1023,42 +1027,53 @@ public void testIsStarvedForFairShare() throws Exception { // Add one big node (only care about aggregate capacity) RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); + // Queue A wants 4 * 1024. Node update gives this all to A + createSchedulingRequest(1 * 1024, "queueA", "user1", 4); scheduler.update(); NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); + QueueManager queueManager = scheduler.getQueueManager(); + FSLeafQueue queueA = queueManager.getLeafQueue("queueA", false); + assertEquals(4 * 1024, queueA.getResourceUsage().getMemory()); - // Queue A should be above fair share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p)); - } + // Both queue B and queue C want 3 * 1024. + createSchedulingRequest(1 * 1024, "queueB", "user1", 3); + createSchedulingRequest(1 * 1024, "queueC", "user1", 3); + scheduler.update(); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); } - // Node checks in again, should allocate for B + FSLeafQueue queueB = queueManager.getLeafQueue("queueB", false); + FSLeafQueue queueC = queueManager.getLeafQueue("queueC", false); + assertEquals(2 * 1024, queueB.getResourceUsage().getMemory()); + assertEquals(2 * 1024, queueC.getResourceUsage().getMemory()); + + // For queueB, the fairSharePreemptionThreshold is 0.5 in default, and the + // fair share threshold is 2*1024. + assertEquals(false, scheduler.isStarvedForFairShare(queueB)); + + // For queueC, the fairSharePreemptionThreshold is 0.6, and the fair share + // threshold is 2.4*1024. + assertEquals(true, scheduler.isStarvedForFairShare(queueC)); + + // Node checks in again scheduler.handle(nodeEvent2); - // B should not be starved for fair share, since entire demand is - // satisfied. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - } + scheduler.handle(nodeEvent2); + assertEquals(3 * 1024, queueB.getResourceUsage().getMemory()); + assertEquals(3 * 1024, queueC.getResourceUsage().getMemory()); + + // Both queueB and queueC usages go to 3*1024. + assertEquals(false, scheduler.isStarvedForFairShare(queueB)); + assertEquals(false, scheduler.isStarvedForFairShare(queueC)); } @Test (timeout = 5000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 9bb8563..1dae213 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -272,6 +272,11 @@ Allocation file format * minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. + * fairSharePreemptionThreshold: the fair share preemption threshold for the + queue. The queue will trigger the preemption if it has pending requests, + and its resource usage is under fairShare * fairSharePreemptionThreshold + for fairSharePreemptionTimeout seconds. Valid values are between [0.0,1.0]. + * <>, which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user. @@ -283,6 +288,10 @@ Allocation file format its fair share before it will try to preempt containers to take resources from other queues. + * <>, which sets the default + fair share preemption threshold; overridden by fairSharePreemptionThreshold + element in each queue if specified. The default value if 0.5f. + * <>, which sets the default number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues; overriden by