diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 315044d..a33635e 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -153,6 +153,12 @@ + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 3bc2e9b..03fc40e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -117,7 +117,8 @@ public RMActiveServiceContext(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setContainerAllocationExpirer(containerAllocationExpirer); this.setAMLivelinessMonitor(amLivelinessMonitor); @@ -128,6 +129,7 @@ public RMActiveServiceContext(Dispatcher rmDispatcher, this.setNMTokenSecretManager(nmTokenSecretManager); this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + this.setScheduler(scheduler); RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ebf2fe4..1d0d6c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -87,18 +87,46 @@ public RMContextImpl(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setDispatcher(rmDispatcher); setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager, rmApplicationHistoryWriter)); + clientToAMTokenSecretManager, rmApplicationHistoryWriter, + scheduler)); ConfigurationProvider provider = new LocalConfigurationProvider(); setConfigurationProvider(provider); } + + @VisibleForTesting + // helper constructor for tests + public RMContextImpl(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer delegationTokenRenewer, + AMRMTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this( + rmDispatcher, + containerAllocationExpirer, + amLivelinessMonitor, + amFinishingMonitor, + delegationTokenRenewer, + appTokenSecretManager, + containerTokenSecretManager, + nmTokenSecretManager, + clientToAMTokenSecretManager, + rmApplicationHistoryWriter, + null); + } @Override public Dispatcher getDispatcher() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 33b62fe..2d1737a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1339,6 +1339,11 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 47679a6..e316691 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -115,6 +115,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final Resource usedAMResources = Resource.newInstance(0, 0); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -606,8 +608,28 @@ public void submitApplication(ApplicationId applicationId, String userName, } } + + @VisibleForTesting + protected Resource getAMResourceLimit() { + return Resources.multiply( + lastClusterResource, + maxAMResourcePerQueuePercent * absoluteMaxCapacity); + } + + @VisibleForTesting + protected Resource getUserAMResourceLimit() { + return Resources.multiply( + lastClusterResource, + ( maxAMResourcePerQueuePercent * (userLimit / 100.0f) * + userLimitFactor ) + * absoluteMaxCapacity); + } private synchronized void activateApplications() { + //limit of allowed resource usage for application masters + Resource amLimit = getAMResourceLimit(); + Resource userAMLimit = getUserAMResourceLimit(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); @@ -617,11 +639,55 @@ private synchronized void activateApplications() { break; } - // Check user limit + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + application.getAMResource() + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + + " lastClusterResource " + lastClusterResource + + " amIfStarted " + amIfStarted); + } + + if (!Resources.fitsIn(amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds amLimit"); + continue; + } + } + + // Check user limits User user = getUser(application.getUser()); + + // AM Resource Limit + Resource userAmIfStarted = + Resources.add(application.getAMResource(), + user.getConsumedAMResources()); + + if (!Resources.fitsIn(userAmIfStarted, userAMLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds " + + "userAmLimit"); + continue; + } + } + if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { user.activateApplication(); activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); + Resources.addTo(user.getConsumedAMResources(), + application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -672,6 +738,10 @@ public synchronized void removeApplicationAttempt( boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); + Resources.subtractFrom(user.getConsumedAMResources(), + application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -1764,6 +1834,7 @@ public synchronized void updateClusterResource(Resource clusterResource) { @VisibleForTesting public static class User { Resource consumed = Resources.createResource(0, 0); + Resource consumedAMResources = Resources.createResource(0, 0); Map consumedByLabel = new HashMap(); int pendingApplications = 0; int activeApplications = 0; @@ -1787,6 +1858,10 @@ public int getPendingApplications() { public int getActiveApplications() { return activeApplications; } + + public Resource getConsumedAMResources() { + return consumedAMResources; + } public int getTotalApplications() { return getPendingApplications() + getActiveApplications(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..9f97b13 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,20 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); + + Resource amResource; + if (rmApp == null || rmApp.getAMResourceRequest() == null) { + //the rmApp may be undefined (the resource manager checks for this too) + //and unmanaged applications do not provide an amResource request + //in these cases, provide a default using the scheduler + amResource = rmContext.getScheduler().getMinimumResourceCapability(); + } else { + amResource = rmApp.getAMResourceRequest().getCapability(); + } + + setAMResource(amResource); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 62e3e5c..f8d92aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index e93d351..f4cb3b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -23,6 +23,7 @@ import org.junit.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +57,13 @@ public void setUp() { dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override + public void init(Configuration conf) { + conf.set( + CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + "1.0"); + super.init(conf); + } + @Override protected EventHandler createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler) { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index c7513ab..b8663f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -82,6 +82,7 @@ public void setUp() throws Exception { .thenReturn(null); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); + when(spyRMContext.getScheduler()).thenReturn(scheduler); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0cd74d0..28dc29a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -28,16 +28,21 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.mockito.Matchers; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -47,6 +52,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -56,6 +62,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Ignore; public class TestApplicationLimits { @@ -138,8 +145,13 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { LOG.info("Setup top-level queues a and b"); } - + private FiCaSchedulerApp getMockApplication(int appId, String user) { + return getMockApplication(appId, user, Resource.newInstance(0, 0)); + } + + private FiCaSchedulerApp getMockApplication(int appId, String user, + Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); @@ -147,6 +159,7 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) { when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); return application; } @@ -381,6 +394,170 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(1, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); } + + @Test + public void testUserAMResourceLimitAccumulated() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50); + setupQueueConfiguration(csConf); + YarnConfiguration conf = new YarnConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB, 16)); + when(csContext.getApplicationComparator()). + thenReturn(CapacityScheduler.applicationComparator); + when(csContext.getQueueComparator()). + thenReturn(CapacityScheduler.queueComparator); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); + + Resource clusterResource = Resources.createResource(40 * GB, 20); + when(csContext.getClusterResource()).thenReturn(clusterResource); + + Map queues = new HashMap(); + CapacityScheduler.parseQueue(csContext, csConf, null, "root", + queues, queues, TestUtils.spyHook); + + LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); + + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 40GB = 4GB for AM's, two can run at a time + // due to user limit, one per user + queue.updateClusterResource(Resource.newInstance(40 * GB, 20)); + + assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit()); + assertEquals(Resource.newInstance(2 * GB, 1), + queue.getUserAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application, will not start due to user amlimit + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Submit third application, will start, as is diff user + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_2, user_1); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumActiveApplications(user_1)); + } + + @Test + public void testAMResourceLimit() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 20GB = 2GB for AM's, each of our apps below has + // a 2GB AM, so only one may run at a time + queue.updateClusterResource(Resource.newInstance(20 * GB, 10)); + + assertEquals(Resource.newInstance(2 * GB, 1), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Now finish first app so second should be activated + queue.finishApplicationAttempt(app_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + } + + @Test + public void testAMResourceLimitAccumulated() throws Exception { + final String user_0 = "user_0"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 40GB = 4GB for AM's, two can run at a time + queue.updateClusterResource(Resource.newInstance(40 * GB, 20)); + + assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit()); + + int APPLICATION_ID = 0; + // Submit first application + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit third application, will not start, insufficient resources + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_2, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + } @Test public void testActiveLimitsWithKilledApps() throws Exception { @@ -506,6 +683,18 @@ public void testHeadroom() throws Exception { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + Priority priority_1 = TestUtils.createMockPriority(1); @@ -515,7 +704,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0_0 = spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_0_0, user_0); List app_0_0_requests = new ArrayList(); @@ -534,7 +723,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_0_1 = spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_0_1, user_0); List app_0_1_requests = new ArrayList(); @@ -553,7 +742,7 @@ public void testHeadroom() throws Exception { TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_1_0 = spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, - queue.getActiveUsersManager(), rmContext)); + queue.getActiveUsersManager(), spyRMContext)); queue.submitApplicationAttempt(app_1_0, user_1); List app_1_0_requests = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index fb7bb2c..8d0a71c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -124,6 +124,10 @@ public void setUp() throws Exception { spy(new ConcurrentHashMap()); RMApp rmApp = mock(RMApp.class); when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); @@ -265,26 +269,39 @@ public Container answer(InvocationOnMock invocation) @Test public void testInitializeQueue() throws Exception { - final float epsilon = 1e-5f; - //can add more sturdy test with 3-layer queues - //once MAPREDUCE:3410 is resolved - LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); - assertEquals(0.085, a.getCapacity(), epsilon); - assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); - assertEquals(0.2, a.getMaximumCapacity(), epsilon); - assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + final float epsilon = 1e-5f; + //can add more sturdy test with 3-layer queues + //once MAPREDUCE:3410 is resolved + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + assertEquals(0.085, a.getCapacity(), epsilon); + assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); + assertEquals(0.2, a.getMaximumCapacity(), epsilon); + assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + + LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); + assertEquals(0.80, b.getCapacity(), epsilon); + assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); + assertEquals(0.99, b.getMaximumCapacity(), epsilon); + assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); + + ParentQueue c = (ParentQueue)queues.get(C); + assertEquals(0.015, c.getCapacity(), epsilon); + assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); + assertEquals(0.1, c.getMaximumCapacity(), epsilon); + assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + + //Verify the value for getAMResourceLimit for queues with < .1 maxcap + Resource clusterResource = Resource.newInstance(50 * GB, 50); - LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); - assertEquals(0.80, b.getCapacity(), epsilon); - assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); - assertEquals(0.99, b.getMaximumCapacity(), epsilon); - assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); - - ParentQueue c = (ParentQueue)queues.get(C); - assertEquals(0.015, c.getCapacity(), epsilon); - assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); - assertEquals(0.1, c.getMaximumCapacity(), epsilon); - assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + a.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + a.getAbsoluteMaximumCapacity() * a.getMaxAMResourcePerQueuePercent()), + a.getAMResourceLimit()); + + b.updateClusterResource(clusterResource); + assertEquals(Resources.multiply(clusterResource, + b.getAbsoluteMaximumCapacity() * b.getMaxAMResourcePerQueuePercent()), + b.getAMResourceLimit()); } @Test @@ -679,7 +696,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_0, user_0); Priority u0Priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -702,7 +719,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); Priority u1Priority = TestUtils.createMockPriority(2); app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, @@ -736,12 +753,12 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, u0Priority, recordFactory))); @@ -764,7 +781,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.getMockApplicationAttemptId(4, 0); FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 2a49545..985609e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -77,6 +77,7 @@ .getRecordFactory(null); RMContext rmContext; + RMContext spyRMContext; CapacityScheduler cs; // CapacitySchedulerConfiguration csConf; CapacitySchedulerContext csContext; @@ -132,7 +133,10 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception { root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); - cs.setRMContext(rmContext); + spyRMContext = spy(rmContext); + when(spyRMContext.getScheduler()).thenReturn(cs); + + cs.setRMContext(spyRMContext); cs.init(csConf); cs.start(); } @@ -212,14 +216,14 @@ public void testReservation() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -361,14 +365,14 @@ public void testReservationNoContinueLook() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -506,14 +510,14 @@ public void testAssignContainersNeedToUnreserve() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -618,7 +622,7 @@ public void testGetAppToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_0 = "host_0"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, @@ -685,7 +689,7 @@ public void testFindNodeToUnreserve() throws Exception { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_1 = "host_1"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, @@ -742,14 +746,14 @@ public void testAssignToQueue() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -916,14 +920,14 @@ public void testAssignToUser() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -1042,14 +1046,14 @@ public void testReservationsNoneAvailable() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index b4c4c10..3918bf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -143,13 +143,14 @@ public void testFifoSchedulerCapacityWhenNoNMs() { @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { AsyncDispatcher dispatcher = new InlineDispatcher(); + + FifoScheduler scheduler = new FifoScheduler(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null, writer); + null, null, null, null, null, null, null, writer, scheduler); ((RMContextImpl) rmContext).setSystemMetricsPublisher( mock(SystemMetricsPublisher.class)); - FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new Configuration(); scheduler.setRMContext(rmContext); scheduler.init(conf); @@ -189,12 +190,14 @@ public void testNodeLocalAssignment() throws Exception { new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + + FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); + null, containerTokenSecretManager, nmTokenSecretManager, null, writer, + scheduler); ((RMContextImpl) rmContext).setSystemMetricsPublisher( mock(SystemMetricsPublisher.class)); - FifoScheduler scheduler = new FifoScheduler(); scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start(); @@ -260,17 +263,19 @@ public void testUpdateResourceOnNode() throws Exception { new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); - ((RMContextImpl) rmContext).setSystemMetricsPublisher( - mock(SystemMetricsPublisher.class)); - + FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") public Map getNodes(){ return nodes; } }; + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null, writer, + scheduler); + ((RMContextImpl) rmContext).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); + scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start();