diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 97d29cf..4a735be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1027,6 +1027,7 @@ protected void setAttemptRecovering(boolean isRecovering) { public static enum AMState { UNMANAGED("User launched the Application Master, since it's unmanaged. "), + //In fair scheduler, INACTIVATED means Non-runnable INACTIVATED("Application is added to the scheduler and is not yet activated. "), ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. 
"), ASSIGNED("Scheduler has assigned a container for AM, waiting for AM " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 9e5a807..aab7048 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -817,8 +817,7 @@ private boolean hasContainerForNode(SchedulerRequestKey key, ResourceRequest rackRequest = getResourceRequest(key, node.getRackName()); ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName()); - return - // There must be outstanding requests at the given priority: + if (!(// There must be outstanding requests at the given priority: anyRequest != null && anyRequest.getNumContainers() > 0 && // If locality relaxation is turned off at *-level, there must be a // non-zero request for the node's rack: @@ -831,9 +830,23 @@ private boolean hasContainerForNode(SchedulerRequestKey key, // The requested container must be able to fit on the node: Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, anyRequest.getCapability(), - node.getRMNode().getTotalCapability()) && - // The requested container must fit in queue maximum share: - getQueue().fitsInMaxShare(anyRequest.getCapability()); + node.getRMNode().getTotalCapability()))) { + return false; + } else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) { + // The requested container must fit in queue maximum share + if (isWaitingForAMContainer()) { + 
StringBuilder diagnosticMessageBldr = new StringBuilder(); + diagnosticMessageBldr.append(" (Resource request: "); + diagnosticMessageBldr.append(anyRequest.getCapability()); + diagnosticMessageBldr.append(" exceeds current queue or its parents" + + " maximum resource allowed)."); + updateAMContainerDiagnostics(AMState.INACTIVATED, + diagnosticMessageBldr.toString()); + } + return false; + } + + return true; } private boolean isValidReservation(FSSchedulerNode node) { @@ -977,6 +990,17 @@ public void updateDemand() { @Override public Resource assignContainer(FSSchedulerNode node) { if (isOverAMShareLimit()) { + if (isWaitingForAMContainer()) { + List ask = appSchedulingInfo.getAllResourceRequests(); + StringBuilder diagnosticMessageBldr = new StringBuilder(); + diagnosticMessageBldr.append(" (Resource request: "); + diagnosticMessageBldr.append(ask.get(0).getCapability()); + diagnosticMessageBldr.append(" exceeds current queue" + + " maximum AM resource allowed)."); + updateAMContainerDiagnostics(AMState.INACTIVATED, + diagnosticMessageBldr.toString()); + } + if (LOG.isDebugEnabled()) { LOG.debug("Skipping allocation because maxAMShare limit would " + "be exceeded"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 15e2318..e664400 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -702,7 +702,7 @@ protected synchronized 
void addApplicationAttempt( } application.setCurrentAppAttempt(attempt); - boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); + boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, attempt); queue.addApp(attempt, runnable); if (runnable) { maxRunningEnforcer.trackRunnableApp(attempt); @@ -1686,7 +1686,7 @@ private void executeMove(SchedulerApplication app, boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, - attempt.getUser()); + attempt); if (wasRunnable && !nowRunnable) { throw new IllegalStateException("Should have already verified that app " + attempt.getApplicationId() + " would be runnable in new queue"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 8592fa6..4f77d76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; /** * Handles tracking and enforcement for user and queue maxRunningApps @@ -54,25 +55,61 @@ public MaxRunningAppsEnforcer(FairScheduler scheduler) 
{ /** * Checks whether making the application runnable would exceed any * maxRunningApps limits. + * + * @param queue the current queue + * @param attempt the app attempt being checked + * @return true if the application is runnable; false otherwise */ - public boolean canAppBeRunnable(FSQueue queue, String user) { + public boolean canAppBeRunnable(FSQueue queue, FSAppAttempt attempt) { + if (exceedUserMaxApps(attempt.getUser())) { + attempt.updateAMContainerDiagnostics(AMState.INACTIVATED, + "User runnable application number has reached its maximum limit."); + return false; + } else if (exceedQueueMaxRunningApps(queue)) { + attempt.updateAMContainerDiagnostics(AMState.INACTIVATED, + "Queue runnable application number has reached its maximum limit."); + return false; + } + + return true; + } + + /** + * Checks whether the user's number of runnable apps has reached its limit. + * + * @param user the user name + * @return true if the number hits the limit; false otherwise + */ + public boolean exceedUserMaxApps(String user) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); Integer userNumRunnable = usersNumRunnableApps.get(user); if (userNumRunnable == null) { userNumRunnable = 0; } if (userNumRunnable >= allocConf.getUserMaxApps(user)) { - return false; + return true; } + + return false; + } + + /** + * Checks whether the queue or any of its ancestor queues has reached its + * own maxRunningApps limit. 
+ * + * @param queue the current queue + * @return true if the number hits the limit; false otherwise + */ + public boolean exceedQueueMaxRunningApps(FSQueue queue) { // Check queue and all parent queues while (queue != null) { if (queue.getNumRunnableApps() >= queue.getMaxRunningApps()) { - return false; + return true; } queue = queue.getParent(); } - return true; + return false; } /** @@ -198,7 +235,7 @@ private void updateAppsRunnability(List> continue; } - if (canAppBeRunnable(next.getQueue(), next.getUser())) { + if (canAppBeRunnable(next.getQueue(), next)) { trackRunnableApp(next); FSAppAttempt appSched = next; next.getQueue().addApp(appSched, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index 3f17081..f77df1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -68,9 +68,9 @@ public void setup() throws Exception { private FSAppAttempt addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); - boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null, rmContext); + boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, app); queue.addApp(app, runnable); if 
(runnable) { maxAppsEnforcer.trackRunnableApp(app);