diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 3bc2e9b..03fc40e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -117,7 +117,8 @@ public RMActiveServiceContext(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setContainerAllocationExpirer(containerAllocationExpirer); this.setAMLivelinessMonitor(amLivelinessMonitor); @@ -128,6 +129,7 @@ public RMActiveServiceContext(Dispatcher rmDispatcher, this.setNMTokenSecretManager(nmTokenSecretManager); this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + this.setScheduler(scheduler); RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ebf2fe4..1d0d6c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -87,18 +87,46 @@ public RMContextImpl(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setDispatcher(rmDispatcher); setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager, rmApplicationHistoryWriter)); + clientToAMTokenSecretManager, rmApplicationHistoryWriter, + scheduler)); ConfigurationProvider provider = new LocalConfigurationProvider(); setConfigurationProvider(provider); } + + @VisibleForTesting + // helper constructor for tests + public RMContextImpl(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer delegationTokenRenewer, + AMRMTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this( + rmDispatcher, + containerAllocationExpirer, + amLivelinessMonitor, + amFinishingMonitor, + delegationTokenRenewer, + appTokenSecretManager, + containerTokenSecretManager, + nmTokenSecretManager, + clientToAMTokenSecretManager, + rmApplicationHistoryWriter, + null); + } @Override public Dispatcher getDispatcher() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 33b62fe..2d1737a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1339,6 +1339,11 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 0a2fa3a..f458057 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -109,30 +109,6 @@ public static float computeAbsoluteMaximumCapacity( } return absoluteMaxCapacityByNodeLabels; } - - public static int computeMaxActiveApplications( - ResourceCalculator calculator, - Resource clusterResource, Resource minimumAllocation, - float maxAMResourcePercent, float absoluteMaxCapacity) { - return - Math.max( - (int)Math.ceil( - Resources.ratio( - calculator, - clusterResource, - minimumAllocation) * - maxAMResourcePercent * absoluteMaxCapacity - ), - 1); - } - - public static int computeMaxActiveApplicationsPerUser( - int maxActiveApplications, int userLimit, float userLimitFactor) { - return Math.max( - (int)Math.ceil( - maxActiveApplications * (userLimit / 100.0f) * userLimitFactor), - 1); - } @Lock(CSQueue.class) public static void updateQueueStatistics( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 47679a6..440ad61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -87,9 +87,6 @@ protected int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; - private int maxActiveApplications; // Based on absolute max capacity - private int maxActiveAppsUsingAbsCap; // Based on absolute capacity - private int maxActiveApplicationsPerUser; private int nodeLocalityDelay; @@ -115,6 +112,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final Resource usedAMResources = Resource.newInstance(0, 0); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -149,19 +148,6 @@ public LeafQueue(CapacitySchedulerContext cs, float maxAMResourcePerQueuePercent = cs.getConfiguration() .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); - int maxActiveApplications = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - cs.getClusterResource(), this.minimumAllocation, - maxAMResourcePerQueuePercent, absoluteMaxCapacity); - this.maxActiveAppsUsingAbsCap = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - cs.getClusterResource(), this.minimumAllocation, - maxAMResourcePerQueuePercent, absoluteCapacity); - int maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser( - maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); this.queueInfo.setChildQueues(new ArrayList()); @@ -173,8 +159,7 @@ public LeafQueue(CapacitySchedulerContext cs, setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs - .getConfiguration().getNodeLocalityDelay(), accessibleLabels, + state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels, defaultLabelExpression, this.capacitiyByNodeLabels, this.maxCapacityByNodeLabels, cs.getConfiguration().getReservationContinueLook()); @@ -202,8 +187,7 @@ protected synchronized void setupQueueConfigs( float maximumCapacity, float absoluteMaxCapacity, int userLimit, float userLimitFactor, int maxApplications, float maxAMResourcePerQueuePercent, - int maxApplicationsPerUser, int maxActiveApplications, - int maxActiveApplicationsPerUser, QueueState state, + int maxApplicationsPerUser, QueueState state, Map acls, int nodeLocalityDelay, Set labels, String defaultLabelExpression, Map capacitieByLabel, @@ -218,6 +202,15 @@ protected synchronized void setupQueueConfigs( float absCapacity = getParent().getAbsoluteCapacity() * capacity; CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, absoluteMaxCapacity); + + this.lastClusterResource = clusterResource; + + // Initialize headroom info, also used for calculating application + // master resource limits. Since this happens during queue initialization + // and all queues may not be realized yet, we'll use (optimistic) + // absoluteMaxCapacity (it will be replaced with the more accurate + // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) + updateHeadroomInfo(clusterResource, absoluteMaxCapacity); this.absoluteCapacity = absCapacity; @@ -228,9 +221,6 @@ protected synchronized void setupQueueConfigs( this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; this.maxApplicationsPerUser = maxApplicationsPerUser; - this.maxActiveApplications = maxActiveApplications; - this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; - if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression)) { throw new IOException("Invalid default label expression of " @@ -282,21 +272,6 @@ protected synchronized void setupQueueConfigs( "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " + "userLimitFactor) ]" + "\n" + - "maxActiveApplications = " + maxActiveApplications + - " [= max(" + - "(int)ceil((clusterResourceMemory / minimumAllocation) * " + - "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," + - "1) ]" + "\n" + - "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap + - " [= max(" + - "(int)ceil((clusterResourceMemory / minimumAllocation) *" + - "maxAMResourcePercent * absoluteCapacity)," + - "1) ]" + "\n" + - "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser + - " [= max(" + - "(int)(maxActiveApplications * (userLimit / 100.0f) * " + - "userLimitFactor)," + - "1) ]" + "\n" + "usedCapacity = " + usedCapacity + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + @@ -349,14 +324,6 @@ public synchronized int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } - public synchronized int getMaximumActiveApplications() { - return maxActiveApplications; - } - - public synchronized int getMaximumActiveApplicationsPerUser() { - return maxActiveApplicationsPerUser; - } - @Override public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; @@ -519,8 +486,6 @@ public synchronized void reinitialize( newlyParsedLeafQueue.maxApplications, newlyParsedLeafQueue.maxAMResourcePerQueuePercent, newlyParsedLeafQueue.getMaxApplicationsPerUser(), - newlyParsedLeafQueue.getMaximumActiveApplications(), - newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, newlyParsedLeafQueue.getNodeLocalityDelay(), newlyParsedLeafQueue.accessibleLabels, @@ -606,27 +571,95 @@ public void submitApplication(ApplicationId applicationId, String userName, } } + + @VisibleForTesting + protected Resource getAMResourceLimit() { + Resource queueMaxCap; + synchronized (queueHeadroomInfo) { + queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + } + return Resources.multiplyAndNormalizeUp( + resourceCalculator, + queueMaxCap, + maxAMResourcePerQueuePercent, minimumAllocation); + } + + @VisibleForTesting + protected Resource getUserAMResourceLimit() { + Resource queueMaxCap; + synchronized (queueHeadroomInfo) { + queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + } + return Resources.multiplyAndNormalizeUp( + resourceCalculator, + queueMaxCap, + maxAMResourcePerQueuePercent * (userLimit / 100.0f) * + userLimitFactor, minimumAllocation); + } private synchronized void activateApplications() { + //limit of allowed resource usage for application masters + Resource amLimit = getAMResourceLimit(); + Resource userAMLimit = getUserAMResourceLimit(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); - // Check queue limit - if (getNumActiveApplications() >= getMaximumActiveApplications()) { - break; + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + application.getAMResource() + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + + " lastClusterResource " + lastClusterResource + + " amIfStarted " + amIfStarted); } - // Check user limit + if (!Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds amLimit"); + continue; + } + } + + // Check user am resource limit + User user = getUser(application.getUser()); - if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { - user.activateApplication(); - activeApplications.add(application); - i.remove(); - LOG.info("Application " + application.getApplicationId() + - " from user: " + application.getUser() + - " activated in queue: " + getQueueName()); + + Resource userAmIfStarted = + Resources.add(application.getAMResource(), + user.getConsumedAMResources()); + + if (!Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, userAmIfStarted, + userAMLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds " + + "userAmLimit"); + continue; + } } + user.activateApplication(); + activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); + Resources.addTo(user.getConsumedAMResources(), + application.getAMResource()); + i.remove(); + LOG.info("Application " + application.getApplicationId() + + " from user: " + application.getUser() + + " activated in queue: " + getQueueName()); } } @@ -672,6 +705,10 @@ public synchronized void removeApplicationAttempt( boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); + Resources.subtractFrom(user.getConsumedAMResources(), + application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -999,6 +1036,25 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, return canAssign; } + + private Resource updateHeadroomInfo(Resource clusterResource, + float absoluteCapacity) { + + Resource queueMaxCap = + Resources.multiplyAndNormalizeDown( + resourceCalculator, + clusterResource, + absoluteCapacity, + minimumAllocation); + + synchronized (queueHeadroomInfo) { + queueHeadroomInfo.setQueueMaxCap(queueMaxCap); + queueHeadroomInfo.setClusterResource(clusterResource); + } + + return queueMaxCap; + + } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, @@ -1017,18 +1073,9 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, //capacity float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, this); - - Resource queueMaxCap = // Queue Max-Capacity - Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, - absoluteMaxAvailCapacity, - minimumAllocation); - - synchronized (queueHeadroomInfo) { - queueHeadroomInfo.setQueueMaxCap(queueMaxCap); - queueHeadroomInfo.setClusterResource(clusterResource); - } + + Resource queueMaxCap = + updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); @@ -1728,20 +1775,10 @@ synchronized void releaseResource(Resource clusterResource, public synchronized void updateClusterResource(Resource clusterResource) { lastClusterResource = clusterResource; - // Update queue properties - maxActiveApplications = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - clusterResource, minimumAllocation, - maxAMResourcePerQueuePercent, absoluteMaxCapacity); - maxActiveAppsUsingAbsCap = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - clusterResource, minimumAllocation, - maxAMResourcePerQueuePercent, absoluteCapacity); - maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser( - maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); + // Update headroom info based on new cluster resource value + // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity + // during allocation + updateHeadroomInfo(clusterResource, absoluteMaxCapacity); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -1764,6 +1801,7 @@ public synchronized void updateClusterResource(Resource clusterResource) { @VisibleForTesting public static class User { Resource consumed = Resources.createResource(0, 0); + Resource consumedAMResources = Resources.createResource(0, 0); Map consumedByLabel = new HashMap(); int pendingApplications = 0; int activeApplications = 0; @@ -1787,6 +1825,10 @@ public int getPendingApplications() { public int getActiveApplications() { return activeApplications; } + + public Resource getConsumedAMResources() { + return consumedAMResources; + } public int getTotalApplications() { return getPendingApplications() + getActiveApplications(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..9f97b13 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,20 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); + + Resource amResource; + if (rmApp == null || rmApp.getAMResourceRequest() == null) { + //the rmApp may be undefined (the resource manager checks for this too) + //and unmanaged applications do not provide an amResource request + //in these cases, provide a default using the scheduler + amResource = rmContext.getScheduler().getMinimumResourceCapability(); + } else { + amResource = rmApp.getAMResourceRequest().getCapability(); + } + + setAMResource(amResource); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index 89b4a78..5786129 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -114,8 +114,6 @@ protected void render(Block html) { _("Num Containers:", Integer.toString(lqinfo.getNumContainers())). _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())). _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())). - _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())). - _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())). _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)). _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)). _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index d90e963..639c515 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -32,8 +32,6 @@ protected int numContainers; protected int maxApplications; protected int maxApplicationsPerUser; - protected int maxActiveApplications; - protected int maxActiveApplicationsPerUser; protected int userLimit; protected UsersInfo users; // To add another level in the XML protected float userLimitFactor; @@ -48,8 +46,6 @@ numContainers = q.getNumContainers(); maxApplications = q.getMaxApplications(); maxApplicationsPerUser = q.getMaxApplicationsPerUser(); - maxActiveApplications = q.getMaximumActiveApplications(); - maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser(); userLimit = q.getUserLimit(); users = new UsersInfo(q.getUsers()); userLimitFactor = q.getUserLimitFactor(); @@ -75,14 +71,6 @@ public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } - public int getMaxActiveApplications() { - return maxActiveApplications; - } - - public int getMaxActiveApplicationsPerUser() { - return maxActiveApplicationsPerUser; - } - public int getUserLimit() { return userLimit; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 62e3e5c..f8d92aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index e93d351..f4cb3b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -23,6 +23,7 @@ import org.junit.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +57,13 @@ public void setUp() { dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override + public void init(Configuration conf) { + conf.set( + CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + "1.0"); + super.init(conf); + } + @Override protected EventHandler createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler) { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index c7513ab..b8663f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -82,6 +82,7 @@ public void setUp() throws Exception { .thenReturn(null); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); + when(spyRMContext.getScheduler()).thenReturn(scheduler); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0cd74d0..1e4ce1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -28,16 +28,21 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.mockito.Matchers; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -47,6 +52,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -56,6 +62,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Ignore; public class TestApplicationLimits { @@ -119,8 +126,6 @@ public void setUp() throws IOException { // Some default values doReturn(100).when(queue).getMaxApplications(); doReturn(25).when(queue).getMaxApplicationsPerUser(); - doReturn(10).when(queue).getMaximumActiveApplications(); - doReturn(2).when(queue).getMaximumActiveApplicationsPerUser(); } private static final String A = "a"; @@ -136,10 +141,17 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; conf.setCapacity(Q_B, 90); + conf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50); + LOG.info("Setup top-level queues a and b"); } - + private FiCaSchedulerApp getMockApplication(int appId, String user) { + return getMockApplication(appId, user, Resource.newInstance(0, 0)); + } + + private FiCaSchedulerApp getMockApplication(int appId, String user, + Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); @@ -147,171 +159,20 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) { when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); return application; } @Test - public void testLimitsComputation() throws Exception { - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(); - setupQueueConfiguration(csConf); - YarnConfiguration conf = new YarnConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 16)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getRMContext()).thenReturn(rmContext); - - // Say cluster has 100 nodes of 16G each - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); - when(csContext.getClusterResource()).thenReturn(clusterResource); - - Map queues = new HashMap(); - CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); - - LeafQueue queue = (LeafQueue)queues.get(A); - - LOG.info("Queue 'A' -" + - " maxActiveApplications=" + queue.getMaximumActiveApplications() + - " maxActiveApplicationsPerUser=" + - queue.getMaximumActiveApplicationsPerUser()); - int expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. - getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); - int expectedMaxActiveAppsUsingAbsCap = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); - assertEquals( - (int)Math.ceil( - expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) * - queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); - assertEquals( - (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), - queue.getMetrics().getAvailableMB() - ); - - // Add some nodes to the cluster & test new limits - clusterResource = Resources.createResource(120 * 16 * GB); - root.updateClusterResource(clusterResource); - expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. - getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); - expectedMaxActiveAppsUsingAbsCap = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); - assertEquals( - (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap * - (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); - assertEquals( - (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), - queue.getMetrics().getAvailableMB() - ); - - // should return -1 if per queue setting not set - assertEquals( - (int)CapacitySchedulerConfiguration.UNDEFINED, - csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); - int expectedMaxApps = - (int) - (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * - queue.getAbsoluteCapacity()); - assertEquals(expectedMaxApps, queue.getMaxApplications()); - - int expectedMaxAppsPerUser = (int)(expectedMaxApps * - (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor()); - assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser()); - - // should default to global setting if per queue setting not set - assertEquals( - (long)CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, - (long)csConf.getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) - ); - - // Change the per-queue max AM resources percentage. - csConf.setFloat( - "yarn.scheduler.capacity." + - queue.getQueuePath() + - ".maximum-am-resource-percent", - 0.5f); - // Re-create queues to get new configs. - queues = new HashMap(); - root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); - clusterResource = Resources.createResource(100 * 16 * GB); - - queue = (LeafQueue)queues.get(A); - expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. - getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); - - assertEquals((long) 0.5, - (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); - - // Change the per-queue max applications. - csConf.setInt( - "yarn.scheduler.capacity." + - queue.getQueuePath() + - ".maximum-applications", 9999); - // Re-create queues to get new configs. - queues = new HashMap(); - root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); - - queue = (LeafQueue)queues.get(A); - assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); - assertEquals(9999, queue.getMaxApplications()); - - expectedMaxAppsPerUser = (int)(9999 * - (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor()); - assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser()); - } - - @Test public void testActiveApplicationLimits() throws Exception { final String user_0 = "user_0"; final String user_1 = "user_1"; + final String user_2 = "user_2"; int APPLICATION_ID = 0; // Submit first application - FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -319,15 +180,17 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit second application - FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - // Submit third application, should remain pending - FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + // Submit third application, should remain pending due to user amlimit + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -342,18 +205,17 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit another one for user_0 - FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - // Change queue limit to be smaller so 2 users can fill it up - doReturn(3).when(queue).getMaximumActiveApplications(); - // Submit first app for user_1 - FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1, + Resources.createResource(8 * GB, 0)); queue.submitApplicationAttempt(app_4, user_1); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -362,15 +224,17 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(1, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); - // Submit second app for user_1, should block due to queue-limit - FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); - queue.submitApplicationAttempt(app_5, user_1); + // Submit first app for user_2, should block due to queue amlimit + FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2, + Resources.createResource(8 * GB, 0)); + queue.submitApplicationAttempt(app_5, user_2); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); assertEquals(1, queue.getNumActiveApplications(user_1)); - assertEquals(1, queue.getNumPendingApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + assertEquals(1, queue.getNumPendingApplications(user_2)); // Now finish one app of user_1 so app_5 should be activated queue.finishApplicationAttempt(app_4, A); @@ -378,21 +242,59 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); + assertEquals(1, queue.getNumActiveApplications(user_2)); + assertEquals(0, queue.getNumPendingApplications(user_2)); } + + @Test + public void testAMResourceLimit() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 20GB = 2GB for AM's, each of our apps below has + // a 2GB AM, so only one may run at a time + queue.updateClusterResource(Resource.newInstance(20 * GB, 10)); + + assertEquals(Resource.newInstance(2 * GB, 1), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Now finish first app so second should be activated + queue.finishApplicationAttempt(app_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + } + @Test public void testActiveLimitsWithKilledApps() throws Exception { final String user_0 = "user_0"; int APPLICATION_ID = 0; - // set max active to 2 - doReturn(2).when(queue).getMaximumActiveApplications(); - // Submit first application - FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -401,7 +303,8 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.activeApplications.contains(app_0)); // Submit second application - FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -410,7 +313,8 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.activeApplications.contains(app_1)); // Submit third application, should remain pending - FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -419,7 +323,8 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.pendingApplications.contains(app_2)); // Submit fourth application, should remain pending - FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -506,6 +411,18 @@ public void testHeadroom() throws Exception { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + Priority priority_1 = TestUtils.createMockPriority(1); @@ -515,7 +432,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0_0 = spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_0_0, user_0); List app_0_0_requests = new ArrayList(); @@ -534,7 +451,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_0_1 = spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_0_1, user_0); List app_0_1_requests = new ArrayList(); @@ -553,7 +470,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_1_0 = spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_1_0, user_1); List app_1_0_requests = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index fb7bb2c..dc84c91 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -101,6 +101,7 @@ RMContext rmContext; RMContext spyRMContext; + ResourceRequest amResourceRequest; CapacityScheduler cs; CapacitySchedulerConfiguration csConf; CapacitySchedulerContext csContext; @@ -124,6 +125,10 @@ public void setUp() throws Exception { spy(new ConcurrentHashMap()); RMApp rmApp = mock(RMApp.class); when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); + amResourceRequest = mock(ResourceRequest.class); + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(0, 0)); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); @@ -265,26 +270,37 @@ public Container answer(InvocationOnMock invocation) @Test public void testInitializeQueue() throws Exception { - final float epsilon = 1e-5f; - //can add more sturdy test with 3-layer queues - //once MAPREDUCE:3410 is resolved - LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); - assertEquals(0.085, a.getCapacity(), epsilon); - assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); - assertEquals(0.2, a.getMaximumCapacity(), epsilon); - assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + final float epsilon = 1e-5f; + //can add more sturdy test with 3-layer queues + //once MAPREDUCE:3410 is resolved + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + assertEquals(0.085, a.getCapacity(), epsilon); + assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); + assertEquals(0.2, a.getMaximumCapacity(), epsilon); + assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + + LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); + assertEquals(0.80, b.getCapacity(), epsilon); + assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); + assertEquals(0.99, b.getMaximumCapacity(), epsilon); + assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); + + ParentQueue c = (ParentQueue)queues.get(C); + assertEquals(0.015, c.getCapacity(), epsilon); + assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); + assertEquals(0.1, c.getMaximumCapacity(), epsilon); + assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + + //Verify the value for getAMResourceLimit for queues with < .1 maxcap + Resource clusterResource = Resource.newInstance(50 * GB, 50); - LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); - assertEquals(0.80, b.getCapacity(), epsilon); - assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); - assertEquals(0.99, b.getMaximumCapacity(), epsilon); - assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); - - ParentQueue c = (ParentQueue)queues.get(C); - assertEquals(0.015, c.getCapacity(), epsilon); - assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); - assertEquals(0.1, c.getMaximumCapacity(), epsilon); - assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + a.updateClusterResource(clusterResource); + assertEquals(Resource.newInstance(1 * GB, 1), + a.getAMResourceLimit()); + + b.updateClusterResource(clusterResource); + assertEquals(Resource.newInstance(5 * GB, 1), + b.getAMResourceLimit()); } @Test @@ -679,7 +695,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_0, user_0); Priority u0Priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -702,7 +718,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); Priority u1Priority = TestUtils.createMockPriority(2); app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, @@ -736,12 +752,12 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, u0Priority, recordFactory))); @@ -764,7 +780,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(4, 0); FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, @@ -980,7 +996,6 @@ public void testHeadroomWithMaxCap() throws Exception { assertEquals(0*GB, app_1.getHeadroom().getMemory()); // Check headroom for app_2 - LOG.info("here"); app_1.updateResourceRequests(Collections.singletonList( // unset TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); @@ -1904,6 +1919,9 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { // Users final String user_e = "user_e"; + + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(1 * GB, 0)); // Submit applications final ApplicationAttemptId appAttemptId_0 = @@ -1942,7 +1960,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResource()); + root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization assertEquals(3, e.activeApplications.size()); @@ -1982,6 +2000,9 @@ public void testActivateApplicationByUpdatingClusterResource() // Users final String user_e = "user_e"; + + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(1 * GB, 0)); // Submit applications final ApplicationAttemptId appAttemptId_0 = @@ -2291,20 +2312,16 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80); LeafQueue a = new LeafQueue(csContext, A, root, null); assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f); - assertEquals(160, a.getMaximumActiveApplications()); csConf.setFloat(CapacitySchedulerConfiguration. MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f); LeafQueue newA = new LeafQueue(csContext, A, root, null); a.reinitialize(newA, clusterResource); assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f); - assertEquals(320, a.getMaximumActiveApplications()); Resource newClusterResource = Resources.createResource(100 * 20 * GB, 100 * 32); a.updateClusterResource(newClusterResource); - // 100 * 20 * 0.2 = 400 - assertEquals(400, a.getMaximumActiveApplications()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 2a49545..985609e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -77,6 +77,7 @@ .getRecordFactory(null); RMContext rmContext; + RMContext spyRMContext; CapacityScheduler cs; // CapacitySchedulerConfiguration csConf; CapacitySchedulerContext csContext; @@ -132,7 +133,10 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception { root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); - cs.setRMContext(rmContext); + spyRMContext = spy(rmContext); + when(spyRMContext.getScheduler()).thenReturn(cs); + + cs.setRMContext(spyRMContext); cs.init(csConf); cs.start(); } @@ -212,14 +216,14 @@ public void testReservation() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -361,14 +365,14 @@ public void testReservationNoContinueLook() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -506,14 +510,14 @@ public void testAssignContainersNeedToUnreserve() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -618,7 +622,7 @@ public void testGetAppToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_0 = "host_0"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, @@ -685,7 +689,7 @@ public void testFindNodeToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_1 = "host_1"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, @@ -742,14 +746,14 @@ public void testAssignToQueue() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -916,14 +920,14 @@ public void testAssignToUser() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -1042,14 +1046,14 @@ public void testReservationsNoneAvailable() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index b4c4c10..3918bf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -143,13 +143,14 @@ public void testFifoSchedulerCapacityWhenNoNMs() { @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { AsyncDispatcher dispatcher = new InlineDispatcher(); + + FifoScheduler scheduler = new FifoScheduler(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null, writer); + null, null, null, null, null, null, null, writer, scheduler); ((RMContextImpl) rmContext).setSystemMetricsPublisher( mock(SystemMetricsPublisher.class)); - FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new Configuration(); scheduler.setRMContext(rmContext); scheduler.init(conf); @@ -189,12 +190,14 @@ public void testNodeLocalAssignment() throws Exception { new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + + FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); + null, containerTokenSecretManager, nmTokenSecretManager, null, writer, + scheduler); ((RMContextImpl) rmContext).setSystemMetricsPublisher( mock(SystemMetricsPublisher.class)); - FifoScheduler scheduler = new FifoScheduler(); scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start(); @@ -260,17 +263,19 @@ public void testUpdateResourceOnNode() throws Exception { new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); - ((RMContextImpl) rmContext).setSystemMetricsPublisher( - mock(SystemMetricsPublisher.class)); - + FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") public Map getNodes(){ return nodes; } }; + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null, writer, + scheduler); + ((RMContextImpl) rmContext).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); + scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index c7c403d..ef7435a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -82,8 +82,6 @@ int numContainers; int maxApplications; int maxApplicationsPerUser; - int maxActiveApplications; - int maxActiveApplicationsPerUser; int userLimit; float userLimitFactor; } @@ -303,10 +301,6 @@ public void verifySubQueueXML(Element qElem, String q, WebServicesTestUtils.getXmlInt(qElem, "maxApplications"); lqi.maxApplicationsPerUser = WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser"); - lqi.maxActiveApplications = - WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications"); - lqi.maxActiveApplicationsPerUser = - WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser"); lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit"); lqi.userLimitFactor = WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor"); @@ -386,8 +380,6 @@ private void verifySubQueue(JSONObject info, String q, lqi.numContainers = info.getInt("numContainers"); lqi.maxApplications = info.getInt("maxApplications"); lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser"); - lqi.maxActiveApplications = info.getInt("maxActiveApplications"); - lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser"); lqi.userLimit = info.getInt("userLimit"); lqi.userLimitFactor = (float) info.getDouble("userLimitFactor"); verifyLeafQueueGeneric(q, lqi); @@ -449,10 +441,6 @@ private void verifyLeafQueueGeneric(String q, LeafQueueInfo info) (float)expectedMaxAppsPerUser, (float)info.maxApplicationsPerUser, info.userLimitFactor); - assertTrue("maxActiveApplications doesn't match", - info.maxActiveApplications > 0); - assertTrue("maxActiveApplicationsPerUser doesn't match", - info.maxActiveApplicationsPerUser > 0); assertEquals("userLimit doesn't match", csConf.getUserLimit(q), info.userLimit); assertEquals("userLimitFactor doesn't match",