diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0f9d906..237cad2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -53,6 +53,10 @@ private final int userMaxAppsDefault; private final int queueMaxAppsDefault; + // Maximum resource share for each leaf queue that can be used to run AMs + final Map queueMaxAMShares; + private final float queueMaxAMShareDefault; + // ACL's for each queue. Only specifies non-default ACL's from configuration. private final Map> queueAcls; @@ -84,8 +88,9 @@ public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, + Map queueWeights, + Map queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, float queueMaxAMShareDefault, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, @@ -97,9 +102,11 @@ public AllocationConfiguration(Map minQueueResources, this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; + this.queueMaxAMShares = queueMaxAMShares; this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; @@ -116,8 +123,10 @@ public AllocationConfiguration(Configuration conf) { queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); + queueMaxAMShares = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAMShareDefault = 1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; @@ -184,6 +193,11 @@ public int getQueueMaxApps(String queue) { return (maxApps == null) ? queueMaxAppsDefault : maxApps; } + public float getQueueMaxAMShare(String queue) { + Float maxAMShare = queueMaxAMShares.get(queue); + return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; + } + /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index bedbb64..8fa6221 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -200,6 +200,7 @@ public synchronized void reloadAllocations() throws IOException, Map maxQueueResources = new HashMap(); Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); + Map queueMaxAMShares = new HashMap(); Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); @@ -207,6 +208,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + float queueMaxAMShareDefault = 1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; @@ -273,6 +275,11 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = val > 1.0f ? 1.0f : val; + queueMaxAMShareDefault = val; } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -297,8 +304,8 @@ public synchronized void reloadAllocations() throws IOException, parent = null; } loadQueue(parent, element, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, configuredQueues); } @@ -313,8 +320,8 @@ public synchronized void reloadAllocations() throws IOException, } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, + queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, newPlacementPolicy, configuredQueues); @@ -329,7 +336,8 @@ public synchronized void reloadAllocations() throws IOException, */ private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, + Map userMaxApps, Map queueMaxAMShares, + Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map> queueAcls, @@ -361,6 +369,11 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = val > 1.0f ? 1.0f : val; + queueMaxAMShares.put(queueName, val); } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); @@ -383,8 +396,9 @@ private void loadQueue(String parentName, Element element, Map } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, configuredQueues); + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6d71ea2..4e73cfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -175,7 +175,7 @@ protected int maxAssign; // Max containers to assign per heartbeat @VisibleForTesting - final MaxRunningAppsEnforcer maxRunningEnforcer; + final MaxRunningAppsAndAMShareEnforcer maxRunningEnforcer; private AllocationFileLoaderService allocsLoader; @VisibleForTesting @@ -185,7 +185,7 @@ public FairScheduler() { clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); - maxRunningEnforcer = new MaxRunningAppsEnforcer(this); + maxRunningEnforcer = new MaxRunningAppsAndAMShareEnforcer(this); } private void validateConf(Configuration conf) { @@ -606,12 +606,24 @@ protected synchronized void addApplicationAttempt( } application.setCurrentAppAttempt(attempt); - boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); + Resource amResource = null; + if (rmContext.getRMApps().containsKey(attempt.getApplicationId()) + && rmContext.getRMApps().get(attempt.getApplicationId()) + .getApplicationSubmissionContext() != null) { + amResource = rmContext.getRMApps().get(attempt.getApplicationId()) + .getApplicationSubmissionContext().getResource(); + } + if (amResource == null) { + amResource = Resource.newInstance(0, 0); + } + boolean runnable = + maxRunningEnforcer.canAppBeRunnable(attempt, queue, amResource); queue.addApp(attempt, runnable); + if (runnable) { - maxRunningEnforcer.trackRunnableApp(attempt); + maxRunningEnforcer.trackRunnableApp(attempt, amResource); } else { - maxRunningEnforcer.trackNonRunnableApp(attempt); + maxRunningEnforcer.trackNonRunnableApp(attempt, amResource); } queue.getMetrics().submitAppAttempt(user); @@ -1379,8 +1391,18 @@ private void executeMove(SchedulerApplication app, FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now - boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, - attempt.getUser()); + Resource amResource = null; + if (rmContext.getRMApps().containsKey(attempt.getApplicationId()) + && rmContext.getRMApps().get(attempt.getApplicationId()) + .getApplicationSubmissionContext() != null) { + amResource = rmContext.getRMApps().get(attempt.getApplicationId()) + .getApplicationSubmissionContext().getResource(); + } + if (amResource == null) { + amResource = Resource.newInstance(0, 0); + } + boolean nowRunnable = + maxRunningEnforcer.canAppBeRunnable(attempt, newQueue, amResource); if (wasRunnable && !nowRunnable) { throw new IllegalStateException("Should have already verified that app " + attempt.getApplicationId() + " would be runnable in new queue"); @@ -1398,7 +1420,7 @@ private void executeMove(SchedulerApplication app, newQueue.addApp(attempt, nowRunnable); if (nowRunnable) { - maxRunningEnforcer.trackRunnableApp(attempt); + maxRunningEnforcer.trackRunnableApp(attempt, amResource); } if (wasRunnable) { maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsAndAMShareEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsAndAMShareEnforcer.java new file mode 100644 index 0000000..871bf8c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsAndAMShareEnforcer.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Handles tracking and enforcement for user's maxRunningApps and leaf queue's + * maxAMResourceShares constraints + */ +public class MaxRunningAppsAndAMShareEnforcer { + private static final Log LOG = LogFactory.getLog(FairScheduler.class); + + private final FairScheduler scheduler; + + // Tracks the number of running applications by user. + private final Map usersNumRunnableApps; + @VisibleForTesting + final ListMultimap usersNonRunnableApps; + + // Tracks the AM resource usage for each leaf queue. + private final Map queueAMResourceUsages; + + // Tracks the AM resource reqeust for each application. + private final Map appAMResourceRequests; + + public MaxRunningAppsAndAMShareEnforcer(FairScheduler scheduler) { + this.scheduler = scheduler; + this.usersNumRunnableApps = new HashMap(); + this.usersNonRunnableApps = ArrayListMultimap.create(); + this.queueAMResourceUsages = new HashMap(); + this.appAMResourceRequests = new HashMap(); + } + + /** + * Checks whether making the application runnable would exceed any + * maxRunningApps and queueMaxAMShare limits. + */ + public boolean canAppBeRunnable(FSSchedulerApp app, FSQueue queue, + Resource amResource){ + String user = app.getUser(); + // Check user's maxRunningApps limit + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + Integer userNumRunnable = usersNumRunnableApps.get(user); + if (userNumRunnable == null) { + userNumRunnable = 0; + } + if (userNumRunnable >= allocConf.getUserMaxApps(user)) { + return false; + } + // Check leaf queue's AM share + Resource maxAMResource = Resources.multiply(queue.getFairShare(), + allocConf.getQueueMaxAMShare(queue.getName())); + Resource ifRunAMResource = + queueAMResourceUsages.containsKey(queue.getName()) ? + Resources.clone(queueAMResourceUsages.get(queue.getName())) : + Resource.newInstance(0, 0); + Resources.addTo(ifRunAMResource, amResource); + if (queue.getPolicy() + .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource)) { + return false; + } + // Check queue and all parent queues' maxRunningApps + while (queue != null) { + int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); + if (queue.getNumRunnableApps() >= queueMaxApps) { + return false; + } + queue = queue.getParent(); + } + + return true; + } + + /** + * Tracks the given new runnable app for purposes of maintaining max running + * app limits. + */ + public void trackRunnableApp(FSSchedulerApp app, Resource amResource) { + appAMResourceRequests.put(app.getApplicationId(), amResource); + + String user = app.getUser(); + FSLeafQueue queue = app.getQueue(); + // Increment running counts for all parent queues + FSParentQueue parent = queue.getParent(); + while (parent != null) { + parent.incrementRunnableApps(); + parent = parent.getParent(); + } + + Integer userNumRunnable = usersNumRunnableApps.get(user); + usersNumRunnableApps.put(user, (userNumRunnable == null ? 0 + : userNumRunnable) + 1); + + // Increment AM resource usage for the leaf queue + Resource queueAMResourceUsage = + queueAMResourceUsages.get(app.getQueue().getName()); + if (queueAMResourceUsage == null) { + queueAMResourceUsage = Resource.newInstance(0, 0); + queueAMResourceUsages.put(app.getQueue().getName(), queueAMResourceUsage); + } + Resources.addTo(queueAMResourceUsage, amResource); + } + + /** + * Tracks the given new non runnable app so that it can be made runnable when + * it would not violate max running app limits. + */ + public void trackNonRunnableApp(FSSchedulerApp app, Resource amResource) { + appAMResourceRequests.put(app.getApplicationId(), amResource); + String user = app.getUser(); + usersNonRunnableApps.put(user, app.getAppSchedulable()); + } + + /** + * Checks to see whether any other applications runnable now that the given + * application has been removed from the given queue. And makes them so. + * + * Runs in O(n log(n)) where n is the number of queues that are under the + * highest queue that went from having no slack to having slack. + */ + public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + // childqueueX might have no pending apps itself, but if a queue higher up + // in the hierarchy parentqueueY has a maxRunningApps set, an app completion + // in childqueueX could allow an app in some other distant child of + // parentqueueY to become runnable. + // An app removal will only possibly allow another app to become runnable if + // the queue was already at its max before the removal. + // Thus we find the ancestor queue highest in the tree for which the app + // that was at its maxRunningApps before the removal. + // Here we also need to consider whether current leaf queue has nonrunnable + // apps due to its maxAMShare. + FSQueue highestQueueWithAppsNowRunnable = queue; + FSParentQueue parent = queue.getParent(); + while (parent != null) { + if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent + .getName()) - 1) { + highestQueueWithAppsNowRunnable = parent; + } + parent = parent.getParent(); + } + + List> appsNowMaybeRunnable = + new ArrayList>(); + + // Compile lists of apps which may now be runnable + // We gather lists instead of building a set of all non-runnable apps so + // that this whole operation can be O(number of queues) instead of + // O(number of apps) + gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, + appsNowMaybeRunnable); + String user = app.getUser(); + Integer userNumRunning = usersNumRunnableApps.get(user); + if (userNumRunning == null) { + userNumRunning = 0; + } + if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { + List userWaitingApps = usersNonRunnableApps.get(user); + if (userWaitingApps != null) { + appsNowMaybeRunnable.add(userWaitingApps); + } + } + + // Scan through and check whether this means that any apps are now runnable + Iterator iter = new MultiListStartTimeIterator( + appsNowMaybeRunnable); + FSSchedulerApp prev = null; + List noLongerPendingApps = new ArrayList(); + while (iter.hasNext()) { + FSSchedulerApp next = iter.next(); + if (next == prev) { + continue; + } + + Resource amResource = + appAMResourceRequests.get(next.getApplicationId()); + if (canAppBeRunnable(next, next.getQueue(), amResource)) { + untrackNonRunnableApp(next); + trackRunnableApp(next, amResource); + AppSchedulable appSched = next.getAppSchedulable(); + next.getQueue().getRunnableAppSchedulables().add(appSched); + noLongerPendingApps.add(appSched); + + // No more than one app per list will be able to be made runnable, so + // we can stop looking after we've found that many + if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { + break; + } + } + + prev = next; + } + + // We remove the apps from their pending lists afterwards so that we don't + // pull them out from under the iterator. If they are not in these lists + // in the first place, there is a bug. + for (AppSchedulable appSched : noLongerPendingApps) { + if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + .remove(appSched)) { + LOG.error("Can't make app runnable that does not already exist in queue" + + " as non-runnable: " + appSched + ". This should never happen."); + } + + if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + LOG.error("Waiting app " + appSched + " expected to be in " + + "usersNonRunnableApps, but was not. This should never happen."); + } + } + } + + /** + * Updates the relevant tracking variables after a runnable app with the given + * queue and user has been removed. + */ + public void untrackRunnableApp(FSSchedulerApp app) { + // Update usersRunnableApps + String user = app.getUser(); + int newUserNumRunning = usersNumRunnableApps.get(user) - 1; + if (newUserNumRunning == 0) { + usersNumRunnableApps.remove(user); + } else { + usersNumRunnableApps.put(user, newUserNumRunning); + } + + // Update runnable app bookkeeping for queues + FSLeafQueue queue = app.getQueue(); + FSParentQueue parent = queue.getParent(); + while (parent != null) { + parent.decrementRunnableApps(); + parent = parent.getParent(); + } + + // Update AMResourceUsage for the leaf queue + Resource amResource = appAMResourceRequests.remove(app.getApplicationId()); + Resources.subtractFrom(queueAMResourceUsages.get(app.getQueue().getName()), + amResource); + } + + /** + * Stops tracking the given non-runnable app + */ + public void untrackNonRunnableApp(FSSchedulerApp app) { + usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + appAMResourceRequests.remove(app.getApplicationId()); + } + + /** + * Traverses the queue hierarchy under the given queue to gather all lists + * of non-runnable applications. + */ + private void gatherPossiblyRunnableAppLists(FSQueue queue, + List> appLists) { + if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() + .getQueueMaxApps(queue.getName())) { + if (queue instanceof FSLeafQueue) { + appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables()); + } else { + for (FSQueue child : queue.getChildQueues()) { + gatherPossiblyRunnableAppLists(child, appLists); + } + } + } + } + + /** + * Takes a list of lists, each of which is ordered by start time, and returns + * their elements in order of start time. + * + * We maintain positions in each of the lists. Each next() call advances + * the position in one of the lists. We maintain a heap that orders lists + * by the start time of the app in the current position in that list. + * This allows us to pick which list to advance in O(log(num lists)) instead + * of O(num lists) time. + */ + static class MultiListStartTimeIterator implements + Iterator { + + private List[] appLists; + private int[] curPositionsInAppLists; + private PriorityQueue appListsByCurStartTime; + + @SuppressWarnings("unchecked") + public MultiListStartTimeIterator(List> appListList) { + appLists = appListList.toArray(new List[appListList.size()]); + curPositionsInAppLists = new int[appLists.length]; + appListsByCurStartTime = new PriorityQueue(); + for (int i = 0; i < appLists.length; i++) { + long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0) + .getStartTime(); + appListsByCurStartTime.add(new IndexAndTime(i, time)); + } + } + + @Override + public boolean hasNext() { + return !appListsByCurStartTime.isEmpty() + && appListsByCurStartTime.peek().time != Long.MAX_VALUE; + } + + @Override + public FSSchedulerApp next() { + IndexAndTime indexAndTime = appListsByCurStartTime.remove(); + int nextListIndex = indexAndTime.index; + AppSchedulable next = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]); + curPositionsInAppLists[nextListIndex]++; + + if (curPositionsInAppLists[nextListIndex] < appLists[nextListIndex].size()) { + indexAndTime.time = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]).getStartTime(); + } else { + indexAndTime.time = Long.MAX_VALUE; + } + appListsByCurStartTime.add(indexAndTime); + + return next.getApp(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + + private static class IndexAndTime implements Comparable { + public int index; + public long time; + + public IndexAndTime(int index, long time) { + this.index = index; + this.time = time; + } + + @Override + public int compareTo(IndexAndTime o) { + return time < o.time ? -1 : (time > o.time ? 1 : 0); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof IndexAndTime)) { + return false; + } + IndexAndTime other = (IndexAndTime)o; + return other.time == time; + } + + @Override + public int hashCode() { + return (int)time; + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java deleted file mode 100644 index 359519a..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; - -/** - * Handles tracking and enforcement for user and queue maxRunningApps - * constraints - */ -public class MaxRunningAppsEnforcer { - private static final Log LOG = LogFactory.getLog(FairScheduler.class); - - private final FairScheduler scheduler; - - // Tracks the number of running applications by user. - private final Map usersNumRunnableApps; - @VisibleForTesting - final ListMultimap usersNonRunnableApps; - - public MaxRunningAppsEnforcer(FairScheduler scheduler) { - this.scheduler = scheduler; - this.usersNumRunnableApps = new HashMap(); - this.usersNonRunnableApps = ArrayListMultimap.create(); - } - - /** - * Checks whether making the application runnable would exceed any - * maxRunningApps limits. - */ - public boolean canAppBeRunnable(FSQueue queue, String user) { - AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); - Integer userNumRunnable = usersNumRunnableApps.get(user); - if (userNumRunnable == null) { - userNumRunnable = 0; - } - if (userNumRunnable >= allocConf.getUserMaxApps(user)) { - return false; - } - // Check queue and all parent queues - while (queue != null) { - int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); - if (queue.getNumRunnableApps() >= queueMaxApps) { - return false; - } - queue = queue.getParent(); - } - - return true; - } - - /** - * Tracks the given new runnable app for purposes of maintaining max running - * app limits. - */ - public void trackRunnableApp(FSSchedulerApp app) { - String user = app.getUser(); - FSLeafQueue queue = app.getQueue(); - // Increment running counts for all parent queues - FSParentQueue parent = queue.getParent(); - while (parent != null) { - parent.incrementRunnableApps(); - parent = parent.getParent(); - } - - Integer userNumRunnable = usersNumRunnableApps.get(user); - usersNumRunnableApps.put(user, (userNumRunnable == null ? 0 - : userNumRunnable) + 1); - } - - /** - * Tracks the given new non runnable app so that it can be made runnable when - * it would not violate max running app limits. - */ - public void trackNonRunnableApp(FSSchedulerApp app) { - String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); - } - - /** - * Checks to see whether any other applications runnable now that the given - * application has been removed from the given queue. And makes them so. - * - * Runs in O(n log(n)) where n is the number of queues that are under the - * highest queue that went from having no slack to having slack. - */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { - AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); - - // childqueueX might have no pending apps itself, but if a queue higher up - // in the hierarchy parentqueueY has a maxRunningApps set, an app completion - // in childqueueX could allow an app in some other distant child of - // parentqueueY to become runnable. - // An app removal will only possibly allow another app to become runnable if - // the queue was already at its max before the removal. - // Thus we find the ancestor queue highest in the tree for which the app - // that was at its maxRunningApps before the removal. - FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() == - allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null; - FSParentQueue parent = queue.getParent(); - while (parent != null) { - if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent - .getName()) - 1) { - highestQueueWithAppsNowRunnable = parent; - } - parent = parent.getParent(); - } - - List> appsNowMaybeRunnable = - new ArrayList>(); - - // Compile lists of apps which may now be runnable - // We gather lists instead of building a set of all non-runnable apps so - // that this whole operation can be O(number of queues) instead of - // O(number of apps) - if (highestQueueWithAppsNowRunnable != null) { - gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, - appsNowMaybeRunnable); - } - String user = app.getUser(); - Integer userNumRunning = usersNumRunnableApps.get(user); - if (userNumRunning == null) { - userNumRunning = 0; - } - if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List userWaitingApps = usersNonRunnableApps.get(user); - if (userWaitingApps != null) { - appsNowMaybeRunnable.add(userWaitingApps); - } - } - - // Scan through and check whether this means that any apps are now runnable - Iterator iter = new MultiListStartTimeIterator( - appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List noLongerPendingApps = new ArrayList(); - while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); - if (next == prev) { - continue; - } - - if (canAppBeRunnable(next.getQueue(), next.getUser())) { - trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); - next.getQueue().getRunnableAppSchedulables().add(appSched); - noLongerPendingApps.add(appSched); - - // No more than one app per list will be able to be made runnable, so - // we can stop looking after we've found that many - if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { - break; - } - } - - prev = next; - } - - // We remove the apps from their pending lists afterwards so that we don't - // pull them out from under the iterator. If they are not in these lists - // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() - .remove(appSched)) { - LOG.error("Can't make app runnable that does not already exist in queue" - + " as non-runnable: " + appSched + ". This should never happen."); - } - - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { - LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); - } - } - } - - /** - * Updates the relevant tracking variables after a runnable app with the given - * queue and user has been removed. - */ - public void untrackRunnableApp(FSSchedulerApp app) { - // Update usersRunnableApps - String user = app.getUser(); - int newUserNumRunning = usersNumRunnableApps.get(user) - 1; - if (newUserNumRunning == 0) { - usersNumRunnableApps.remove(user); - } else { - usersNumRunnableApps.put(user, newUserNumRunning); - } - - // Update runnable app bookkeeping for queues - FSLeafQueue queue = app.getQueue(); - FSParentQueue parent = queue.getParent(); - while (parent != null) { - parent.decrementRunnableApps(); - parent = parent.getParent(); - } - } - - /** - * Stops tracking the given non-runnable app - */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); - } - - /** - * Traverses the queue hierarchy under the given queue to gather all lists - * of non-runnable applications. - */ - private void gatherPossiblyRunnableAppLists(FSQueue queue, - List> appLists) { - if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() - .getQueueMaxApps(queue.getName())) { - if (queue instanceof FSLeafQueue) { - appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables()); - } else { - for (FSQueue child : queue.getChildQueues()) { - gatherPossiblyRunnableAppLists(child, appLists); - } - } - } - } - - /** - * Takes a list of lists, each of which is ordered by start time, and returns - * their elements in order of start time. - * - * We maintain positions in each of the lists. Each next() call advances - * the position in one of the lists. We maintain a heap that orders lists - * by the start time of the app in the current position in that list. - * This allows us to pick which list to advance in O(log(num lists)) instead - * of O(num lists) time. - */ - static class MultiListStartTimeIterator implements - Iterator { - - private List[] appLists; - private int[] curPositionsInAppLists; - private PriorityQueue appListsByCurStartTime; - - @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List> appListList) { - appLists = appListList.toArray(new List[appListList.size()]); - curPositionsInAppLists = new int[appLists.length]; - appListsByCurStartTime = new PriorityQueue(); - for (int i = 0; i < appLists.length; i++) { - long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0) - .getStartTime(); - appListsByCurStartTime.add(new IndexAndTime(i, time)); - } - } - - @Override - public boolean hasNext() { - return !appListsByCurStartTime.isEmpty() - && appListsByCurStartTime.peek().time != Long.MAX_VALUE; - } - - @Override - public FSSchedulerApp next() { - IndexAndTime indexAndTime = appListsByCurStartTime.remove(); - int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] - .get(curPositionsInAppLists[nextListIndex]); - curPositionsInAppLists[nextListIndex]++; - - if (curPositionsInAppLists[nextListIndex] < appLists[nextListIndex].size()) { - indexAndTime.time = appLists[nextListIndex] - .get(curPositionsInAppLists[nextListIndex]).getStartTime(); - } else { - indexAndTime.time = Long.MAX_VALUE; - } - appListsByCurStartTime.add(indexAndTime); - - return next.getApp(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not supported"); - } - - private static class IndexAndTime implements Comparable { - public int index; - public long time; - - public IndexAndTime(int index, long time) { - this.index = index; - this.time = time; - } - - @Override - public int compareTo(IndexAndTime o) { - return time < o.time ? -1 : (time > o.time ? 1 : 0); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof IndexAndTime)) { - return false; - } - IndexAndTime other = (IndexAndTime)o; - return other.time == time; - } - - @Override - public int hashCode() { - return (int)time; - } - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1d77a43..1087c73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -149,4 +149,15 @@ public abstract void computeShares( */ public abstract boolean checkIfUsageOverFairShare( Resource usage, Resource fairShare); + + /** + * Check if a leaf queue's AM resource usage over its limit under this policy + * + * @param usage {@link Resource} the resource used by application masters + * @param maxAMResource {@link Resource} the maximum allowed resource for + * application masters + * @return true if AM resource usage is over the limit + */ + public abstract boolean checkIfAMResourceUsageOverLimit( + Resource usage, Resource maxAMResource); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 4b663d9..af674b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -75,6 +75,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return !Resources.fitsIn(usage, maxAMResource); + } + + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index ca7297f..8f5f442 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -125,6 +125,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, maxAMResource); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index d996944..62699d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -36,6 +37,8 @@ @VisibleForTesting public static final String NAME = "FIFO"; private FifoComparator comparator = new FifoComparator(); + private static final DefaultResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); @Override public String getName() { @@ -95,6 +98,12 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit( + Resource usage, Resource maxAMResource) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, maxAMResource); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 5f92676..519a975 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -20,14 +20,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -149,6 +156,22 @@ protected ApplicationAttemptId createSchedulingRequest( return id; } + protected void createApplicationWithAMResource(ApplicationAttemptId attId, + String queue, String user, Resource amResource) { + RMContext rmContext = resourceManager.getRMContext(); + RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf, + null, null, null, ApplicationSubmissionContext.newInstance(null, null, + null, null, null, false, false, 0, amResource, null), null, null, + 0, null, null); + rmContext.getRMApps().put(attId.getApplicationId(), rmApp); + AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( + attId.getApplicationId(), queue, user); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attempAddedEvent = + new AppAttemptAddedSchedulerEvent(attId, false); + scheduler.handle(attempAddedEvent); + } + protected void createSchedulingRequestExistingApplication( int memory, int priority, ApplicationAttemptId attId) { ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a725d8..b1ccfb4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -174,9 +174,10 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println("alice,bob admins"); out.println(""); - // Give queue D a limit of 3 running apps + // Give queue D a limit of 3 running apps and 0.4f maxAMShare out.println(""); out.println("3"); + out.println("0.4"); out.println(""); // Give queue E a preemption timeout of one minute out.println(""); @@ -194,6 +195,8 @@ public void testAllocationFileParsing() throws Exception { out.println("15"); // Set default limit of apps per user to 5 out.println("5"); + // Set default limit of AMResourceShare to 0.5f + out.println("0.5f"); // Give user1 a limit of 10 jobs out.println(""); out.println("10"); @@ -240,6 +243,13 @@ public void testAllocationFileParsing() throws Exception { assertEquals(10, queueConf.getUserMaxApps("user1")); assertEquals(5, queueConf.getUserMaxApps("user2")); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.0001); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01); + assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + // Root should get * ACL assertEquals("*", queueConf.getQueueAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index bda9564..cb02b3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -488,47 +487,22 @@ public void testSimpleContainerReservation() throws Exception { public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user1"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) .getRunnableAppSchedulables().size()); - assertEquals("root.user1", rmApp.getQueue()); + assertEquals("root.user1", resourceManager.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()).getQueue()); } @Test public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user2"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user2", null); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) @@ -2186,7 +2160,7 @@ private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, } @Test - public void testUserAndQueueMaxRunningApps() throws Exception { + public void testUserAndQueueMaxRunningAppsAndMaxAMShare() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); @@ -2194,6 +2168,7 @@ public void testUserAndQueueMaxRunningApps() throws Exception { out.println(""); out.println(""); out.println("2"); + out.println("0.2"); out.println(""); out.println(""); out.println("1"); @@ -2202,37 +2177,52 @@ public void testUserAndQueueMaxRunningApps() throws Exception { out.close(); scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.getQueueManager().getLeafQueue("queue1", true) + .setFairShare(Resources.createResource(10240)); + scheduler.getQueueManager().getLeafQueue("queue2", true) + .setFairShare(Resources.createResource(10240)); + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(2048, 2); // exceeds no limits - ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); verifyAppRunnable(attId1, true); verifyQueueNumRunnable("queue1", 1, 0); // exceeds user limit - ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue2", "user1"); + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue2", "user1", amResource1); verifyAppRunnable(attId2, false); verifyQueueNumRunnable("queue2", 0, 1); + // exceeds queue AM limit + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user2", amResource2); + verifyAppRunnable(attId3, false); + verifyQueueNumRunnable("queue1", 1, 1); // exceeds no limits - ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", "user2"); - verifyAppRunnable(attId3, true); - verifyQueueNumRunnable("queue1", 2, 0); - // exceeds queue limit - ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", "user2"); - verifyAppRunnable(attId4, false); + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user2", amResource1); + verifyAppRunnable(attId4, true); verifyQueueNumRunnable("queue1", 2, 1); - // Remove app 1 and both app 2 and app 4 should becomes runnable in its place + // Remove both app1 & app4, and app2 and app3 should become runnable AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); scheduler.handle(appRemovedEvent1); + AppAttemptRemovedSchedulerEvent appRemovedEvent4 = + new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent4); verifyAppRunnable(attId2, true); verifyQueueNumRunnable("queue2", 1, 0); - verifyAppRunnable(attId4, true); - verifyQueueNumRunnable("queue1", 2, 0); + verifyAppRunnable(attId3, true); + verifyQueueNumRunnable("queue1", 1, 0); - // A new app to queue1 should not be runnable - ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1", "user2"); + // A new app to queue1 cannot run + ApplicationAttemptId attId5 = createAppAttemptId(5, 1); + createApplicationWithAMResource(attId5, "queue1", "user2", amResource1); verifyAppRunnable(attId5, false); - verifyQueueNumRunnable("queue1", 2, 1); + verifyQueueNumRunnable("queue1", 1, 1); } @Test @@ -2309,6 +2299,84 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { verifyQueueNumRunnable("queue1.sub3", 0, 0); } + @Test + public void testMaxAMResourceShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.4"); + out.println(""); + out.println("100"); + out.println("100"); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.getQueueManager().getLeafQueue("queue1", true) + .setFairShare(Resources.createResource(10240)); + + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(2048, 2); + // exceeds no limits + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); + verifyAppRunnable(attId1, true); + verifyQueueNumRunnable("queue1", 1, 0); + clock.tick(10); + // exceeds no limits + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue1", "user1", amResource2); + verifyAppRunnable(attId2, true); + verifyQueueNumRunnable("queue1", 2, 0); + clock.tick(10); + // exceeds limit + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user1", amResource2); + verifyAppRunnable(attId3, false); + verifyQueueNumRunnable("queue1", 2, 1); + clock.tick(10); + // exceeds no limits + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user1", amResource1); + verifyAppRunnable(attId4, true); + verifyQueueNumRunnable("queue1", 3, 1); + // exceeds limit + ApplicationAttemptId attId5 = createAppAttemptId(5, 1); + createApplicationWithAMResource(attId5, "queue1", "user1", amResource1); + verifyAppRunnable(attId5, false); + verifyQueueNumRunnable("queue1", 3, 2); + clock.tick(10); + + // Now test removal of another runnable app (attId1), attId5 gets to go + // because attId3's AM resource is over AM limit if scheduled. + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent2); + verifyAppRunnable(attId5, true); + verifyQueueNumRunnable("queue1", 3, 1); + + // Now test removal of a non-runnable app + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = + new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.KILLED, true); + scheduler.handle(appRemovedEvent3); + assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps + .get("user1").size()); + // Verify app gone in queue accounting + verifyQueueNumRunnable("queue1", 3, 0); + // Verify it doesn't become runnable when there would be space for it + AppAttemptRemovedSchedulerEvent appRemovedEvent4 = + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, true); + scheduler.handle(appRemovedEvent4); + verifyQueueNumRunnable("queue1", 2, 0); + } + @Test (timeout = 10000) public void testContinuousScheduling() throws Exception { // set continuous scheduling enabled diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsAndAMShareEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsAndAMShareEnforcer.java new file mode 100644 index 0000000..29d1dfe --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsAndAMShareEnforcer.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +public class TestMaxRunningAppsAndAMShareEnforcer { + private FairScheduler scheduler; + private QueueManager queueManager; + private AllocationConfiguration allocConf; + private Map queueMaxApps; + private Map userMaxApps; + private Map queueMaxAMShares; + private MaxRunningAppsAndAMShareEnforcer maxAppsEnforcer; + private int appNum; + private TestFairScheduler.MockClock clock; + + @Before + public void setup() throws Exception { + Configuration conf = new Configuration(); + clock = new TestFairScheduler.MockClock(); + scheduler = mock(FairScheduler.class); + when(scheduler.getConf()).thenReturn( + new FairSchedulerConfiguration(conf)); + when(scheduler.getClock()).thenReturn(clock); + allocConf = new AllocationConfiguration(conf); + when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); + + queueManager = new QueueManager(scheduler); + queueManager.initialize(conf); + queueMaxApps = allocConf.queueMaxApps; + userMaxApps = allocConf.userMaxApps; + queueMaxAMShares = allocConf.queueMaxAMShares; + maxAppsEnforcer = new MaxRunningAppsAndAMShareEnforcer(scheduler); + appNum = 0; + } + + private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + return addApp(queue, user, Resource.newInstance(1024, 1)); + } + private FSSchedulerApp addApp(FSLeafQueue queue, String user, + Resource amResource) { + ApplicationId appId = ApplicationId.newInstance(0l, appNum++); + ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); + FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); + boolean runnable = + maxAppsEnforcer.canAppBeRunnable(app, app.getQueue(), amResource); + queue.addApp(app, runnable); + if (runnable) { + maxAppsEnforcer.trackRunnableApp(app, amResource); + } else { + maxAppsEnforcer.trackNonRunnableApp(app, amResource); + } + return app; + } + + private void removeApp(FSSchedulerApp app) { + app.getQueue().removeApp(app); + maxAppsEnforcer.untrackRunnableApp(app); + maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); + } + + @Test + public void testRemoveDoesNotEnableAnyApp() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true); + leaf1.setFairShare(Resources.createResource(102400)); + leaf2.setFairShare(Resources.createResource(102400)); + queueMaxApps.put("root", 2); + queueMaxApps.put("root.queue1", 1); + queueMaxApps.put("root.queue2", 1); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablesAppOnCousinQueue() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); + leaf1.setFairShare(Resources.createResource(102400)); + leaf2.setFairShare(Resources.createResource(102400)); + queueMaxApps.put("root.queue1", 2); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablesOneByQueueOneByUser() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); + leaf1.setFairShare(Resources.createResource(102400)); + leaf2.setFairShare(Resources.createResource(102400)); + queueMaxApps.put("root.queue1.leaf1", 2); + userMaxApps.put("user1", 1); + FSSchedulerApp app1 = addApp(leaf1, "user1"); + addApp(leaf1, "user2"); + addApp(leaf1, "user3"); + addApp(leaf2, "user1"); + assertEquals(2, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(2, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablingOrderedByStartTime() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); + leaf1.setFairShare(Resources.createResource(102400)); + leaf2.setFairShare(Resources.createResource(102400)); + queueMaxApps.put("root.queue1", 2); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + clock.tick(20); + addApp(leaf1, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testMultipleAppsWaitingOnCousinQueue() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); + leaf1.setFairShare(Resources.createResource(102400)); + leaf2.setFairShare(Resources.createResource(102400)); + queueMaxApps.put("root.queue1", 2); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testQueueMaxAMResourceLimit() { + FSLeafQueue queue1 = queueManager.getLeafQueue("root.queue1", true); + queue1.setFairShare(Resources.createResource(10240)); + queueMaxApps.put("root.queue1", 100); + queueMaxAMShares.put("root.queue1", 0.2f); + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(2048, 2); + + // test the max AM limit (queue1's maxAMResource is <2048, 2>) + FSSchedulerApp app1 = addApp(queue1, "user", amResource1); + FSSchedulerApp app2 = addApp(queue1, "user", amResource2); + assertEquals(1, queue1.getRunnableAppSchedulables().size()); + assertEquals(1, queue1.getNonRunnableAppSchedulables().size()); + + // queue1 still can accept an app with smaller AM resource + FSSchedulerApp app3 = addApp(queue1, "user", amResource1); + assertEquals(2, queue1.getRunnableAppSchedulables().size()); + assertEquals(1, queue1.getNonRunnableAppSchedulables().size()); + + // remove app1 and app3 will let app2 in + removeApp(app1); + removeApp(app3); + assertEquals(1, queue1.getRunnableAppSchedulables().size()); + assertEquals(0, queue1.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testMultiListStartTimeIteratorEmptyAppLists() { + List> lists = new ArrayList>(); + lists.add(Arrays.asList(mockAppSched(1))); + lists.add(Arrays.asList(mockAppSched(2))); + Iterator iter = + new MaxRunningAppsAndAMShareEnforcer.MultiListStartTimeIterator(lists); + assertEquals(1, iter.next().getAppSchedulable().getStartTime()); + assertEquals(2, iter.next().getAppSchedulable().getStartTime()); + } + + private AppSchedulable mockAppSched(long startTime) { + AppSchedulable appSched = mock(AppSchedulable.class); + when(appSched.getStartTime()).thenReturn(startTime); + FSSchedulerApp schedApp = mock(FSSchedulerApp.class); + when(schedApp.getAppSchedulable()).thenReturn(appSched); + when(appSched.getApp()).thenReturn(schedApp); + return appSched; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java deleted file mode 100644 index c1866f0..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.junit.Before; -import org.junit.Test; - -public class TestMaxRunningAppsEnforcer { - private QueueManager queueManager; - private Map queueMaxApps; - private Map userMaxApps; - private MaxRunningAppsEnforcer maxAppsEnforcer; - private int appNum; - private TestFairScheduler.MockClock clock; - - @Before - public void setup() throws Exception { - Configuration conf = new Configuration(); - clock = new TestFairScheduler.MockClock(); - FairScheduler scheduler = mock(FairScheduler.class); - when(scheduler.getConf()).thenReturn( - new FairSchedulerConfiguration(conf)); - when(scheduler.getClock()).thenReturn(clock); - AllocationConfiguration allocConf = new AllocationConfiguration( - conf); - when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); - - queueManager = new QueueManager(scheduler); - queueManager.initialize(conf); - queueMaxApps = allocConf.queueMaxApps; - userMaxApps = allocConf.userMaxApps; - maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); - appNum = 0; - } - - private FSSchedulerApp addApp(FSLeafQueue queue, String user) { - ApplicationId appId = ApplicationId.newInstance(0l, appNum++); - ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); - boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); - queue.addApp(app, runnable); - if (runnable) { - maxAppsEnforcer.trackRunnableApp(app); - } else { - maxAppsEnforcer.trackNonRunnableApp(app); - } - return app; - } - - private void removeApp(FSSchedulerApp app) { - app.getQueue().removeApp(app); - maxAppsEnforcer.untrackRunnableApp(app); - maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); - } - - @Test - public void testRemoveDoesNotEnableAnyApp() { - FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true); - FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true); - queueMaxApps.put("root", 2); - queueMaxApps.put("root.queue1", 1); - queueMaxApps.put("root.queue2", 1); - FSSchedulerApp app1 = addApp(leaf1, "user"); - addApp(leaf2, "user"); - addApp(leaf2, "user"); - assertEquals(1, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - removeApp(app1); - assertEquals(0, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - } - - @Test - public void testRemoveEnablesAppOnCousinQueue() { - FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); - FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); - queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); - addApp(leaf2, "user"); - addApp(leaf2, "user"); - assertEquals(1, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - removeApp(app1); - assertEquals(0, leaf1.getRunnableAppSchedulables().size()); - assertEquals(2, leaf2.getRunnableAppSchedulables().size()); - assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); - } - - @Test - public void testRemoveEnablesOneByQueueOneByUser() { - FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true); - FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); - queueMaxApps.put("root.queue1.leaf1", 2); - userMaxApps.put("user1", 1); - FSSchedulerApp app1 = addApp(leaf1, "user1"); - addApp(leaf1, "user2"); - addApp(leaf1, "user3"); - addApp(leaf2, "user1"); - assertEquals(2, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - removeApp(app1); - assertEquals(2, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(0, leaf1.getNonRunnableAppSchedulables().size()); - assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); - } - - @Test - public void testRemoveEnablingOrderedByStartTime() { - FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); - FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); - queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); - addApp(leaf2, "user"); - addApp(leaf2, "user"); - clock.tick(20); - addApp(leaf1, "user"); - assertEquals(1, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - removeApp(app1); - assertEquals(0, leaf1.getRunnableAppSchedulables().size()); - assertEquals(2, leaf2.getRunnableAppSchedulables().size()); - assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); - } - - @Test - public void testMultipleAppsWaitingOnCousinQueue() { - FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); - FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); - queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); - addApp(leaf2, "user"); - addApp(leaf2, "user"); - addApp(leaf2, "user"); - assertEquals(1, leaf1.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getRunnableAppSchedulables().size()); - assertEquals(2, leaf2.getNonRunnableAppSchedulables().size()); - removeApp(app1); - assertEquals(0, leaf1.getRunnableAppSchedulables().size()); - assertEquals(2, leaf2.getRunnableAppSchedulables().size()); - assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); - } - - @Test - public void testMultiListStartTimeIteratorEmptyAppLists() { - List> lists = new ArrayList>(); - lists.add(Arrays.asList(mockAppSched(1))); - lists.add(Arrays.asList(mockAppSched(2))); - Iterator iter = - new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists); - assertEquals(1, iter.next().getAppSchedulable().getStartTime()); - assertEquals(2, iter.next().getAppSchedulable().getStartTime()); - } - - private AppSchedulable mockAppSched(long startTime) { - AppSchedulable appSched = mock(AppSchedulable.class); - when(appSched.getStartTime()).thenReturn(startTime); - FSSchedulerApp schedApp = mock(FSSchedulerApp.class); - when(schedApp.getAppSchedulable()).thenReturn(appSched); - when(appSched.getApp()).thenReturn(schedApp); - return appSched; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 54daf2d..07a3edd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -237,6 +237,12 @@ Allocation file format * maxRunningApps: limit the number of apps from the queue to run at once + * maxAMShare: limit the resource share from the leaf queue that can be used + to run application master. maxAMShare * queue_fair_share is the maximum + resources that can be used by AMs in that leaf queue. Default value is + 1.0f, which means AMs in that leaf queue can take up to 100% resource of + fair share. + * weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight. @@ -279,6 +285,9 @@ Allocation file format * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue. + * <>, which sets the default AM resource + limit for queue; overriden by maxAMShare element in each queue. + * <>, which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair". @@ -328,6 +337,7 @@ Allocation file format 10000 mb,0vcores 90000 mb,0vcores 50 + 0.1 2.0 fair @@ -336,6 +346,8 @@ Allocation file format + 0.5 +