diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 5eeda64..edd18ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -423,10 +423,11 @@ public int compare(RMContainer c1, RMContainer c2) {
 
   private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
       FSLeafQueue queue) {
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-
+    LOG.info("Preempting container (containerId=" + container + " prio=" +
+        container.getContainer().getPriority() + "res=" +
+        container.getContainer().getResource() + ") from queue " +
+        queue.getName());
+
     Long time = app.getContainerPreemptionTime(container);
 
     if (time != null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 77dad49..7094412 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 
 /**
@@ -96,6 +97,27 @@ public static void computeShares(
     }
     int totalResource = Math.min(totalMaxShare,
         getResourceValue(totalResources, type));
+
+    if (schedulables.iterator().next() instanceof FSQueue) {
+      // Check if there exists at least one active queue in the
+      // collection of queues given as input
+      boolean atLeastOneActiveQueue = false;
+      for (Schedulable sched : schedulables) {
+        if (containsRunnableApps(sched)) {
+          atLeastOneActiveQueue = true;
+          break;
+        }
+      }
+
+      // If there are no active queues, set fair share to zero for
+      // all of them and return.
+      if (!atLeastOneActiveQueue) {
+        for (Schedulable sched : schedulables) {
+          setResourceValue(0, sched.getFairShare(), type);
+        }
+        return;
+      }
+    }
 
     double rMax = 1.0;
     while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
@@ -118,9 +140,15 @@ public static void computeShares(
         right = mid;
       }
     }
+
     // Set the fair shares based on the value of R we've converged to
     for (Schedulable sched : schedulables) {
-      setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type);
+      if ((sched instanceof FSQueue) && !containsRunnableApps(sched)) {
+        setResourceValue(0, sched.getFairShare(), type);
+        continue;
+      }
+      int share = computeShare(sched, right, type);
+      setResourceValue(share, sched.getFairShare(), type);
     }
   }
 
@@ -132,12 +160,20 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
       Collection<? extends Schedulable> schedulables, ResourceType type) {
     int resourcesTaken = 0;
     for (Schedulable sched : schedulables) {
+      if ((sched instanceof FSQueue) && !containsRunnableApps(sched)) {
+        continue;
+      }
       int share = computeShare(sched, w2rRatio, type);
       resourcesTaken += share;
     }
     return resourcesTaken;
   }
 
+  private static boolean containsRunnableApps(Schedulable sched) {
+    FSQueue queue = (FSQueue) sched;
+    return queue.getNumRunnableApps() > 0;
+  }
+
   /**
    * Compute the resources assigned to a Schedulable given a particular
    * weight-to-resource ratio w2rRatio.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index fe2cb23..a04f938 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -414,12 +414,16 @@ public void testSimpleFairShareCalculation() throws IOException {
     scheduler.update();
 
     Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
-    assertEquals(3, queues.size());
-
+    // Divided two ways - between the two active queues; the empty default
+    // queue gets no share
    for (FSLeafQueue p : queues) {
-      assertEquals(3414, p.getFairShare().getMemory());
-      assertEquals(3414, p.getMetrics().getFairShareMB());
+      if (p.getName().equals("root.default")) {
+        assertEquals(0, p.getFairShare().getMemory());
+        assertEquals(0, p.getMetrics().getFairShareMB());
+      } else {
+        assertEquals(5120, p.getFairShare().getMemory());
+        assertEquals(5120, p.getMetrics().getFairShareMB());
+      }
     }
   }
 
@@ -448,12 +452,12 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
     FSLeafQueue queue1 = queueManager.getLeafQueue("default", true);
     FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2", true);
     FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3", true);
-    assertEquals(capacity / 2, queue1.getFairShare().getMemory());
-    assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB());
-    assertEquals(capacity / 4, queue2.getFairShare().getMemory());
-    assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB());
-    assertEquals(capacity / 4, queue3.getFairShare().getMemory());
-    assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB());
+    assertEquals(0, queue1.getFairShare().getMemory());
+    assertEquals(0, queue1.getMetrics().getFairShareMB());
+    assertEquals(capacity / 2, queue2.getFairShare().getMemory());
+    assertEquals(capacity / 2, queue2.getMetrics().getFairShareMB());
+    assertEquals(capacity / 2, queue3.getFairShare().getMemory());
+    assertEquals(capacity / 2, queue3.getMetrics().getFairShareMB());
   }
 
   @Test
@@ -868,9 +872,6 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
     out.println("");
     out.println("");
     out.close();
-    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-    RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
-
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     int capacity = 16 * 1024;
@@ -881,8 +882,8 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
     scheduler.handle(nodeEvent1);
 
     // user1, user2 submit their apps to parentq and create user queues
-    scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
-    scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
+    createSchedulingRequest(2 * 1024, "root.parentq", "user1");
+    createSchedulingRequest(2 * 1024, "root.parentq", "user2");
 
     scheduler.update();
 
@@ -892,14 +893,204 @@
     for (FSLeafQueue leaf : leafQueues) {
       if (leaf.getName().equals("root.parentq.user1")
           || leaf.getName().equals("root.parentq.user2")) {
-        // assert that the fair share is 1/4th node1's capacity
-        assertEquals(capacity / 4, leaf.getFairShare().getMemory());
+        // assert that the fair share is 1/2 node1's capacity
+        assertEquals(capacity / 2, leaf.getFairShare().getMemory());
         // assert weights are equal for both the user queues
         assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
       }
     }
   }
 
+  private void setupCluster(int nodeCapacity) throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+
+    out.println("");
+    out.println("");
+    out.println("");
+    out.println("2");
+    out.println(" ");
+    out.println(" ");
+    out.println("");
+    out.println("");
+    out.println("8");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println(" ");
+    out.println("");
+    out.println("");
+    out.close();
+
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // create a node with the given capacity (16 GB in these tests)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(nodeCapacity),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+  }
+
+  @Test
+  public void testFairShareNoAppsRunning() throws IOException {
+    int nodeCapacity = 16 * 1024;
+    setupCluster(nodeCapacity);
+
+    // No apps are running in the cluster, so verify that the fair share is
+    // zero for all queues.
+    Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
+        .getLeafQueues();
+
+    for (FSLeafQueue leaf : leafQueues) {
+      assertEquals(0, leaf.getFairShare().getMemory() / nodeCapacity * 100);
+    }
+  }
+
+  @Test
+  public void testFairShareOneAppRunning() throws IOException {
+    int nodeCapacity = 16 * 1024;
+    setupCluster(nodeCapacity);
+
+    // Run an app in childB1 and see if that queue's fair share is 100%
+    createSchedulingRequest(2 * 1024, "root.lowPriorityParentB.childB1",
+        "user1");
+
+    scheduler.update();
+
+    assertEquals(
+        100,
+        scheduler.getQueueManager()
+            .getLeafQueue("root.lowPriorityParentB.childB1", false)
+            .getFairShare().getMemory()
+            / nodeCapacity * 100);
+  }
+
+  @Test
+  public void testMultipleActiveQueuesUnderSameParent() throws IOException {
+    int nodeCapacity = 16 * 1024;
+    setupCluster(nodeCapacity);
+
+    // Run apps in childA1, childA2 and childA3
+    createSchedulingRequest(2 * 1024, "root.highPriorityParentA.childA1",
+        "user1");
+    createSchedulingRequest(2 * 1024, "root.highPriorityParentA.childA2",
+        "user2");
+    createSchedulingRequest(2 * 1024, "root.highPriorityParentA.childA3",
+        "user3");
+
+    scheduler.update();
+
+    // Since no app is running in root.lowPriorityParentB, the fair share of
+    // root.highPriorityParentA would be 100% and each of its three active
+    // child queues would get 33%.
+    for (int i = 1; i <= 3; i++) {
+      assertEquals(
+          33,
+          (double) scheduler.getQueueManager()
+              .getLeafQueue("root.highPriorityParentA.childA" + i, false)
+              .getFairShare().getMemory()
+              / nodeCapacity * 100, .9);
+    }
+  }
+
+  @Test
+  public void testMultipleActiveQueuesUnderDifferentParent() throws IOException {
+    int nodeCapacity = 16 * 1024;
+    setupCluster(nodeCapacity);
+
+    // Run apps in childA1 and childA2 under parent highPriorityParentA
+    createSchedulingRequest(2 * 1024, "root.highPriorityParentA.childA1",
+        "user1");
+    createSchedulingRequest(3 * 1024, "root.highPriorityParentA.childA2",
+        "user2");
+
+    // Run apps in childB1 and childB2 under parent lowPriorityParentB
+    createSchedulingRequest(1 * 1024, "root.lowPriorityParentB.childB1",
+        "user3");
+    createSchedulingRequest(2 * 1024, "root.lowPriorityParentB.childB2",
+        "user4");
+
+    scheduler.update();
+
+    // Now both parents, highPriorityParentA and lowPriorityParentB, have
+    // active queues under them, so they get their fair shares of 80% and 20%.
+    // The two active child queues under highPriorityParentA would each get
+    // 80/2 = 40%, and the ones in lowPriorityParentB would get 20/2 = 10%.
+    for (int i = 1; i <= 2; i++) {
+      assertEquals(
+          40,
+          (double) scheduler.getQueueManager()
+              .getLeafQueue("root.highPriorityParentA.childA" + i, false)
+              .getFairShare().getMemory()
+              / nodeCapacity * 100, .9);
+    }
+
+    for (int i = 1; i <= 2; i++) {
+      assertEquals(
+          10,
+          (double) scheduler.getQueueManager()
+              .getLeafQueue("root.lowPriorityParentB.childB" + i, false)
+              .getFairShare().getMemory()
+              / nodeCapacity * 100, .9);
+    }
+  }
+
+  @Test
+  public void testFairShareResetsToZeroWhenAppsComplete() throws IOException {
+    int nodeCapacity = 16 * 1024;
+    setupCluster(nodeCapacity);
+
+    // Run apps in childA1 and childA2 under parent highPriorityParentA
+    ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024,
+        "root.highPriorityParentA.childA1", "user1");
+    ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024,
+        "root.highPriorityParentA.childA2", "user2");
+
+    scheduler.update();
+
+    // Verify that both active queues under highPriorityParentA get a 50%
+    // fair share
+    assertEquals(
+        50,
+        (double) scheduler.getQueueManager()
+            .getLeafQueue("root.highPriorityParentA.childA1", false)
+            .getFairShare().getMemory()
+            / nodeCapacity * 100, 0);
+    assertEquals(
+        50,
+        (double) scheduler.getQueueManager()
+            .getLeafQueue("root.highPriorityParentA.childA2", false)
+            .getFairShare().getMemory()
+            / nodeCapacity * 100, 0);
+
+    // Let the app under childA1 complete. This should cause the fair share of
+    // queue childA1 to be reset to zero, since the queue has no apps running.
+    // Queue childA2's fair share would increase to 100% since it is the only
+    // active queue.
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        app1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+
+    assertEquals(
+        0,
+        (double) scheduler.getQueueManager()
+            .getLeafQueue("root.highPriorityParentA.childA1", false)
+            .getFairShare().getMemory()
+            / nodeCapacity * 100, 0);
+    assertEquals(
+        100,
+        (double) scheduler.getQueueManager()
+            .getLeafQueue("root.highPriorityParentA.childA2", false)
+            .getFairShare().getMemory()
+            / nodeCapacity * 100, 0);
+  }
+
   /**
    * Make allocation requests and ensure they are reflected in queue demand.
    */
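For reviewers who want to see the behavioral change in isolation, below is a minimal, standalone sketch of the computeShares() binary search with the two additions this patch makes: the early return when no queue is active, and the skipping of inactive queues both while searching for the weight-to-resource ratio R and while assigning shares. The Queue class, its fields, and the numbers in main() are hypothetical stand-ins for Schedulable/FSQueue and Resource; the real code also honors minimum shares and multiple resource types, which are omitted here.

import java.util.Arrays;
import java.util.List;

public class FairShareSketch {

  /** Hypothetical stand-in for an FSQueue/Schedulable. */
  static final class Queue {
    final String name;
    final double weight;
    final int maxShare;       // cap on what the queue may be assigned
    final int runnableApps;   // > 0 means the queue is "active"
    int fairShare;

    Queue(String name, double weight, int maxShare, int runnableApps) {
      this.name = name;
      this.weight = weight;
      this.maxShare = maxShare;
      this.runnableApps = runnableApps;
    }
  }

  // Share a queue would receive at a given weight-to-resource ratio r,
  // capped by its max share.
  static int shareAt(Queue q, double r) {
    return (int) Math.min(q.weight * r, (double) q.maxShare);
  }

  // Total resource handed out at ratio r, counting only active queues
  // (mirrors the patched resourceUsedWithWeightToResourceRatio).
  static int usedAt(List<Queue> queues, double r) {
    int total = 0;
    for (Queue q : queues) {
      if (q.runnableApps == 0) {
        continue; // inactive queues take no share
      }
      total += shareAt(q, r);
    }
    return total;
  }

  static void computeShares(List<Queue> queues, int totalResource) {
    // First addition: if nothing is active, every fair share is zero and the
    // search below is skipped (the rMax doubling would never terminate for a
    // positive totalResource, since usedAt() would stay at zero).
    boolean atLeastOneActiveQueue = false;
    for (Queue q : queues) {
      if (q.runnableApps > 0) {
        atLeastOneActiveQueue = true;
        break;
      }
    }
    if (!atLeastOneActiveQueue) {
      for (Queue q : queues) {
        q.fairShare = 0;
      }
      return;
    }

    // Find an upper bound on R, then binary-search the ratio at which the
    // active queues' shares add up to the total resource.
    double rMax = 1.0;
    while (usedAt(queues, rMax) < totalResource) {
      rMax *= 2.0;
    }
    double left = 0.0;
    double right = rMax;
    for (int i = 0; i < 25; i++) {
      double mid = (left + right) / 2.0;
      if (usedAt(queues, mid) < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }

    // Second addition: inactive queues are pinned to zero, everything else is
    // assigned its share at the converged ratio.
    for (Queue q : queues) {
      q.fairShare = (q.runnableApps == 0) ? 0 : shareAt(q, right);
    }
  }

  public static void main(String[] args) {
    // One idle queue and two active queues with weights 1 and 2 sharing 12 GB:
    // expect roughly 4096 MB, 8192 MB and 0 MB.
    List<Queue> queues = Arrays.asList(
        new Queue("active-w1", 1.0, Integer.MAX_VALUE, 1),
        new Queue("active-w2", 2.0, Integer.MAX_VALUE, 1),
        new Queue("idle", 1.0, Integer.MAX_VALUE, 0));
    computeShares(queues, 12 * 1024);
    for (Queue q : queues) {
      System.out.println(q.name + " -> " + q.fairShare + " MB");
    }
  }
}

The idle queue receiving zero here corresponds to what the new tests assert: once a queue has no runnable apps, its fair share is excluded from the division and resets to zero, while the remaining active queues split the capacity by weight.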