From 01eefa7201d295c2e9c0bd8116fa7ec26cd40784 Mon Sep 17 00:00:00 2001 From: keyki Date: Thu, 3 Jul 2014 15:10:12 +0200 Subject: [PATCH] YARN-2248. Implement application movement between queues in CapacityScheduler --- .../scheduler/capacity/CapacityScheduler.java | 94 +++++- .../capacity/CapacitySchedulerConfiguration.java | 6 +- .../scheduler/capacity/LeafQueue.java | 17 ++ .../yarn/server/resourcemanager/Application.java | 4 + .../scheduler/capacity/TestCapacityScheduler.java | 319 +++++++++++++++++++++ 5 files changed, 437 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 92727e3..08a9396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -50,9 +48,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -94,6 +95,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -1135,4 +1137,92 @@ private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( throw new IOException(e); } } + + @Override + public synchronized String moveApplication(ApplicationId appId, String queueName) + throws YarnException { + SchedulerApplication app = applications.get(appId); + if (app == null) { + throw new YarnException("App to be moved " + appId + " not found."); + } + FiCaSchedulerApp attempt = (FiCaSchedulerApp) app.getCurrentAppAttempt(); + // To serialize with #allocate, synchronize on app attempt + synchronized (attempt) { + LeafQueue oldQueue = (LeafQueue) app.getQueue(); + LeafQueue targetQueue = (LeafQueue) queues.get(queueName); + if (targetQueue == null) { + throw new YarnException("Target queue " + queueName + " not found"); + } + if (targetQueue == oldQueue) { + return oldQueue.getQueueName(); + } + verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); + executeMove(app, attempt, oldQueue, targetQueue); + return targetQueue.getQueueName(); + } + } + + private void verifyMoveDoesNotViolateConstraints(FiCaSchedulerApp app, + LeafQueue oldQueue, LeafQueue targetQueue) throws YarnException { + String queueName = targetQueue.getQueueName(); + ApplicationAttemptId appAttId = app.getApplicationAttemptId(); + // When checking maxResources and maxRunningApps, only need to consider + // queues before the lowest common ancestor of the two queues because the + // total running apps in queues above will not be changed. + CSQueue lowestCommonAncestor = findLowestCommonAncestorQueue(oldQueue, targetQueue); + Resource consumption = app.getCurrentConsumption(); + + CSQueue currentQueue = targetQueue; + while (currentQueue != lowestCommonAncestor) { + // maxApps + if (currentQueue.getNumApplications() == this.conf. + getMaximumApplicationsPerQueue(currentQueue.getQueueName())) { + throw new YarnException("Moving app attempt " + appAttId + " to queue " + + queueName + " would violate queue maxApps constraints on" + + " queue " + currentQueue.getQueueName()); + } + + // maxCapacity + float potentialNewCapacity = Resources.divide(calculator, clusterResource, + Resources.add(currentQueue.getUsedResources(), consumption), clusterResource); + if (potentialNewCapacity >= currentQueue.getAbsoluteMaximumCapacity()) { + throw new YarnException("Moving app attempt " + appAttId + " to queue " + + queueName + " would violate queue maxCapacity constraints on" + + " queue " + currentQueue.getQueueName()); + } + + // queue state + if (currentQueue.getState() != QueueState.RUNNING) { + throw new YarnException("Moving app attempt " + appAttId + " to queue " + + queueName + " would violate queue state constraints on" + + " queue " + currentQueue.getQueueName()); + } + currentQueue = currentQueue.getParent(); + } + } + + private void executeMove(SchedulerApplication app, FiCaSchedulerApp attempt, LeafQueue oldQueue, + LeafQueue newQueue) { + oldQueue.removeApplication(attempt); + attempt.move(newQueue); // This updates all the metrics + app.setQueue(newQueue); + newQueue.trackApplicationsOnParent(attempt.getApplicationId(), attempt.getUser()); + newQueue.submitApplicationAttempt(attempt, attempt.getUser()); + } + + private CSQueue findLowestCommonAncestorQueue(CSQueue queue1, CSQueue queue2) { + String name1 = queue1.getQueuePath(); + String name2 = queue2.getQueuePath(); + // We keep track of the last period we encounter to avoid returning root.apple + // when the queues are root.applepie and root.appletart + int lastPeriodIndex = -1; + for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) { + if (name1.length() <= i || name2.length() <= i || name1.charAt(i) != name2.charAt(i)) { + return queues.get(name1.substring(0, lastPeriodIndex)); + } else if (name1.charAt(i) == '.') { + lastPeriodIndex = i; + } + } + return queue1; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fe695e..27c7ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -264,7 +264,11 @@ public float getUserLimitFactor(String queue) { public void setUserLimitFactor(String queue, float userLimitFactor) { setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor); } - + + public void setState(String queue, QueueState state){ + set(getQueuePrefix(queue) + STATE, state.name()); + } + public QueueState getState(String queue) { String state = get(getQueuePrefix(queue) + STATE); return (state != null) ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65938aa..d838067 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -763,6 +763,23 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) getParent().finishApplicationAttempt(application, queue); } + public synchronized boolean trackApplicationsOnParent(ApplicationId applicationId, String user) { + boolean result = true; + try { + getParent().submitApplication(applicationId, user, this.queueName); + } catch (AccessControlException e) { + LOG.info("Failed to add application to queue " + getParent().getQueueName(), e); + result = false; + } + return result; + } + + public synchronized void removeApplication(FiCaSchedulerApp application) { + removeApplicationAttempt(application, users.get(application.getUser())); + // parent keep track #applications + getParent().finishApplication(application.getApplicationId(), application.getUser()); + } + public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) { boolean wasActive = activeApplications.remove(application); if (!wasActive) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index ce5dd96..2c69d80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -174,6 +174,10 @@ public synchronized void submit() throws IOException, YarnException { new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false); scheduler.handle(addAttemptEvent); } + + public void moveToQueue(String queue) throws YarnException { + resourceManager.getResourceScheduler().moveApplication(applicationId, queue); + } public synchronized void addResourceRequestSpec( Priority priority, Resource capability) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index c3b1d57..6b5a33e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -828,4 +830,321 @@ public void testNumClusterNodes() throws Exception { cs.stop(); } + + @Test(expected = YarnException.class) + public void testMoveAppViolateMaxCapacity() throws Exception { + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(3 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(3 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); + Priority priority_1 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); + + // Submit application_0 + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = new Task(application_0, priority_1, new String[]{host_0, host_1}); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(3 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = new Task(application_1, priority_1, new String[]{host_0, host_1}); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(3 * GB, application_1); + + checkNodeResourceUsage(3 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + application_0.moveToQueue("b1"); + } + + @Test + public void testMoveAppForMoveToQueueCannotRunApp() throws Exception { + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); + Priority priority_1 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); + + // Submit application_0 + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = new Task(application_0, priority_1, new String[]{host_0, host_1}); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = new Task(application_1, priority_1, new String[]{host_0, host_1}); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=4G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% total cap) + application_0.moveToQueue("b1"); + + // 2GB 1C + Task task_1_1 = new Task(application_1, priority_0, new String[]{ResourceRequest.ANY}); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = new Task(application_0, priority_0, new String[]{host_0, host_1}); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + //prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + } + + @Test + public void testMoveApp() throws Exception { + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); + Priority priority_1 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); + + // Submit application_0 + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = new Task(application_0, priority_1, new String[]{host_0, host_1}); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = new Task(application_1, priority_1, new String[]{host_0, host_1}); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + application_0.moveToQueue("b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + application_0.moveToQueue("a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + csConf.setState(B, QueueState.STOPPED); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); + Priority priority_1 = org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); + + // Submit application_0 + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = new Task(application_0, priority_1, new String[]{host_0}); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + application_0.moveToQueue("b1"); + } } -- 1.8.5.2 (Apple Git-48)