diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 6fc90f4..704902d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -73,6 +73,8 @@ private final SchedulingPolicy defaultSchedulingPolicy; + private final Set hierarchicalUserQueues; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -89,7 +91,8 @@ public AllocationConfiguration(Map minQueueResources, Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, - QueuePlacementPolicy placementPolicy, Set queueNames) { + QueuePlacementPolicy placementPolicy, Set queueNames, + Set hierarchicalUserQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -105,6 +108,7 @@ public AllocationConfiguration(Map minQueueResources, this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; this.placementPolicy = placementPolicy; this.queueNames = queueNames; + this.hierarchicalUserQueues = hierarchicalUserQueues; } public AllocationConfiguration(Configuration conf) { @@ -124,6 +128,7 @@ public AllocationConfiguration(Configuration conf) { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, new HashSet()); queueNames = new HashSet(); + hierarchicalUserQueues = new HashSet(); } /** @@ -228,4 +233,12 @@ public SchedulingPolicy getDefaultSchedulingPolicy() { public QueuePlacementPolicy getPlacementPolicy() { return placementPolicy; } + + public boolean isHierarchicalUserQueue(String queue) { + if (hierarchicalUserQueues.contains(queue)) { + return true; + } else { + return false; + } + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 69dcf89..6e3c7e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -205,6 +205,7 @@ public synchronized void reloadAllocations() throws IOException, Map minSharePreemptionTimeouts = new HashMap(); Map> queueAcls = new HashMap>(); + Set hierarchicalUserQueues = new HashSet(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; long fairSharePreemptionTimeout = Long.MAX_VALUE; @@ -291,7 +292,7 @@ public synchronized void reloadAllocations() throws IOException, } loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + queueAcls, queueNamesInAllocFile, hierarchicalUserQueues); } // Load placement policy and pass it configured queues @@ -308,7 +309,7 @@ public synchronized void reloadAllocations() throws IOException, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, - newPlacementPolicy, queueNamesInAllocFile); + newPlacementPolicy, queueNamesInAllocFile, hierarchicalUserQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -324,7 +325,8 @@ private void loadQueue(String parentName, Element element, Map Map userMaxApps, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, - Map> queueAcls, Set queueNamesInAllocFile) + Map> queueAcls, Set queueNamesInAllocFile, + Set hierarchicalUserQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name"); if (parentName != null) { @@ -371,12 +373,17 @@ private void loadQueue(String parentName, Element element, Map } else if ("aclAdministerApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData(); acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); - } else if ("queue".endsWith(field.getTagName()) || + } else if ("hierarchicalUserQueue".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData().trim(); + if (Boolean.parseBoolean(text) == true) + hierarchicalUserQueues.add(queueName); + } + else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + queueAcls, queueNamesInAllocFile, hierarchicalUserQueues); isLeaf = false; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3cdff7f..55d9733 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -687,8 +687,17 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { String appRejectMsg = null; try { - QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); - queueName = placementPolicy.assignAppToQueue(queueName, user); + if (allocConf.isHierarchicalUserQueue(queueName) + || (!queueName.startsWith(QueueManager.ROOT_QUEUE + ".") && allocConf + .isHierarchicalUserQueue(QueueManager.ROOT_QUEUE + "." + + queueName))) { + QueuePlacementRule hUQRule = new QueuePlacementRule.HierarchicalUserQueue() + .initialize(true, null); + queueName = hUQRule.getQueueForApp(queueName, user, null, null); + } else { + QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); + queueName = placementPolicy.assignAppToQueue(queueName, user); + } if (queueName == null) { appRejectMsg = "Application rejected by queue placement policy"; } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index f6f5160..6c9e03c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -143,7 +143,7 @@ private FSLeafQueue createLeafQueue(String name) { FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); - if (i == 0) { + if (i == 0 && !queueConf.isHierarchicalUserQueue(name)) { // First name added was the leaf queue leafQueue = new FSLeafQueue(name, scheduler, parent); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java index 6acba27..d8b6106 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -206,6 +206,28 @@ public boolean isTerminal() { } /** + * Places apps under a new/already created user queue under the specified + * queue + */ + public static class HierarchicalUserQueue extends QueuePlacementRule { + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Collection configuredQueues) throws IOException { + if (!requestedQueue.startsWith("root.")) { + requestedQueue = "root." + requestedQueue; + } + return requestedQueue + "." + user; + } + + @Override + public boolean isTerminal() { + return false; + } + + } + + /** * Rejects all apps */ public static class Reject extends QueuePlacementRule { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 74074f2..2e71bef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -43,7 +43,6 @@ import javax.xml.parsers.ParserConfigurationException; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -958,6 +958,111 @@ public void testConfigureRootQueue() throws Exception { assertNotNull(queueManager.getLeafQueue("child2", false)); } + @Test + public void testHierarchicalUserQueue() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" fair"); + out.println(" "); + out.println(" fair"); + out.println(" true"); + out.println(" "); + out.println(""); + out.println(""); + out.println(" fair"); + out.println(""); + out.println(""); + out.println(" fair"); + out.println(" "); + out.println(" fair"); + out.println(" true"); + out.println(" "); + out.println(" fair"); + out.println(" true"); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); + + // test hierarchical user queue at depth 2 + FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, + "regularQueue1.parentUserQueue", "user1"); + // test whether user1 queue was created + assertNotNull(user1Leaf); + assertEquals("root.regularQueue1.parentUserQueue.user1", + user1Leaf.getName()); + + // test hierarchical user queue at depth 3 + FSLeafQueue user3Leaf = scheduler.assignToQueue(rmApp1, + "root.regularQueue3.regularQueue31.parentUserQueue311", "user3"); + assertNotNull(user3Leaf); + assertEquals("root.regularQueue3.regularQueue31.parentUserQueue311.user3", + user3Leaf.getName()); + + // test non-userQueue + assertEquals("root.regularQueue2", + scheduler.assignToQueue(rmApp2, "regularQueue2", "user2").getName()); + } + + @Test + public void testFairShareAndWeightsInHierarchicalUserQueue() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + + out.println(""); + out.println(""); + out.println(""); + out.println(" fair"); + out.println(" "); + out.println(" fair"); + out.println(" true"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + int capacity = 16 * 1024; + //create node with 16 G + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + //user1,user2 submit their apps + scheduler.assignToQueue(rmApp1, "regularQueue1.parentUserQueue", "user1"); + scheduler.assignToQueue(rmApp2, "regularQueue1.parentUserQueue", "user2"); + + scheduler.update(); + + Collection leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().equals("root.regularQueue1.parentUserQueue.user1") + || leaf.getName().equals("root.regularQueue1.parentUserQueue.user2")) { + //assert that the fair share is 1/4th node1's capacity + assertEquals(capacity / 4, leaf.getFairShare().getMemory()); + //assert weights are equal for both the user queues + assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); + } + } + } + @Test (timeout = 5000) public void testIsStarvedForMinShare() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);