diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0f9d906..126e1cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -53,6 +53,11 @@ private final int userMaxAppsDefault; private final int queueMaxAppsDefault; + // Maximum resource share for each queue that can be used to run AMs + @VisibleForTesting + final Map queueMaxAMShares; + private final float queueMaxAMShareDefault; + // ACL's for each queue. Only specifies non-default ACL's from configuration. 
private final Map> queueAcls; @@ -84,8 +89,9 @@ public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, + Map queueWeights, + Map queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, float queueMaxAMShareDefault, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, @@ -97,9 +103,11 @@ public AllocationConfiguration(Map minQueueResources, this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; + this.queueMaxAMShares = queueMaxAMShares; this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; @@ -116,8 +124,10 @@ public AllocationConfiguration(Configuration conf) { queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); + queueMaxAMShares = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAMShareDefault = 1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; @@ -183,7 +193,11 @@ public int getQueueMaxApps(String queue) { Integer maxApps = queueMaxApps.get(queue); return (maxApps == null) ? queueMaxAppsDefault : maxApps; } - + + public float getQueueMaxAMShare(String queue) { + Float maxAMShare = queueMaxAMShares.get(queue); + return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; + } /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index bedbb64..c135c09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -200,6 +200,7 @@ public synchronized void reloadAllocations() throws IOException, Map maxQueueResources = new HashMap(); Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); + Map queueMaxAMShares = new HashMap(); Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); @@ -207,6 +208,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + float queueMaxAMShareDefault = 1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; @@ -273,6 +275,10 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + queueMaxAMShareDefault = val; } 
else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -297,8 +303,8 @@ public synchronized void reloadAllocations() throws IOException, parent = null; } loadQueue(parent, element, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, configuredQueues); } @@ -313,8 +319,8 @@ public synchronized void reloadAllocations() throws IOException, } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, + queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, newPlacementPolicy, configuredQueues); @@ -329,7 +335,8 @@ public synchronized void reloadAllocations() throws IOException, */ private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, + Map userMaxApps, Map queueMaxAMShares, + Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map> queueAcls, @@ -358,9 +365,13 @@ private void loadQueue(String parentName, Element element, Map Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); maxQueueResources.put(queueName, val); } else if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); + String text = ((Text) field.getFirstChild()).getData().trim(); 
int val = Integer.parseInt(text); queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + queueMaxAMShares.put(queueName, val); } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); @@ -383,8 +394,9 @@ private void loadQueue(String parentName, Element element, Map } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, configuredQueues); + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..f1ba7fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -449,13 +449,13 @@ public int compare(RMContainer c1, RMContainer c2) { } } } - + private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, FSLeafQueue queue) { LOG.info("Preempting container (prio=" + 
container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + queue.getName()); - + Long time = app.getContainerPreemptionTime(container); if (time != null) { @@ -1367,6 +1367,7 @@ public void onReload(AllocationConfiguration queueInfo) { allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); queueMgr.updateAllocationConfiguration(allocConf); + maxRunningEnforcer.updateRunnabilityOnConfigurationReload(); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 359519a..b2e55c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -30,10 +30,14 @@ import com.google.common.annotations.VisibleForTesting; import 
com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; /** - * Handles tracking and enforcement for user and queue maxRunningApps - * constraints + * Handles tracking and enforcement for user and queue maxRunningApps & + * maxAMResourceShare constraints */ public class MaxRunningAppsEnforcer { private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -45,6 +49,9 @@ @VisibleForTesting final ListMultimap usersNonRunnableApps; + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); + public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; this.usersNumRunnableApps = new HashMap(); @@ -53,9 +60,9 @@ public MaxRunningAppsEnforcer(FairScheduler scheduler) { /** * Checks whether making the application runnable would exceed any - * maxRunningApps limits. + * maxRunningApps and queueMaxAMShare limits. 
*/ public boolean canAppBeRunnable(FSQueue queue, String user) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); Integer userNumRunnable = usersNumRunnableApps.get(user); if (userNumRunnable == null) { @@ -66,7 +73,7 @@ public boolean canAppBeRunnable(FSQueue queue, String user) { } // Check queue and all parent queues while (queue != null) { - int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); + int queueMaxApps = getMaxAppPerQueue(queue); if (queue.getNumRunnableApps() >= queueMaxApps) { return false; } @@ -113,7 +120,7 @@ public void trackNonRunnableApp(FSSchedulerApp app) { */ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); - + // childqueueX might have no pending apps itself, but if a queue higher up // in the hierarchy parentqueueY has a maxRunningApps set, an app completion // in childqueueX could allow an app in some other distant child of @@ -126,8 +133,7 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) - allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null; + getMaxAppPerQueue(queue) - 1) ? queue : null; FSParentQueue parent = queue.getParent(); while (parent != null) { - if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent - .getName()) - 1) { + if (parent.getNumRunnableApps() == getMaxAppPerQueue(parent) - 1) { highestQueueWithAppsNowRunnable = parent; } parent = parent.getParent(); @@ -156,6 +162,30 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) } } + checkMaybeRunnableApps(appsNowMaybeRunnable); + } + + /** + * Checks whether any pending applications have become runnable. + * + * This may happen when the configuration is reloaded. 
+ */ + public void updateRunnabilityOnConfigurationReload() { + // Gather all pending apps from all leaf queues + List> appsNowMaybeRunnable = + new ArrayList>(); + for(FSLeafQueue leafQueue : scheduler.getQueueManager().getLeafQueues()) { + if (! leafQueue.getNonRunnableAppSchedulables().isEmpty()) { + appsNowMaybeRunnable.add(leafQueue.getNonRunnableAppSchedulables()); + } + } + + checkMaybeRunnableApps(appsNowMaybeRunnable); + } + + + private void checkMaybeRunnableApps( + List> appsNowMaybeRunnable) { // Scan through and check whether this means that any apps are now runnable Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); @@ -167,6 +197,8 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) continue; } + // The iterator yields apps in start-time order, so the app with the + // earliest start time gets the first chance to become runnable. if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); AppSchedulable appSched = next.getAppSchedulable(); @@ -182,7 +214,7 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) prev = next; } - + // We remove the apps from their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. @@ -192,14 +224,14 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - + if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); + + "usersNonRunnableApps, but was not. 
This should never happen."); } } } - + /** * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. @@ -215,8 +247,7 @@ public void untrackRunnableApp(FSSchedulerApp app) { } // Update runnable app bookkeeping for queues - FSLeafQueue queue = app.getQueue(); - FSParentQueue parent = queue.getParent(); + FSParentQueue parent = app.getQueue().getParent(); while (parent != null) { parent.decrementRunnableApps(); parent = parent.getParent(); @@ -236,8 +267,7 @@ public void untrackNonRunnableApp(FSSchedulerApp app) { */ private void gatherPossiblyRunnableAppLists(FSQueue queue, List> appLists) { - if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() - .getQueueMaxApps(queue.getName())) { + if (queue.getNumRunnableApps() < getMaxAppPerQueue(queue)) { if (queue instanceof FSLeafQueue) { appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables()); } else { @@ -249,6 +279,27 @@ private void gatherPossiblyRunnableAppLists(FSQueue queue, } /** + * Get the max runnable apps for the queue, as capped by its maxAMShare + */ + protected int getMaxAppPerQueue(FSQueue queue) { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); + // maxAMShare may cap the queue below maxRunningApps; 1.0 means no AM cap + float maxQueueAMShare = allocConf.getQueueMaxAMShare(queue.getName()); + if (maxQueueAMShare == 1.0f) { + return queueMaxApps; + } + Resource queueMaxAMResource = + Resources.multiply(queue.getMaxShare(), maxQueueAMShare); + int queueMaxAppsBasedOnAMShare = Math.max(1, + ((int) Math.ceil(Resources.ratio(RESOURCE_CALCULATOR, + queueMaxAMResource, scheduler.getMinimumResourceCapability())))); + queueMaxApps = Math.min(queueMaxApps, queueMaxAppsBasedOnAMShare); + + return queueMaxApps; + } + + /** * Takes a list of lists, each of which is ordered by start time, and returns * their elements in order of start time. 
* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a725d8..b511227 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -174,9 +174,10 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println("alice,bob admins"); out.println(""); - // Give queue D a limit of 3 running apps + // Give queue D a limit of 3 running apps and 0.4f maxAMResourceShare out.println(""); out.println("3"); + out.println("0.4"); out.println(""); // Give queue E a preemption timeout of one minute out.println(""); @@ -194,6 +195,8 @@ public void testAllocationFileParsing() throws Exception { out.println("15"); // Set default limit of apps per user to 5 out.println("5"); + // Set default limit of AMResourceShare to 0.5f + out.println("0.5f"); // Give user1 a limit of 10 jobs out.println(""); out.println("10"); @@ -240,6 +243,13 @@ public void testAllocationFileParsing() throws Exception { assertEquals(10, queueConf.getUserMaxApps("user1")); assertEquals(5, queueConf.getUserMaxApps("user2")); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root." 
+ YarnConfiguration.DEFAULT_QUEUE_NAME), 0.0001); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01); + assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + // Root should get * ACL assertEquals("*", queueConf.getQueueAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2524763..108152f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1534,6 +1534,187 @@ public void testUserMaxRunningApps() throws Exception { // Request should be fulfilled assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } + + + @Test (timeout = 5000) + public void testUserMaxRunningApps2() throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + 
.newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + // Update userMaxApp + scheduler.getAllocationConfiguration().userMaxApps.put("user1", 2); + scheduler.maxRunningEnforcer.updateRunnabilityOnConfigurationReload(); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + } + + @Test + public void testQueueMaxAMShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024); + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + + out.println(""); + out.println("1024 mb, 1 vcores"); + out.println("10240 mb, 10 vcores"); + out.println("0.4"); + out.println("0.2"); + out.println(""); + + out.println(""); + out.println("2048 mb, 2 vcores"); + out.println("10240 mb, 10 vcores"); + out.println("0.6"); + out.println("0.3"); + out.println(""); + out.println("1024 mb, 1 vcores"); + out.println("5120 mb, 5 vcores"); + out.println("0.5"); + out.println("0.3"); + out.println(""); + out.println(""); + out.println("1024 mb, 1 vcores"); + 
out.println("5120 mb, 5 vcores"); + out.println("0.5"); + out.println("0.3"); + out.println(""); + out.println(""); + + out.println("10"); + out.println("10"); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(10240, 10), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // The max number apps allowed for each queue + assertEquals(2, scheduler.maxRunningEnforcer.getMaxAppPerQueue( + scheduler.getQueueManager().getQueue("root.leaf1"))); + assertEquals(3, scheduler.maxRunningEnforcer.getMaxAppPerQueue( + scheduler.getQueueManager().getQueue("root.parent1"))); + assertEquals(2, scheduler.maxRunningEnforcer.getMaxAppPerQueue( + scheduler.getQueueManager().getQueue("root.parent1.leaf2"))); + assertEquals(2, scheduler.maxRunningEnforcer.getMaxAppPerQueue( + scheduler.getQueueManager().getQueue("root.parent1.leaf3"))); + + // Submit three apps to leaf 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "leaf1", + "user", 1); + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "leaf1", + "user1", 1); + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "leaf1", + "user1", 1); + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + assertEquals("App 1 should be running", 1, + scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals("App 2 should be running", 1, + scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + assertEquals("App 3 should not be running", 0, + scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + // Request another container for app 1 + createSchedulingRequestExistingApplication(1024, 1, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + // Request should be fulfilled + 
assertEquals("App 1 should have 2 running containers", + 2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + // Update leaf 1's queueMaxAMShare to 0.3f, and app 3 can run + scheduler.getAllocationConfiguration().queueMaxAMShares.put("root.leaf1", 0.3f); + assertEquals(3, scheduler.maxRunningEnforcer.getMaxAppPerQueue( + scheduler.getQueueManager().getQueue("root.leaf1"))); + scheduler.maxRunningEnforcer.updateRunnabilityOnConfigurationReload(); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("App 3 should be running", 1, + scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + // Submit two apps to leaf 2 + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "parent1.leaf2", + "user1", 1); + ApplicationAttemptId attId5 = createSchedulingRequest(1024, "parent1.leaf2", + "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("App 4 should be running", 1, + scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + assertEquals("App 5 should be running", 1, + scheduler.getSchedulerApp(attId5).getLiveContainers().size()); + + // Submit two apps to leaf 3 + ApplicationAttemptId attId6 = createSchedulingRequest(1024, "parent1.leaf3", + "user1", 1); + ApplicationAttemptId attId7 = createSchedulingRequest(1024, "parent1.leaf3", + "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("App 6 should be running", 1, + scheduler.getSchedulerApp(attId6).getLiveContainers().size()); + assertEquals("App 7 should not be running", 0, + scheduler.getSchedulerApp(attId7).getLiveContainers().size()); + + // Finish one app from leaf 2 + AppAttemptRemovedSchedulerEvent appRemovedEvent4 = + new AppAttemptRemovedSchedulerEvent(attId4, + RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("App 7 should be running", 1, + 
scheduler.getSchedulerApp(attId7).getLiveContainers().size()); + } @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index c1866f0..7c3d2d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -30,13 +30,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; public class TestMaxRunningAppsEnforcer { + private FairScheduler scheduler; private QueueManager queueManager; + private AllocationConfiguration allocConf; private Map queueMaxApps; private Map userMaxApps; + private Map queueMaxAMShares; private MaxRunningAppsEnforcer maxAppsEnforcer; private int appNum; private TestFairScheduler.MockClock clock; @@ -45,18 +51,18 @@ public void setup() throws Exception { Configuration conf = new Configuration(); clock = new TestFairScheduler.MockClock(); - FairScheduler scheduler = mock(FairScheduler.class); + scheduler = mock(FairScheduler.class); 
when(scheduler.getConf()).thenReturn( new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); - AllocationConfiguration allocConf = new AllocationConfiguration( - conf); + allocConf = new AllocationConfiguration(conf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); queueManager = new QueueManager(scheduler); queueManager.initialize(conf); queueMaxApps = allocConf.queueMaxApps; userMaxApps = allocConf.userMaxApps; + queueMaxAMShares = allocConf.queueMaxAMShares; maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); appNum = 0; } @@ -80,7 +86,44 @@ private void removeApp(FSSchedulerApp app) { maxAppsEnforcer.untrackRunnableApp(app); maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); } - + + @Test + public void testGetMaxAppPerQueue() { + Resource minAllocation = Resource.newInstance(1024, 1); + when(scheduler.getMinimumResourceCapability()).thenReturn(minAllocation); + FSParentQueue root = queueManager.getRootQueue(); + + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true); + queueMaxApps.put("root", 10); + queueMaxApps.put("root.queue1", 5); + queueMaxApps.put("root.queue2", 5); + queueMaxAMShares.put("root", 0.5f); + queueMaxAMShares.put("root.queue1", 0.5f); + queueMaxAMShares.put("root.queue2", 0.5f); + + // Now limited by queueMaxAMShare + Resource maxShare1 = Resource.newInstance(1024 * 10, 10); + Resource maxShare2 = Resource.newInstance(1024 * 5, 5); + allocConf.maxQueueResources.put("root", maxShare1); + allocConf.maxQueueResources.put("root.queue1", maxShare2); + allocConf.maxQueueResources.put("root.queue2", maxShare2); + + assertEquals(5, maxAppsEnforcer.getMaxAppPerQueue(root)); + assertEquals(3, maxAppsEnforcer.getMaxAppPerQueue(leaf1)); + assertEquals(3, maxAppsEnforcer.getMaxAppPerQueue(leaf2)); + + // Increase the maxShare, now limited by maxApps + maxShare1.setMemory(1024 * 100); + 
maxShare1.setVirtualCores(100); + maxShare2.setMemory(1024 * 50); + maxShare2.setVirtualCores(50); + + assertEquals(10, maxAppsEnforcer.getMaxAppPerQueue(root)); + assertEquals(5, maxAppsEnforcer.getMaxAppPerQueue(leaf1)); + assertEquals(5, maxAppsEnforcer.getMaxAppPerQueue(leaf2)); + } + @Test public void testRemoveDoesNotEnableAnyApp() { FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 02890a1..d98d8f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -233,6 +233,11 @@ Allocation file format * maxRunningApps: limit the number of apps from the queue to run at once + * maxAMShare: limit the fraction of the queue's max share that can be used to + run application masters. maxAMShare * queue_max_share is the maximum amount + of resources that can be used by AMs in that queue. The default value is 1.0, + which means AMs in that queue can use up to 100% of the queue's max share. + * weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight. @@ -275,6 +280,9 @@ Allocation file format * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue. + * <<A queueMaxAMShareDefault element>>, which sets the default AM resource + share limit for queues; overridden by the maxAMShare element in each queue. + * <>, which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair". @@ -323,6 +331,7 @@ Allocation file format 10000 mb,0vcores 90000 mb,0vcores 50 + <maxAMShare>0.1</maxAMShare> 2.0 fair @@ -331,6 +340,8 @@ Allocation file format + <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault> +