diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index e8de096c6a4..07316649487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -89,7 +89,11 @@ @Private public static final String MAXIMUM_SYSTEM_APPLICATIONS = PREFIX + MAXIMUM_APPLICATIONS_SUFFIX; - + + @Private + public static final String MAXIMUM_ACTIVE_APPLICATIONS_SUFFIX = + "maximum-active-applications"; + @Private public static final String MAXIMUM_AM_RESOURCE_SUFFIX = "maximum-am-resource-percent"; @@ -416,6 +420,16 @@ public int getMaximumApplicationsPerQueue(String queue) { return maxApplicationsPerQueue; } + /** + * Get the maximum concurrently activated applications per queue setting. + * @param queue name of the queue + * @return setting specified or -1 if not set + */ + public int getMaximumActiveAppsPerQueue(String queue) { + return getInt(getQueuePrefix(queue) + + MAXIMUM_ACTIVE_APPLICATIONS_SUFFIX, (int) UNDEFINED); + } + /** * Get the maximum am resource percent per queue setting. * @param queue name of the queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 366bad0a4f2..e651769b23f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -90,6 +90,8 @@ protected int maxApplications; protected volatile int maxApplicationsPerUser; + protected volatile int maxActiveApplications; + protected volatile int maxActiveApplicationsPerUser; private float maxAMResourcePerQueuePercent; @@ -211,6 +213,15 @@ protected void setupQueueConfigs(Resource clusterResource, (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) * usersManager.getUserLimitFactor())); + maxActiveApplications = + conf.getMaximumActiveAppsPerQueue(getQueuePath()); + if (maxActiveApplications < 0) { + maxActiveApplications = maxApplications; + } + maxActiveApplicationsPerUser = Math.min(maxActiveApplications, + (int) (maxActiveApplications * (usersManager.getUserLimit() + / 100.0f) * usersManager.getUserLimitFactor())); + maxAMResourcePerQueuePercent = conf.getMaximumApplicationMasterResourcePerQueuePercent( getQueuePath()); @@ -309,7 +320,14 @@ protected void setupQueueConfigs(Resource clusterResource, + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " - + "userLimitFactor) ]" + "\n" + "usedCapacity = " + + "userLimitFactor) ]" + "\n" + "maxActiveApplications = " + + maxActiveApplications + + " [= configuredMaximumActivatedApplicationsPerQueue or" + + " maxApplications]" + + "\n" + "maxActiveApplicationsPerUser = " + + maxActiveApplicationsPerUser + + " [= (int)(maxActiveApplications * (userLimit / 100.0f)" + + " * userLimitFactor) ]" + "\n" + "usedCapacity = " + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + "absoluteUsedCapacity = " + absoluteUsedCapacity @@ -364,6 +382,14 @@ public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } + public int getMaxActiveApplications() { + return maxActiveApplications; + } + + public int getMaxActiveApplicationsPerUser() { + return maxActiveApplicationsPerUser; + } + /** * * @return UsersManager instance. @@ -814,99 +840,124 @@ protected void activateApplications() { fsApp.hasNext(); ) { FiCaSchedulerApp application = fsApp.next(); ApplicationId applicationId = application.getApplicationId(); + User user = getUser(application.getUser()); - // Get the am-node-partition associated with each application - // and calculate max-am resource limit for this partition. - String partitionName = application.getAppAMNodePartitionName(); - - Resource amLimit = getAMResourceLimitPerPartition(partitionName); - // Verify whether we already calculated am-limit for this label. - if (amLimit == null) { - amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); + // Check activated apps limits for queues + if (getNumActiveApplications() >= getMaxActiveApplications()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getQueuePath() + " already has " + + getNumActiveApplications() + " active applications," + + " cannot activate application: " + applicationId); + } + continue; } - // Check am resource limit. - Resource amIfStarted = Resources.add( - application.getAMResource(partitionName), - queueUsage.getAMUsed(partitionName)); - if (LOG.isDebugEnabled()) { - LOG.debug("application " + application.getId() + " AMResource " - + application.getAMResource(partitionName) - + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent - + " amLimit " + amLimit + " lastClusterResource " - + lastClusterResource + " amIfStarted " + amIfStarted - + " AM node-partition name " + partitionName); + // Check activated apps limits for the user on this queue + if (user.getActiveApplications() >= getMaxActiveApplicationsPerUser()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getQueuePath() + " already has " + user + .getActiveApplications() + " active applications from user " + + user.getUserName() + ", cannot activate application: " + + applicationId); + } + continue; } - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - amIfStarted, amLimit)) { - if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( - resourceCalculator, lastClusterResource, - queueUsage.getAMUsed(partitionName), Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue, it is likely set too low." - + " skipping enforcement to allow at least one application" - + " to start"); - } else{ - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); - if (LOG.isDebugEnabled()) { - LOG.debug("Not activating application " + applicationId - + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " - + amLimit); + // Should skip amLimit/userAMLimit check for unmanaged AM + if (!application.getUnmanagedAM()) { + // Get the am-node-partition associated with each application + // and calculate max-am resource limit for this partition. + String partitionName = application.getAppAMNodePartitionName(); + + Resource amLimit = getAMResourceLimitPerPartition(partitionName); + // Verify whether we already calculated am-limit for this label. + if (amLimit == null) { + amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); + } + // Check am resource limit. + Resource amIfStarted = Resources.add( + application.getAMResource(partitionName), + queueUsage.getAMUsed(partitionName)); + + if (LOG.isDebugEnabled()) { + LOG.debug("application " + application.getId() + " AMResource " + + application.getAMResource(partitionName) + + " maxAMResourcePerQueuePercent " + + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + " lastClusterResource " + + lastClusterResource + " amIfStarted " + amIfStarted + + " AM node-partition name " + partitionName); + } + + if (!Resources.lessThanOrEqual(resourceCalculator, + lastClusterResource, amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application" + + " to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); + if (LOG.isDebugEnabled()) { + LOG.debug("Not activating application " + applicationId + + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " + + amLimit); + } + continue; } - continue; } - } - // Check user am resource limit - User user = getUser(application.getUser()); - Resource userAMLimit = userAmPartitionLimit.get(partitionName); + // Check user am resource limit + Resource userAMLimit = userAmPartitionLimit.get(partitionName); - // Verify whether we already calculated user-am-limit for this label. - if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName, - application.getUser()); - userAmPartitionLimit.put(partitionName, userAMLimit); - } + // Verify whether we already calculated user-am-limit for this label. + if (userAMLimit == null) { + userAMLimit = getUserAMResourceLimitPerPartition(partitionName, + application.getUser()); + userAmPartitionLimit.put(partitionName, userAMLimit); + } - Resource userAmIfStarted = Resources.add( - application.getAMResource(partitionName), - user.getConsumedAMResources(partitionName)); - - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAmIfStarted, userAMLimit)) { - if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( - resourceCalculator, lastClusterResource, - queueUsage.getAMUsed(partitionName), Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue for user, it is likely set too" - + " low. skipping enforcement to allow at least one application" - + " to start"); - } else{ - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); - if (LOG.isDebugEnabled()) { - LOG.debug("Not activating application " + applicationId - + " for user: " + user + " as userAmIfStarted: " - + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit); + Resource userAmIfStarted = Resources.add( + application.getAMResource(partitionName), + user.getConsumedAMResources(partitionName)); + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAmIfStarted, userAMLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set" + + " too low. skipping enforcement to allow at least one" + + " application to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); + if (LOG.isDebugEnabled()) { + LOG.debug("Not activating application " + applicationId + + " for user: " + user + " as userAmIfStarted: " + + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit); + } + continue; } - continue; } + queueUsage.incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().setAMLimit(partitionName, userAMLimit); + metrics.incAMUsed(partitionName, application.getUser(), + application.getAMResource(partitionName)); + metrics.setAMResouceLimitForUser(partitionName, + application.getUser(), userAMLimit); } user.activateApplication(); orderingPolicy.addSchedulableEntity(application); application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); - queueUsage.incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().setAMLimit(partitionName, userAMLimit); - metrics.incAMUsed(partitionName, application.getUser(), - application.getAMResource(partitionName)); - metrics.setAMResouceLimitForUser(partitionName, - application.getUser(), userAMLimit); fsApp.remove(); LOG.info("Application " + applicationId + " from user: " + application .getUser() + " activated in queue: " + getQueueName()); @@ -982,7 +1033,7 @@ private void removeApplicationAttempt( boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { pendingOrderingPolicy.removeSchedulableEntity(application); - } else{ + } else if (!application.getUnmanagedAM()) { queueUsage.decAMUsed(partitionName, application.getAMResource(partitionName)); user.getResourceUsage().decAMUsed(partitionName, @@ -2094,6 +2145,15 @@ public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + public void setMaxActiveApplications(int maxActiveApplications) { + this.maxActiveApplications = maxActiveApplications; + } + + public void setMaxActiveApplicationsPerUser( + int maxActiveApplicationsPerUser) { + this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; + } + public void setMaxAMResourcePerQueuePercent( float maxAMResourcePerQueuePercent) { this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 34f4aa151ab..0ffd41336d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -81,6 +81,8 @@ private void updateQuotas(int userLimit, float userLimitFactor, setUserLimitFactor(userLimitFactor); setMaxApplications(maxAppsForReservation); maxApplicationsPerUser = maxAppsPerUserForReservation; + setMaxActiveApplications(maxAppsForReservation); + setMaxActiveApplicationsPerUser(maxAppsPerUserForReservation); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index ed2f64e0945..71895848cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -196,6 +196,8 @@ private void renderCommonLeafQueueInfo(ResponseInfo ri) { __("Num Containers:", Integer.toString(lqinfo.getNumContainers())). __("Max Applications:", Integer.toString(lqinfo.getMaxApplications())). __("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())). + __("Max Active Applications:", Integer.toString(lqinfo.getMaxActiveApplications())). + __("Max Active Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())). __("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). __("Configured User Limit Factor:", lqinfo.getUserLimitFactor()). __("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())). diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index a53e9212eeb..60a2c9cba83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -42,6 +42,8 @@ protected int numContainers; protected int maxApplications; protected int maxApplicationsPerUser; + protected int maxActiveApplications; + protected int maxActiveApplicationsPerUser; protected int userLimit; protected UsersInfo users; // To add another level in the XML protected float userLimitFactor; @@ -67,6 +69,8 @@ numContainers = q.getNumContainers(); maxApplications = q.getMaxApplications(); maxApplicationsPerUser = q.getMaxApplicationsPerUser(); + maxActiveApplications = q.getMaxActiveApplications(); + maxActiveApplicationsPerUser = q.getMaxActiveApplicationsPerUser(); userLimit = q.getUserLimit(); users = new UsersInfo(q.getUsersManager().getUsersInfo()); userLimitFactor = q.getUserLimitFactor(); @@ -172,4 +176,12 @@ public int getDefaultApplicationPriority() { public boolean isAutoCreatedLeafQueue() { return isAutoCreatedLeafQueue; } + + public int getMaxActiveApplications() { + return maxActiveApplications; + } + + public int getMaxActiveApplicationsPerUser() { + return maxActiveApplicationsPerUser; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 24ae244a969..c9699bbc717 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -576,6 +576,96 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertFalse(queue.getApplications().contains(app_3)); } + + @Test + public void testActiveLimitsWithMaxActiveLimits() throws Exception { + final String user_0 = "user_0"; + int APPLICATION_ID = 0; + + // set max-active-applications/max-active-applications-per-user to be 2 + doReturn(2).when(queue).getMaxActiveApplications(); + doReturn(2).when(queue).getMaxActiveApplicationsPerUser(); + + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(1 * GB, 0)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.getApplications().contains(app_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(1 * GB, 0)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.getApplications().contains(app_1)); + + // Submit third application, should remain pending + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(1 * GB, 0)); + queue.submitApplicationAttempt(app_2, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertTrue(queue.getPendingApplications().contains(app_2)); + + // Submit fourth application, should remain pending + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(1 * GB, 0)); + queue.submitApplicationAttempt(app_3, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(2, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(2, queue.getNumPendingApplications(user_0)); + assertTrue(queue.getPendingApplications().contains(app_3)); + + // Kill 3rd pending application + queue.finishApplicationAttempt(app_2, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertFalse(queue.getPendingApplications().contains(app_2)); + assertFalse(queue.getApplications().contains(app_2)); + + // Finish 1st application, app_3 should become active + queue.finishApplicationAttempt(app_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.getApplications().contains(app_3)); + assertFalse(queue.getPendingApplications().contains(app_3)); + assertFalse(queue.getApplications().contains(app_0)); + + // Finish 2nd application + queue.finishApplicationAttempt(app_1, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertFalse(queue.getApplications().contains(app_1)); + + // Finish 4th application + queue.finishApplicationAttempt(app_3, A); + assertEquals(0, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(0, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertFalse(queue.getApplications().contains(app_3)); + + // recover the temporary modification for this test + doReturn(queue.getMaxActiveApplications()).when(queue).getMaxActiveApplications(); + doReturn(queue.getMaxActiveApplicationsPerUser()).when(queue).getMaxActiveApplicationsPerUser(); + } + @Test public void testHeadroom() throws Exception { CapacitySchedulerConfiguration csConf = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 04bb7912d1c..9a60e5881e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -56,6 +56,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -75,6 +77,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -158,6 +162,14 @@ private void setUpInternal(ResourceCalculator rC) throws Exception { spy(new ConcurrentHashMap()); RMApp rmApp = mock(RMApp.class); when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); + ApplicationSubmissionContext submissionContext = + new ApplicationSubmissionContextPBImpl(); + submissionContext.setUnmanagedAM(false); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); + RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); + when(rmAppAttempt.getRMAppAttemptMetrics()) + .thenReturn(new RMAppAttemptMetrics(null, spyRMContext)); + when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); amResourceRequest = mock(ResourceRequest.class); when(amResourceRequest.getCapability()).thenReturn( Resources.createResource(0, 0)); @@ -4092,6 +4104,63 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration() } } + /** + * Test unmanaged AM should not be affected by AMLimit or userAMLimit: + * Create two unmanaged apps in partition a with non resource, they will be + * activated immediately. + * @throws IOException + */ + @Test + public void testUnmanagedAMShouldNotBeAffectedByAMlimit() throws IOException { + final String leafQueueName = "leaf-queue"; + final String user_0 = "user_0"; + final String partitionName = ""; + // set unmanageAM to be false + RMApp rmApp = spyRMContext.getRMApps().get(null); + rmApp.getApplicationSubmissionContext().setUnmanagedAM(true); + + csConf.setInt(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + leafQueueName + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.MAXIMUM_APPLICATIONS_SUFFIX, 10); + LeafQueue leafQueue = + new LeafQueue(csContext, csConf, leafQueueName, cs.getRootQueue(), null); + Resource amLimit = leafQueue.getAMResourceLimitPerPartition(partitionName); + Assert.assertEquals(0, amLimit.getMemorySize()); + Assert.assertEquals(0, amLimit.getVirtualCores()); + + // Submit 2 applications for user_0 in partition a + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue, + mock(ActiveUsersManager.class), spyRMContext); + app_0.setAppAMNodePartitionName(partitionName); + app_0.setAMResource(partitionName, Resource.newInstance(1024, 1)); + leafQueue.submitApplicationAttempt(app_0, user_0); + Assert.assertTrue(app_0.getUnmanagedAM()); + Assert.assertEquals(1, leafQueue.getNumActiveApplications()); + Assert.assertEquals(0, + leafQueue.getQueueResourceUsage().getAMUsed().getMemorySize()); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, leafQueue, + mock(ActiveUsersManager.class), spyRMContext); + app_1.setAppAMNodePartitionName(partitionName); + app_1.setAMResource(partitionName, Resource.newInstance(1024, 1)); + leafQueue.submitApplicationAttempt(app_1, user_0); + Assert.assertTrue(app_1.getUnmanagedAM()); + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(0, + leafQueue.getQueueResourceUsage().getAMUsed().getMemorySize()); + + // reset unmanageAM to be false + rmApp.getApplicationSubmissionContext().setUnmanagedAM(false); + } + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); ApplicationAttemptId attId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 46d0a6614fa..b920a77aa3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -365,7 +365,7 @@ private void verifySubQueue(JSONObject info, String q, int numExpectedElements = 20; boolean isParentQueue = true; if (!info.has("queues")) { - numExpectedElements = 35; + numExpectedElements = 37; isParentQueue = false; } assertEquals("incorrect number of elements", numExpectedElements, info.length());