diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java index 4db3584..da4f054 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java @@ -45,10 +45,22 @@ void deactivateApplication(String user, ApplicationId applicationId); /** - * Get number of active users i.e. users with applications which have pending - * resource requests. + * Get number of active users i.e. users with atleast 1 active applications * * @return number of active users */ int getNumActiveUsers(); + + /** + * Increment number of active users i.e. users with only pending apps + * + */ + void incrNumActiveUsersOfPendingApps(); + + /** + * Get number of active users i.e. users with only pending apps + * + * @return number of active users + */ + int getNumActiveUsersOfPendingApps(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java index 049f324..8ac1238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -43,6 +43,7 @@ private final QueueMetrics metrics; private int activeUsers = 0; + private int activeUsersOfPendingApps = 0; private Map> usersApplications = new HashMap>(); @@ -104,8 +105,7 @@ synchronized public void deactivateApplication( } /** - * Get number of active users i.e. users with applications which have pending - * resource requests. + * Get number of active users i.e. users with atleast 1 active applications. * @return number of active users */ @Lock({Queue.class, SchedulerApplicationAttempt.class}) @@ -113,4 +113,16 @@ synchronized public void deactivateApplication( synchronized public int getNumActiveUsers() { return activeUsers; } + + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + @Override + public void incrNumActiveUsersOfPendingApps() { + ++activeUsersOfPendingApps; + } + + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + @Override + public int getNumActiveUsersOfPendingApps() { + return activeUsersOfPendingApps; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1efdd8b..e3f8078 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; /** * This class keeps track of all the consumption of an application. This also @@ -95,11 +96,13 @@ public final ContainerUpdateContext updateContext; public final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; + private SchedulerApplicationAttempt schedulerApplicationAttempt; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, long epoch, ResourceUsage appResourceUsage, - Map applicationSchedulingEnvs, RMContext rmContext) { + Map applicationSchedulingEnvs, RMContext rmContext, + SchedulerApplicationAttempt schedulerApplicationAttempt) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -115,6 +118,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, updateContext = new ContainerUpdateContext(this); readLock = lock.readLock(); writeLock = lock.writeLock(); + this.schedulerApplicationAttempt = schedulerApplicationAttempt; } public ApplicationId getApplicationId() { @@ -371,7 +375,14 @@ private void updatePendingResources(PendingAskUpdateResult updateResult, // Activate application. Metrics activation is done here. if (lastRequestContainers <= 0) { schedulerKeys.add(schedulerKey); - abstractUsersManager.activateApplication(user, applicationId); + ResourceCalculator rc = + this.rmContext.getScheduler().getResourceCalculator(); + Resource cr = this.rmContext.getScheduler().getClusterResource(); + if(! this.schedulerApplicationAttempt.isWaitingForAMContainer() + || Resources.lessThan(rc, cr, + metrics.getUsedAMResources(), metrics.getMaxAMResources())) { + abstractUsersManager.activateApplication(user, applicationId); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 20a5a1f..31013f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -732,4 +732,12 @@ public long getAggegatedReleasedContainers() { public long getAggregatePreemptedContainers() { return aggregateContainersPreempted.value(); } + + public Resource getMaxAMResources() { + return Resource.newInstance(0,0); + } + + public Resource getUsedAMResources() { + return Resource.newInstance(0,0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 857e736..a336e1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -232,7 +232,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, - applicationSchedulingEnvs, rmContext); + applicationSchedulingEnvs, rmContext, this); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -1470,4 +1470,4 @@ public String getDiagnosticMessage() { public Map getApplicationSchedulingEnvs() { return this.applicationSchedulingEnvs; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index 87fc234..7b79a54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -165,4 +165,15 @@ public synchronized QueueMetrics getUserMetrics(String userName) { return metrics; } + @Override + public Resource getMaxAMResources() { + return Resource.newInstance( + getAMResourceLimitMB(), (int) getAMResourceLimitVCores()); + } + + @Override + public Resource getUsedAMResources() { + return Resource.newInstance( + getUsedAMResourceMB(), (int) getUsedAMResourceVCores()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 776e512..a1990f5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -957,6 +957,7 @@ private void addApplication(ApplicationId applicationId, String queueName, SchedulerApplication application = new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); + queue.getAbstractUsersManager().incrNumActiveUsersOfPendingApps(); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); rmContext.getDispatcher().getEventHandler().handle( @@ -1128,12 +1129,12 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); + if (application == null) { LOG.error("Calling allocate on removed or non existent application " + applicationAttemptId.getApplicationId()); return EMPTY_ALLOCATION; } - // The allocate may be the leftover from previous attempt, and it will // impact current attempt, such as confuse the request and allocation for // current attempt's AM container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1ae8f91..45de530 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -702,7 +702,8 @@ public Resource getUserAMResourceLimitPerPartition( * modified by the userlimit and the userlimit factor as is the userlimit */ float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, - 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); + 1.0f / Math.max((getAbstractUsersManager().getNumActiveUsers() + + getAbstractUsersManager().getNumActiveUsersOfPendingApps()), 1)); float preWeightedUserLimit = effectiveUserLimit; effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 7287c5b..db5166d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -85,6 +85,7 @@ private final QueueMetrics metrics; private AtomicInteger activeUsers = new AtomicInteger(0); + private AtomicInteger activeUsersOfPendingApps = new AtomicInteger(0); private Map> usersApplications = new HashMap>(); @@ -837,13 +838,13 @@ private void incQueueUsageRatio(String nodePartition, float delta) { public void activateApplication(String user, ApplicationId applicationId) { try { this.writeLock.lock(); - Set userApps = usersApplications.get(user); if (userApps == null) { userApps = new HashSet(); usersApplications.put(user, userApps); activeUsers.incrementAndGet(); metrics.incrActiveUsers(); + activeUsersOfPendingApps.decrementAndGet(); // A user is added to active list. Invalidate user-limit cache. userLimitNeedsRecompute(); @@ -895,6 +896,16 @@ public int getNumActiveUsers() { return activeUsers.get(); } + @Override + public void incrNumActiveUsersOfPendingApps() { + activeUsersOfPendingApps.incrementAndGet(); + } + + @Override + public int getNumActiveUsersOfPendingApps() { + return activeUsersOfPendingApps.get(); + } + float sumActiveUsersTimesWeights() { float count = 0.0f; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 4fe3973..e278435 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -222,4 +222,15 @@ static FSQueueMetrics forQueue(MetricsSystem ms, String queueName, return (FSQueueMetrics)metrics; } + + @Override + public Resource getMaxAMResources() { + return Resource.newInstance(getMaxAMShareMB(), getMaxAMShareVCores()); + } + + @Override + public Resource getUsedAMResources() { + return Resource.newInstance( + getAMResourceUsageMB(), getAMResourceUsageVCores()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index b7b0eb7..5ab3131 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -44,9 +44,10 @@ public void testBacklistChanged() { FSLeafQueue queue = mock(FSLeafQueue.class); doReturn("test").when(queue).getQueueName(); + SchedulerApplicationAttempt saa = mock(SchedulerApplicationAttempt.class); AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId, "test", queue, null, 0, new ResourceUsage(), - new HashMap(), null); + new HashMap(), null, saa); appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(), new ArrayList()); @@ -116,9 +117,10 @@ public void testSchedulerKeyAccounting() { Queue queue = mock(Queue.class); doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); + SchedulerApplicationAttempt saa = mock(SchedulerApplicationAttempt.class); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage(), new HashMap<>(), null); + new ResourceUsage(), new HashMap<>(), null, saa); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1);