diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 33b62fe..2d1737a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1339,6 +1339,11 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f129ff4..37a6028 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -115,6 +115,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final Resource usedAMResources = Resource.newInstance(0, 0); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -606,8 +608,18 @@ public void submitApplication(ApplicationId applicationId, String userName, } } + + @VisibleForTesting + protected Resource getAMResourceLimit() { + return Resources.multiply( + lastClusterResource, + maxAMResourcePerQueuePercent * absoluteMaxCapacity); + } private synchronized void activateApplications() { + //limit of allowed resource usage for application masters + Resource amLimit = getAMResourceLimit(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); @@ -617,11 +629,35 @@ private synchronized void activateApplications() { break; } + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + application.getAMResource() + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + + " lastClusterResource " + lastClusterResource + + " amIfStarted " + amIfStarted); + } + + if (!Resources.fitsIn(amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds amLimit"); + continue; + } + } + // Check user limit User user = getUser(application.getUser()); if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { user.activateApplication(); activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -672,6 +708,8 @@ public synchronized void removeApplicationAttempt( boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..700dd26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,22 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + Resource amResource = rmContext + .getRMApps() + .get(getApplicationId()) + .getAMResourceRequest() + .getCapability(); + + setAMResource(amResource); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Resource amResource) { + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + setAMResource(amResource); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 62e3e5c..f8d92aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index e93d351..f4cb3b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -23,6 +23,7 @@ import org.junit.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +57,13 @@ public void setUp() { dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override + public void init(Configuration conf) { + conf.set( + CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + "1.0"); + super.init(conf); + } + @Override protected EventHandler createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler) { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index c603f5b..c21cc8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -79,6 +81,9 @@ public void setUp() throws Exception { ConcurrentMap spyApps = spy(new ConcurrentHashMap()); RMApp rmApp = mock(RMApp.class); + ResourceRequest amr = mock(ResourceRequest.class); + when(amr.getCapability()).thenReturn(Resource.newInstance(0, 0)); + when(rmApp.getAMResourceRequest()).thenReturn(amr); when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) .thenReturn(null); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0cd74d0..4f1df0d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -56,6 +56,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Ignore; public class TestApplicationLimits { @@ -138,8 +139,13 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { LOG.info("Setup top-level queues a and b"); } - + private FiCaSchedulerApp getMockApplication(int appId, String user) { + return getMockApplication(appId, user, Resource.newInstance(0, 0)); + } + + private FiCaSchedulerApp getMockApplication(int appId, String user, + Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); @@ -147,6 +153,7 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) { when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); return application; } @@ -381,6 +388,83 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(1, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); } + + @Test + public void testAMResourceLimit() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 20GB = 2GB for AM's, each of our apps below has + // a 2GB AM, so only one may run at a time + queue.updateClusterResource(Resource.newInstance(20 * GB, 10)); + + assertEquals(Resource.newInstance(2 * GB, 1), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Now finish first app so second should be activated + queue.finishApplicationAttempt(app_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + } + + @Test + public void testAMResourceLimitAccumulated() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 40GB = 4GB for AM's, two can run at a time + queue.updateClusterResource(Resource.newInstance(40 * GB, 20)); + + assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit third application, will not start, insufficient resources + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_2, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + } @Test public void testActiveLimitsWithKilledApps() throws Exception { @@ -515,7 +599,8 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0_0 = spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0))); queue.submitApplicationAttempt(app_0_0, user_0); List app_0_0_requests = new ArrayList(); @@ -534,7 +619,8 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_0_1 = spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0))); queue.submitApplicationAttempt(app_0_1, user_0); List app_0_1_requests = new ArrayList(); @@ -553,7 +639,8 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_1_0 = spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0))); queue.submitApplicationAttempt(app_1_0, user_1); List app_1_0_requests = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 2aa57a0..8ae140d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -624,7 +624,8 @@ public void testBlackListNodes() throws Exception { MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); cs.handle(new NodeAddedSchedulerEvent(node)); - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + RMApp app1 = rm.submitApp(1024); + ApplicationId appId = app1.getApplicationId(); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); SchedulerEvent addAppEvent = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 642363e..0fc83c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -120,6 +120,9 @@ public void setUp() throws Exception { ConcurrentMap spyApps = spy(new ConcurrentHashMap()); RMApp rmApp = mock(RMApp.class); + ResourceRequest amr = mock(ResourceRequest.class); + when(amr.getCapability()).thenReturn(Resource.newInstance(0, 0)); + when(rmApp.getAMResourceRequest()).thenReturn(amr); when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); @@ -282,6 +285,19 @@ public void testInitializeQueue() throws Exception { assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); assertEquals(0.1, c.getMaximumCapacity(), epsilon); assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + + //Verify the value for getAMResourceLimit for queues with < .1 maxcap + Resource clusterResource = Resource.newInstance(50 * GB, 50); + + a.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + a.getAbsoluteMaximumCapacity() * a.getMaxAMResourcePerQueuePercent()), + a.getAMResourceLimit()); + + b.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + b.getAbsoluteMaximumCapacity() * b.getMaxAMResourcePerQueuePercent()), + b.getAMResourceLimit()); } @Test @@ -676,7 +692,8 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0)); qb.submitApplicationAttempt(app_0, user_0); Priority u0Priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -699,7 +716,8 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0)); Priority u1Priority = TestUtils.createMockPriority(2); app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, @@ -733,12 +751,14 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0)); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0)); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, u0Priority, recordFactory))); @@ -761,7 +781,8 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(4, 0); FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), rmContext, + Resources.createResource(0, 0)); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 2a49545..4e98ef5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -212,14 +212,16 @@ public void testReservation() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -361,14 +363,16 @@ public void testReservationNoContinueLook() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -506,14 +510,16 @@ public void testAssignContainersNeedToUnreserve() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -618,7 +624,8 @@ public void testGetAppToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); String host_0 = "host_0"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, @@ -685,7 +692,8 @@ public void testFindNodeToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); String host_1 = "host_1"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, @@ -742,14 +750,16 @@ public void testAssignToQueue() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -916,14 +926,16 @@ public void testAssignToUser() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -1042,14 +1054,16 @@ public void testReservationsNoneAvailable() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), rmContext, + csContext.getMinimumResourceCapability()); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes