diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 33b62fe..2d1737a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1339,6 +1339,11 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f129ff4..37a6028 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -115,6 +115,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final Resource usedAMResources = Resource.newInstance(0, 0); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -606,8 +608,18 @@ public void submitApplication(ApplicationId applicationId, String userName, } } + + @VisibleForTesting + protected Resource getAMResourceLimit() { + return Resources.multiply( + lastClusterResource, + maxAMResourcePerQueuePercent * absoluteMaxCapacity); + } private synchronized void activateApplications() { + //limit of allowed resource usage for application masters + Resource amLimit = getAMResourceLimit(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); @@ -617,11 +629,35 @@ private synchronized void activateApplications() { break; } + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + application.getAMResource() + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + + " lastClusterResource " + lastClusterResource + + " amIfStarted " + amIfStarted); + } + + if (!Resources.fitsIn(amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds amLimit"); + continue; + } + } + // Check user limit User user = getUser(application.getUser()); if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { user.activateApplication(); activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -672,6 +708,8 @@ public synchronized void removeApplicationAttempt( boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..96fb05e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,24 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + Resource amResource = null; + RMApp app = rmContext.getRMApps().get(getApplicationId()); + if (app != null) { + ResourceRequest amRequest = app.getAMResourceRequest(); + if (amRequest != null && amRequest.getCapability() != null) { + amResource = amRequest.getCapability(); + } + } + //If we cannot get the amResource value from the application we attempt + //to fall back and use the scheduler's minimum resource capability + if (amResource == null && rmContext.getScheduler() != null && + rmContext.getScheduler().getMinimumResourceCapability() != null) { + amResource = rmContext.getScheduler().getMinimumResourceCapability(); + } else if (amResource == null) { + amResource = Resource.newInstance(0, 0); + } + setAMResource(amResource); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 62e3e5c..f8d92aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0cd74d0..29e464e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -56,6 +56,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Ignore; public class TestApplicationLimits { @@ -138,8 +139,13 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { LOG.info("Setup top-level queues a and b"); } - + private FiCaSchedulerApp getMockApplication(int appId, String user) { + return getMockApplication(appId, user, Resource.newInstance(0, 0)); + } + + private FiCaSchedulerApp getMockApplication(int appId, String user, + Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); @@ -147,6 +153,7 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) { when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); return application; } @@ -381,6 +388,83 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(1, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); } + + @Test + public void testAMResourceLimit() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 20GB = 2GB for AM's, each of our apps below has + // a 2GB AM, so only one may run at a time + queue.updateClusterResource(Resource.newInstance(20 * GB, 10)); + + assertEquals(Resource.newInstance(2 * GB, 1), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Now finish first app so second should be activated + queue.finishApplicationAttempt(app_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + } + + @Test + public void testAMResourceLimitAccumulated() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 40GB = 4GB for AM's, two can run at a time + queue.updateClusterResource(Resource.newInstance(40 * GB, 20)); + + assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit third application, will not start, insufficient resources + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_2, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + } @Test public void testActiveLimitsWithKilledApps() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 642363e..587df3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -282,6 +282,19 @@ public void testInitializeQueue() throws Exception { assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); assertEquals(0.1, c.getMaximumCapacity(), epsilon); assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + + //Verify the value for getAMResourceLimit for queues with < .1 maxcap + Resource clusterResource = Resource.newInstance(50 * GB, 50); + + a.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + a.getAbsoluteMaximumCapacity() * a.getMaxAMResourcePerQueuePercent()), + a.getAMResourceLimit()); + + b.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + b.getAbsoluteMaximumCapacity() * b.getMaxAMResourcePerQueuePercent()), + b.getAMResourceLimit()); } @Test