diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 661caa7..60f2e98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -492,6 +492,10 @@ private void loadQueue(String parentName, Element element, } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); + if (val < 0) { + throw new AllocationConfigurationException("Weights " + + "cannot be negative"); + } queueWeights.put(queueName, new ResourceWeights((float)val)); } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index f4fad32..b83a769 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -119,6 +119,9 @@ private static void computeSharesInternal( if (schedulables.isEmpty()) { return; } + + boolean allWeightsZero = checkIfAllWeightsZero(schedulables, type); + // Find an upper bound on R that we can use in our binary search. We start // at R = 1 and double it until we have either used all the resources or we // have met all Schedulables' max shares. @@ -137,8 +140,8 @@ private static void computeSharesInternal( totalResource = Math.min(totalMaxShare, totalResource); double rMax = 1.0; - while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type) - < totalResource) { + while (totalResource > resourceUsedWithWeightToResourceRatio( + rMax, schedulables, type, allWeightsZero)) { rMax *= 2.0; } // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps @@ -147,7 +150,7 @@ private static void computeSharesInternal( for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) { double mid = (left + right) / 2.0; int plannedResourceUsed = resourceUsedWithWeightToResourceRatio( - mid, schedulables, type); + mid, schedulables, type, allWeightsZero); if (plannedResourceUsed == totalResource) { right = mid; break; @@ -160,13 +163,23 @@ private static void computeSharesInternal( // Set the fair shares based on the value of R we've converged to for (Schedulable sched : schedulables) { if (isSteadyShare) { - setResourceValue(computeShare(sched, right, type), + setResourceValue(computeShare(sched, right, type, false), ((FSQueue) sched).getSteadyFairShare(), type); } else { - setResourceValue( - computeShare(sched, right, type), sched.getFairShare(), type); + setResourceValue(computeShare(sched, right, type, allWeightsZero), + sched.getFairShare(), type); + } + } + } + + private static boolean checkIfAllWeightsZero( + Collection schedulables, ResourceType type) { + for (Schedulable sched : schedulables) { + if (sched.getWeights().getWeight(type) > 0) { + return false; } } + return true; } /** @@ -174,10 +187,11 @@ private static void computeSharesInternal( * w2rRatio, for use in the computeFairShares algorithm as described in # */ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, - Collection schedulables, ResourceType type) { + Collection schedulables, ResourceType type, + boolean allWeightsZero) { int resourcesTaken = 0; for (Schedulable sched : schedulables) { - int share = computeShare(sched, w2rRatio, type); + int share = computeShare(sched, w2rRatio, type, allWeightsZero); resourcesTaken += share; } return resourcesTaken; @@ -188,8 +202,17 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, * weight-to-resource ratio w2rRatio. */ private static int computeShare(Schedulable sched, double w2rRatio, - ResourceType type) { - double share = sched.getWeights().getWeight(type) * w2rRatio; + ResourceType type, boolean forceWeightOne) { + // If all active Schedulable weights are 0.0, + // use 1 as the weight to avoid zero share. + float weight; + if (forceWeightOne && sched.getWeights().getWeight(type) == 0.0) { + weight = 1; + } else { + weight = sched.getWeights().getWeight(type); + } + + double share = weight * w2rRatio; share = Math.max(share, getResourceValue(sched.getMinShare(), type)); share = Math.min(share, getResourceValue(sched.getMaxShare(), type)); return (int) share; @@ -226,8 +249,9 @@ private static int handleFixedFairShares( /** * Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise. * - * The fairshare is fixed if either the maxShare is 0, weight is 0, - * or the Schedulable is not active for instantaneous fairshare. + * The fairshare is fixed if either the maxShare is 0, weight is 0 while + * calculating steady share, or the Schedulable is not active for + * instantaneous fairshare. */ private static int getFairShareIfFixed(Schedulable sched, boolean isSteadyShare, ResourceType type) { @@ -243,8 +267,8 @@ private static int getFairShareIfFixed(Schedulable sched, return 0; } - // Check if weight is 0 - if (sched.getWeights().getWeight(type) <= 0) { + // Check if weight is 0 while calculating steady share + if (isSteadyShare && sched.getWeights().getWeight(type) == 0) { int minShare = getResourceValue(sched.getMinShare(), type); return (minShare <= 0) ? 0 : minShare; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index cc91ef9..42845d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -681,6 +681,31 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue() allocLoader.reloadAllocations(); } + /** + * Verify a queue can't have a negative weight. + */ + @Test (expected = AllocationConfigurationException.class) + public void testNegativeWeightQueue() + throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("-1"); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + } + private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5690fa4..ea1fae3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -406,14 +406,14 @@ public void testFairShareWithZeroWeight() throws IOException { scheduler.update(); + // all queue weight is 0.0 and no active Non-Zero weight queue, + // so its fair share should be 4096. FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue( "queueA", false); - // queueA's weight is 0.0, so its fair share should be 0. - assertEquals(0, queue.getFairShare().getMemory()); - // queueB's weight is 0.0, so its fair share should be 0. + assertEquals(4096, queue.getFairShare().getMemory()); queue = scheduler.getQueueManager().getLeafQueue( "queueB", false); - assertEquals(0, queue.getFairShare().getMemory()); + assertEquals(4096, queue.getFairShare().getMemory()); } @Test @@ -425,13 +425,16 @@ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException { out.println(""); out.println(""); out.println(""); - out.println("1 mb 1 vcores"); + out.println("5120 mb 1 vcores"); out.println("0.0"); out.println(""); out.println(""); - out.println("1 mb 1 vcores"); + out.println("5120 mb 1 vcores"); out.println("0.0"); out.println(""); + out.println(""); + out.println("1.0"); + out.println(""); out.println(""); out.close(); @@ -455,14 +458,14 @@ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException { FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue( "queueA", false); - // queueA's weight is 0.0 and minResources is 1, - // so its fair share should be 1 (minShare). - assertEquals(1, queue.getFairShare().getMemory()); - // queueB's weight is 0.0 and minResources is 1, - // so its fair share should be 1 (minShare). + // queueA's weight is 0.0 and minResources is 5120, + // so its fair share should be 5120 (minShare). + assertEquals(5 * 1024, queue.getFairShare().getMemory()); + // queueB's weight is 0.0 and minResources is 5120, + // so its fair share should be 5120 (minShare). queue = scheduler.getQueueManager().getLeafQueue( "queueB", false); - assertEquals(1, queue.getFairShare().getMemory()); + assertEquals(5 * 1024, queue.getFairShare().getMemory()); } @Test @@ -3378,6 +3381,53 @@ public void testQueueMaxAMShareDefault() throws Exception { } /** + * The test verifies that if there is no non-zero weight active queue, + * zero weight queues can get resources. + */ + @Test + public void testRequestAMResourceInZeroWeightQueue() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.0"); + out.println(""); + out.println(""); + out.println("2.0"); + out.println(""); + out.println("fair" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(8192, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + // Create a Managed AM + Resource amResource = Resource.newInstance(1024, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "root.queue1", "user1", amResource); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application 1 should be running", + 1, app1.getLiveContainers().size()); + } + + /** * The test verifies container gets reserved when not over maxAMShare, * reserved container gets unreserved when over maxAMShare, * container doesn't get reserved when over maxAMShare,