diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 6836758..1f363f5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -35,6 +35,7 @@ public class ComputeFairShares {
 
   private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+  private static final double RESOURCE_USAGE_EPSILON = 0.0001;
 
   /**
    * Compute fair share of the given schedulables.Fair share is an allocation of
@@ -139,16 +140,48 @@ private static void computeSharesInternal(
         getResourceValue(totalResources, type));
     double rMax = 1.0;
-    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
-        < totalResource) {
+    boolean firstTime = true;
+    // calculate the start value based on minShare to make sure
+    // currentRU will change for every iteration.
+    for (Schedulable sched : schedulables) {
+      double weight = sched.getWeights().getWeight(type);
+      if (weight > 0) {
+        int minShare = getResourceValue(sched.getMinShare(), type);
+        if (weight >= minShare) {
+          rMax = 1.0;
+          break;
+        } else {
+          double t = (double)minShare / weight;
+          // get the minimum ratio.
+          if (firstTime) {
+            rMax = t;
+            firstTime = false;
+          } else {
+            rMax = Math.min(rMax, t);
+          }
+        }
+      }
+    }
+
+    double previousRU = 0.0;
+    while (true) {
+      // Continue as long as we are making progress towards finding a ratio
+      // that would lead to exhausting all available resources
+      double currentRU =
+          resourceUsedWithWeightToResourceRatio(rMax, schedulables, type);
+      if ((currentRU - previousRU < RESOURCE_USAGE_EPSILON) ||
+          (currentRU >= totalResource)) {
+        break;
+      }
       rMax *= 2.0;
+      previousRU = currentRU;
     }
 
     // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
     double left = 0;
     double right = rMax;
     for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
       double mid = (left + right) / 2.0;
-      int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
+      int plannedResourceUsed = (int)resourceUsedWithWeightToResourceRatio(
           mid, schedulables, type);
       if (plannedResourceUsed == totalResource) {
         right = mid;
@@ -162,11 +195,11 @@ private static void computeSharesInternal(
     // Set the fair shares based on the value of R we've converged to
     for (Schedulable sched : schedulables) {
       if (isSteadyShare) {
-        setResourceValue(computeShare(sched, right, type),
+        setResourceValue((int)computeShare(sched, right, type),
            ((FSQueue) sched).getSteadyFairShare(), type);
       } else {
-        setResourceValue(
-            computeShare(sched, right, type), sched.getFairShare(), type);
+        setResourceValue((int)computeShare(sched, right, type),
+            sched.getFairShare(), type);
       }
     }
   }
@@ -175,12 +208,11 @@ private static void computeSharesInternal(
    * Compute the resources that would be used given a weight-to-resource ratio
    * w2rRatio, for use in the computeFairShares algorithm as described in #
    */
-  private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
+  private static double resourceUsedWithWeightToResourceRatio(double w2rRatio,
       Collection<? extends Schedulable> schedulables, ResourceType type) {
-    int resourcesTaken = 0;
+    double resourcesTaken = 0.0;
     for (Schedulable sched : schedulables) {
-      int share = computeShare(sched, w2rRatio, type);
-      resourcesTaken += share;
+      resourcesTaken += computeShare(sched, w2rRatio, type);
     }
     return resourcesTaken;
   }
@@ -189,12 +221,12 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
    * Compute the resources assigned to a Schedulable given a particular
    * weight-to-resource ratio w2rRatio.
    */
-  private static int computeShare(Schedulable sched, double w2rRatio,
+  private static double computeShare(Schedulable sched, double w2rRatio,
       ResourceType type) {
     double share = sched.getWeights().getWeight(type) * w2rRatio;
     share = Math.max(share, getResourceValue(sched.getMinShare(), type));
     share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
-    return (int) share;
+    return share;
   }
 
   private static int getResourceValue(Resource resource, ResourceType type) {
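
The core of the ComputeFairShares change is easier to see outside the scheduler plumbing. The sketch below is an illustrative, self-contained rendering of the same idea, not the Hadoop class: a share is clamp(weight * ratio, minShare, maxShare) kept as a double, the doubling phase is seeded from the smallest minShare/weight ratio and stops as soon as aggregate usage stops growing (the RESOURCE_USAGE_EPSILON check), and truncation to int happens only when the converged shares are assigned. The Queue class, method names, and the main() scenario are stand-ins invented for the example; the numbers mirror the new testFairShareWithNoneZeroWeightNoneZeroMinRes case.

```java
import java.util.Arrays;
import java.util.List;

public class FairShareSketch {
  private static final double EPSILON = 0.0001; // mirrors RESOURCE_USAGE_EPSILON
  private static final int ITERATIONS = 25;     // mirrors COMPUTE_FAIR_SHARES_ITERATIONS

  static class Queue {
    final String name;
    final double weight;
    final int minShare;
    final int maxShare;
    Queue(String name, double weight, int minShare, int maxShare) {
      this.name = name;
      this.weight = weight;
      this.minShare = minShare;
      this.maxShare = maxShare;
    }
  }

  // clamp(weight * ratio, minShare, maxShare), kept as a double as in the patch
  static double share(Queue q, double ratio) {
    double s = q.weight * ratio;
    s = Math.max(s, q.minShare);
    return Math.min(s, q.maxShare);
  }

  // Aggregate usage for a candidate weight-to-resource ratio
  static double used(List<Queue> queues, double ratio) {
    double total = 0.0;
    for (Queue q : queues) {
      total += share(q, ratio);
    }
    return total;
  }

  static void computeShares(List<Queue> queues, int totalResource) {
    // Seed rMax with the smallest minShare/weight ratio so the first doubling
    // step can actually move the aggregate usage.
    double rMax = 1.0;
    boolean first = true;
    for (Queue q : queues) {
      if (q.weight > 0 && q.weight < q.minShare) {
        double t = q.minShare / q.weight;
        rMax = first ? t : Math.min(rMax, t);
        first = false;
      }
    }
    // Double rMax only while usage is still growing; when every weight is zero
    // the usage never changes, so the epsilon check exits instead of spinning.
    double previous = 0.0;
    while (true) {
      double current = used(queues, rMax);
      if (current - previous < EPSILON || current >= totalResource) {
        break;
      }
      rMax *= 2.0;
      previous = current;
    }
    // Binary-search the ratio whose planned usage matches the cluster size.
    double left = 0.0;
    double right = rMax;
    for (int i = 0; i < ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      if ((int) used(queues, mid) < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }
    for (Queue q : queues) {
      System.out.println(q.name + " -> " + (int) share(q, right));
    }
  }

  public static void main(String[] args) {
    // An 8192 MB cluster and two queues with weight 0.5 and minShare 1024 each.
    computeShares(Arrays.asList(
        new Queue("queueA", 0.5, 1024, Integer.MAX_VALUE),
        new Queue("queueB", 0.5, 1024, Integer.MAX_VALUE)), 8 * 1024);
  }
}
```

Running this prints 4096 for both queues, matching the assertion added below in TestFairScheduler. With both weights set to 0.0 and a small minShare, aggregate usage never reaches the cluster size, which is exactly the situation the removed `while (... < totalResource) rMax *= 2.0;` loop could not escape; the epsilon check gives the loop a second way out.
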
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 05b1925..1a3da55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -301,13 +301,156 @@ public void testSimpleFairShareCalculation() throws IOException {
 
     // Divided three ways - between the two queues and the default queue
     for (FSLeafQueue p : queues) {
-      assertEquals(3414, p.getFairShare().getMemory());
-      assertEquals(3414, p.getMetrics().getFairShareMB());
-      assertEquals(3414, p.getSteadyFairShare().getMemory());
-      assertEquals(3414, p.getMetrics().getSteadyFairShareMB());
+      assertEquals(3413, p.getFairShare().getMemory());
+      assertEquals(3413, p.getMetrics().getFairShareMB());
+      assertEquals(3413, p.getSteadyFairShare().getMemory());
+      assertEquals(3413, p.getMetrics().getSteadyFairShareMB());
     }
   }
-  
+
+  @Test
+  public void testFairShareWithZeroWeight() throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    // set queueA and queueB weight zero.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>0.0</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>0.0</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A wants 2 * 1024.
+    createSchedulingRequest(2 * 1024, "queueA", "user1");
+    // Queue B wants 6 * 1024
+    createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+    scheduler.update();
+
+    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+        "queueA", false);
+    // queueA's weight is 0.0, so its fair share should be 0.
+    assertEquals(0, queue.getFairShare().getMemory());
+    // queueB's weight is 0.0, so its fair share should be 0.
+    queue = scheduler.getQueueManager().getLeafQueue(
+        "queueB", false);
+    assertEquals(0, queue.getFairShare().getMemory());
+  }
+
+  @Test
+  public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    // set queueA and queueB weight zero.
+    // set queueA and queueB minResources 1.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1 mb 1 vcores</minResources>");
+    out.println("<weight>0.0</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>1 mb 1 vcores</minResources>");
+    out.println("<weight>0.0</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A wants 2 * 1024.
+    createSchedulingRequest(2 * 1024, "queueA", "user1");
+    // Queue B wants 6 * 1024
+    createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+    scheduler.update();
+
+    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+        "queueA", false);
+    // queueA's weight is 0.0 and minResources is 1,
+    // so its fair share should be 1 (minShare).
+    assertEquals(1, queue.getFairShare().getMemory());
+    // queueB's weight is 0.0 and minResources is 1,
+    // so its fair share should be 1 (minShare).
+    queue = scheduler.getQueueManager().getLeafQueue(
+        "queueB", false);
+    assertEquals(1, queue.getFairShare().getMemory());
+  }
+
+  @Test
+  public void testFairShareWithNoneZeroWeightNoneZeroMinRes()
+      throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    // set queueA and queueB weight 0.5.
+    // set queueA and queueB minResources 1024.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024 mb 1 vcores</minResources>");
+    out.println("<weight>0.5</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>1024 mb 1 vcores</minResources>");
+    out.println("<weight>0.5</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A wants 4 * 1024.
+    createSchedulingRequest(4 * 1024, "queueA", "user1");
+    // Queue B wants 4 * 1024
+    createSchedulingRequest(4 * 1024, "queueB", "user1");
+
+    scheduler.update();
+
+    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+        "queueA", false);
+    // queueA's weight is 0.5 and minResources is 1024,
+    // so its fair share should be 4096.
+    assertEquals(4096, queue.getFairShare().getMemory());
+    // queueB's weight is 0.5 and minResources is 1024,
+    // so its fair share should be 4096.
+    queue = scheduler.getQueueManager().getLeafQueue(
+        "queueB", false);
+    assertEquals(4096, queue.getFairShare().getMemory());
+  }
+
   @Test
   public void testSimpleHierarchicalFairShareCalculation() throws IOException {
     scheduler.init(conf);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
index ab8fcbc..f4ae335 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
@@ -319,6 +319,12 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
     Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
         .getLeafQueues();
 
+    assertEquals(
+        1,
+        scheduler.getQueueManager()
+            .getParentQueue("root.parentB", false).getSteadyFairShare()
+            .getVirtualCores());
+
     for (FSLeafQueue leaf : leafQueues) {
       if (leaf.getName().startsWith("root.parentA")) {
         assertEquals(0.2,
@@ -329,7 +335,7 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
       } else if (leaf.getName().startsWith("root.parentB")) {
         assertEquals(0.05,
             (double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
-        assertEquals(0.1,
+        assertEquals(0.0,
             (double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
             0.001);
       }
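
For quick reference, the expectations asserted in the new TestFairScheduler cases follow directly from the clamp in computeShare(): a zero-weight queue can never receive more than its minShare, and a 0.5-weight queue on an 8 * 1024 MB node ends up with 0.5 * 8192 = 4096 once the search converges. The snippet below is a minimal illustration of that arithmetic only; the class and helper names are invented for the example and are not part of the patch. The 3414 -> 3413 change in testSimpleFairShareCalculation is the same rounding effect seen from the other side: shares are no longer truncated per queue inside the search, so an even three-way split now settles on the floor value.

```java
public class ZeroWeightExpectations {
  // Illustrative helper mirroring the clamp in computeShare();
  // maxShare is omitted because none of the three new tests set it.
  static int share(double weight, int minShare, double ratio) {
    return (int) Math.max(weight * ratio, minShare);
  }

  public static void main(String[] args) {
    // testFairShareWithZeroWeight: weight 0, no minResources -> 0 whatever the ratio.
    System.out.println(share(0.0, 0, 1e9));     // 0
    // testFairShareWithZeroWeightNoneZeroMinRes: weight 0, minResources 1 -> 1.
    System.out.println(share(0.0, 1, 1e9));     // 1
    // testFairShareWithNoneZeroWeightNoneZeroMinRes: two 0.5-weight queues on an
    // 8192 MB node; the converged ratio is 8192, so each queue gets 4096.
    System.out.println(share(0.5, 1024, 8192)); // 4096
  }
}
```
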