diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java index 36e6858..9db5985 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -43,7 +43,7 @@ private final QueueMetrics metrics; private int activeUsers = 0; - private Map> usersApplications = + private Map> usersApplications = new HashMap>(); public ActiveUsersManager(QueueMetrics metrics) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 631b418..f7abfcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -68,7 +68,8 @@ private Queue queue; private ActiveUsersManager activeUsersManager; - private boolean pending = true; // whether accepted/allocated by scheduler + private boolean noResourceAllocated = true; // whether 
no allocation has been made by the scheduler yet + private boolean isActivated = true; // if it is activated from scheduler's perspective. private ResourceUsage appResourceUsage; private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); private final Set amBlacklist = new HashSet<>(); @@ -113,12 +114,19 @@ public synchronized String getQueueName() { return queue.getQueueName(); } - public synchronized boolean isPending() { - return pending; + public synchronized boolean isNoResourceAllocated() { + return noResourceAllocated; + } + + public synchronized void activateApplication() { + isActivated = true; + if (hasPendingResourceRequests()) { + activeUsersManager.activateApplication(user, applicationId); + } } /** - * Clear any pending requests from this application. + * Clear any pending requests from this application. */ private synchronized void clearRequests() { priorities.clear(); @@ -257,7 +265,7 @@ public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priori return false; } - // update queue's pending resource if request exists + // update queue's pending resource if request exists String partition = request.getRMContainer().getNodeLabelExpression(); Resource delta = request.getDeltaCapacity(); appResourceUsage.decPending(partition, delta); @@ -330,7 +338,7 @@ public synchronized boolean updateResourceRequests( // Activate application. Metrics activation is done here. // TODO: Shouldn't we activate even if numContainers = 0? 
- if (request.getNumContainers() > 0) { + if (request.getNumContainers() > 0 && isActivated) { activeUsersManager.activateApplication(user, applicationId); } @@ -532,7 +540,7 @@ public synchronized void increaseContainer( queue.getMetrics().allocateResources(user, increaseRequest.getDeltaCapacity()); - // remove the increase request from pending increase request map + // remove the increase request from pending increase request map removeIncreaseRequest(nodeId, priority, containerId); // update usage @@ -575,10 +583,10 @@ public synchronized void decreaseContainer( allocateOffSwitch(request, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); - if (pending) { + if (noResourceAllocated) { // once an allocation is done we assume the application is // running from scheduler's POV. - pending = false; + noResourceAllocated = false; metrics.runAppAttempt(applicationId, user); } @@ -676,18 +684,21 @@ private synchronized void decrementOutstanding( queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), offSwitchRequest.getCapability()); } - - private synchronized void checkForDeactivation() { - boolean deactivate = true; + + private synchronized boolean hasPendingResourceRequests() { for (Priority priority : getPriorities()) { ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); if (request != null) { if (request.getNumContainers() > 0) { - deactivate = false; - break; + return true; } } } + return false; + } + + private synchronized void checkForDeactivation() { + boolean deactivate = !hasPendingResourceRequests(); // also we need to check increase request if (!deactivate) { @@ -726,7 +737,7 @@ public synchronized void move(Queue newQueue) { } public synchronized void stop() { - // clear pending resources metrics for the application + // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); for (Map asks : resourceRequestMap.values()) { ResourceRequest request 
= asks.get(ResourceRequest.ANY); @@ -741,7 +752,7 @@ public synchronized void stop() { request.getNumContainers())); } } - metrics.finishAppAttempt(applicationId, pending, user); + metrics.finishAppAttempt(applicationId, noResourceAllocated, user); // Clear requests themselves clearRequests(); @@ -770,10 +781,10 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( public synchronized void recoverContainer(RMContainer rmContainer) { QueueMetrics metrics = queue.getMetrics(); - if (pending) { + if (noResourceAllocated) { // If there was any container to recover, the application was // running from scheduler's POV. - pending = false; + noResourceAllocated = false; metrics.runAppAttempt(applicationId, user); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index d9c7283..a56b88f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -290,7 +290,7 @@ public void finishApp(String user, RMAppState rmAppFinalState) { } public void moveAppFrom(AppSchedulingInfo app) { - if (app.isPending()) { + if (app.isNoResourceAllocated()) { appsPending.decr(); } else { appsRunning.decr(); @@ -305,7 +305,7 @@ public void moveAppFrom(AppSchedulingInfo app) { } public void moveAppTo(AppSchedulingInfo app) { - if (app.isPending()) { + if (app.isNoResourceAllocated()) { appsPending.incr(); } else { appsRunning.incr(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ca05fe9..d957778 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -203,7 +203,7 @@ public AppSchedulingInfo getAppSchedulingInfo() { * @return true if it is else false. */ public boolean isPending() { - return appSchedulingInfo.isPending(); + return appSchedulingInfo.isNoResourceAllocated(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9c6d8ee..e477119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -719,6 +719,7 @@ private synchronized void activateApplications( user.activateApplication(); orderingPolicy.addSchedulableEntity(application); 
application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); + application.getAppSchedulingInfo().activateApplication(); queueUsage.incAMUsed(partitionName, application.getAMResource(partitionName)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index c772ae1..3441fc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -96,7 +96,7 @@ public void setUp() { 0, 0, 0); metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); metrics.finishApp(user, RMAppState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); @@ -123,7 +123,7 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); // As the application has failed, framework retries the same application @@ -136,7 +136,7 @@ public void testQueueAppMetricsForMultipleFailures() { // Suppose say application has failed this time as well. 
metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); // As the application has failed, framework retries the same application @@ -149,7 +149,7 @@ public void testQueueAppMetricsForMultipleFailures() { // Suppose say application has failed, and there's no more retries. metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); metrics.finishApp(user, RMAppState.FAILED); @@ -198,7 +198,7 @@ public void testQueueAppMetricsForMultipleFailures() { checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); checkApps(userSource, 1, 0, 0, 0, 0, 0, true); metrics.finishApp(user, RMAppState.FINISHED); @@ -314,7 +314,7 @@ public void testQueueAppMetricsForMultipleFailures() { checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishAppAttempt( - app.getApplicationId(), app.isPending(), app.getUser()); + app.getApplicationId(), app.isNoResourceAllocated(), app.getUser()); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); checkApps(userSource, 1, 0, 0, 0, 0, 0, true);