diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index e684c2b..2291911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -217,7 +217,14 @@ public void setQueues(List queues) { // Since partitionToLookAt is a thread local variable, and every time we // copy and sort queues, so it's safe for multi-threading environment. 
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); - List sortedQueue = new ArrayList<>(queues); + List sortedQueue = new ArrayList<>(); + for (CSQueue childQueue : queues) { + if (!Resources + .equals(childQueue.getQueueResourceUsage().getPending(partition), + Resources.none())) { + sortedQueue.add(childQueue); + } + } Collections.sort(sortedQueue, new PriorityQueueComparator()); return sortedQueue.iterator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java index a2ccf6e..e9324cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java @@ -31,25 +31,35 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -58,6 +68,7 @@ import java.util.HashMap; import java.util.Map; import java.util.PriorityQueue; +import java.util.Random; import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES; import static org.mockito.Mockito.mock; @@ -262,4 +273,88 @@ public void 
testUserLimitThroughputForFourResources() throws Exception {
   public void testUserLimitThroughputForFiveResources() throws Exception {
     testUserLimitThroughputWithNumberOfResourceTypes(5);
   }
+
+  @Test(timeout = 300000)
+  public void testAssignContainersPerformanceForOneHundredQueues()
+      throws Exception {
+    testAssignContainersPerformanceForLargeScaleQueues(100, 1000);
+  }
+
+  @Test(timeout = 300000)
+  public void testAssignContainersPerformanceForFiveHundredQueues()
+      throws Exception {
+    testAssignContainersPerformanceForLargeScaleQueues(500, 1000);
+  }
+
+  @Test(timeout = 300000)
+  public void testAssignContainersPerformanceForOneThousandQueues()
+      throws Exception {
+    testAssignContainersPerformanceForLargeScaleQueues(1000, 1000);
+  }
+
+  @Test(timeout = 300000)
+  public void testAssignContainersPerformanceForFiveThousandQueues()
+      throws Exception {
+    testAssignContainersPerformanceForLargeScaleQueues(5000, 1000);
+  }
+
+  private void testAssignContainersPerformanceForLargeScaleQueues(int queueSize,
+      int testingTimes) throws Exception {
+    // Configure queueSize leaf queues under root, each with an equal share.
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    float capacity = 100.0f / queueSize;
+    String[] queues = new String[queueSize];
+    for (int i = 0; i < queueSize; i++) {
+      queues[i] = "q" + i;
+      conf.setCapacity("root." + queues[i], capacity);
+      conf.setMaximumCapacity("root." + queues[i], 100);
+    }
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
+    // Start the RM and register the single NM used by every measurement.
+    final MockRM rm = new MockRM(conf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+    // Poll (at most 1s) until that one node is tracked by the scheduler.
+    int waitTime = 1000;
+    while (waitTime > 0 &&
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount() < 1) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(1,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+    // Raise log levels to WARN so logging cost does not skew the timings.
+    for (Enumeration loggers = LogManager.getCurrentLoggers(); loggers
+        .hasMoreElements(); ) {
+      Logger logger = (Logger) loggers.nextElement();
+      logger.setLevel(Level.WARN);
+    }
+    LogManager.getLogger("org.apache.hadoop.yarn.server.resourcemanager")
+        .setLevel(Level.WARN);
+    // Time only root.assignContainers(); app submit/kill stay untimed.
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerNode node1 = cs.getAllNodes().get(0);
+    long startTime;
+    long totalTime = 0;
+    Random rand = new Random();
+    for (int i = 0; i < testingTimes; i++) {
+      int queueIndex = rand.nextInt(queueSize);
+      RMApp app = rm.submitApp(200, "app", "user", null, "q" + queueIndex);
+      startTime = System.nanoTime();
+      cs.getRootQueue().assignContainers(nm1.getCapatibility(),
+          new SimpleCandidateNodeSet<>(node1),
+          new ResourceLimits(nm1.getCapatibility()),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      totalTime += System.nanoTime() - startTime;
+      rm.killApp(app.getApplicationId());
+    }
+    System.out.println(
+        "#QueueSize = " + queueSize + ", testing times : " + testingTimes
+            + ", total cost time : " + totalTime + " ns, average cost time : "
+            + (float) totalTime / testingTimes + " ns.");
+    rm.stop();
+  }
+
 }