diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 809a8603e25..90d88f8a8ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -802,8 +802,12 @@ private Resource getCurrentLimitResource(String nodePartition,
       Resource queueMaxResource =
           getQueueMaxResource(nodePartition);
 
-      return Resources.min(resourceCalculator, clusterResource,
-          queueMaxResource, currentResourceLimits.getLimit());
+      // Resources.min selects one of its two operands as a whole, so under a
+      // multi-dimensional calculator the pick can still exceed the queue maximum
+      // in a single dimension; clamp each component to queueMaxResource as well.
+      Resource calculatorMin = Resources.min(resourceCalculator,
+          clusterResource, queueMaxResource, currentResourceLimits.getLimit());
+      return Resources.componentwiseMin(calculatorMin, queueMaxResource);
     } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
       // When we doing non-exclusive resource allocation, maximum capacity of
       // all queues on this label equals to total resource with the label.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
index 6277290246b..d3002eaaf5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
@@ -120,6 +120,15 @@ public long getNumOfCommitSuccess() {
     return this.commitSuccess.lastStat().numSamples();
   }
 
+  /**
+   * Returns the number of samples in the last stat of the
+   * {@code commitFailure} metric, mirroring {@link #getNumOfCommitSuccess()}.
+   */
+  @VisibleForTesting
+  public long getNumOfCommitFailure() {
+    return this.commitFailure.lastStat().numSamples();
+  }
+
   public void addSchedulerNodeHBInterval(long heartbeatInterval) {
     schedulerNodeHBInterval.add(heartbeatInterval);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 4a9e45e756f..54edd106820 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -374,6 +374,175 @@ public void testCapacityScheduler() throws Exception {
     LOG.info("--- END: testCapacityScheduler ---");
   }
 
+  @Test
+  public void testQueueMaxCapsAreRespectedWithDRC() throws Exception {
+    /*
+     * Queue tree:
+     *          Root max <60G, 100>
+     *           /
+     *          A max <60G, 100>
+     *         /  \
+     *       A1    A2
+     * max <5G, 100>  max <40G, 70>
+     *
+     * Scenario, with DominantResourceCalculator on a <60G, 100> cluster:
+     * 1. A2 allocates <30G, 1>, which leaves A with <30G, 99>.
+     * 2. A1 asks for <10G, 1>. This is above A1's 5G memory cap, so nothing
+     *    may be committed for it: both commit counters stay unchanged.
+     * 3. A1 asks for <1G, 1>, which is within its cap and gets committed.
+     */
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    // Define top-level queues
+    csconf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
+
+    QueuePath queueA =
+        new QueuePath(CapacitySchedulerConfiguration.ROOT, "a");
+    QueuePath queueA1 = new QueuePath(queueA.getFullPath(), "a1");
+    QueuePath queueA2 = new QueuePath(queueA.getFullPath(), "a2");
+
+    csconf.setMinimumResourceRequirement("", queueA,
+        Resource.newInstance(20 * 1024, 0));
+    csconf.setMaximumResourceRequirement("", queueA,
+        Resource.newInstance(60 * 1024, 100));
+
+    // Define the child queues of a
+    csconf.setQueues(A, new String[] {"a1", "a2"});
+
+    csconf.setMinimumResourceRequirement("", queueA1,
+        Resource.newInstance(5 * 1024, 0));
+    csconf.setMaximumResourceRequirement("", queueA1,
+        Resource.newInstance(5 * 1024, 100));
+
+    csconf.setMinimumResourceRequirement("", queueA2,
+        Resource.newInstance(10 * 1024, 0));
+    csconf.setMaximumResourceRequirement("", queueA2,
+        Resource.newInstance(40 * 1024, 70));
+
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+    csconf.setMaximumApplicationsPerQueue(queueA1.getFullPath(), 10);
+    csconf.setMaximumApplicationsPerQueue(queueA2.getFullPath(), 10);
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      LeafQueue leafA1 = (LeafQueue) cs.getQueue(queueA1.getLeafName());
+      leafA1.setUserLimitFactor(1000f);
+      LeafQueue leafA2 = (LeafQueue) cs.getQueue(queueA2.getLeafName());
+      leafA2.setUserLimitFactor(1000f);
+
+      // add app 1
+      ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+      ApplicationAttemptId appAttemptId =
+          BuilderUtils.newApplicationAttemptId(appId, 1);
+
+      RMAppAttemptMetrics attemptMetric =
+          new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+      RMAppImpl app = mock(RMAppImpl.class);
+      when(app.getApplicationId()).thenReturn(appId);
+      RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+      Container container = mock(Container.class);
+      when(attempt.getMasterContainer()).thenReturn(container);
+      ApplicationSubmissionContext submissionContext = mock(
+          ApplicationSubmissionContext.class);
+      when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+      when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+      when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+      when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+      rm.getRMContext().getRMApps().put(appId, app);
+
+      SchedulerEvent addAppEvent =
+          new AppAddedSchedulerEvent(appId, queueA1.getLeafName(), "user1");
+      cs.handle(addAppEvent);
+      SchedulerEvent addAttemptEvent =
+          new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+      cs.handle(addAttemptEvent);
+
+      // add app 2
+      ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
+      ApplicationAttemptId appAttemptId2 =
+          BuilderUtils.newApplicationAttemptId(appId2, 1);
+
+      RMAppAttemptMetrics attemptMetric2 =
+          new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
+      RMAppImpl app2 = mock(RMAppImpl.class);
+      when(app2.getApplicationId()).thenReturn(appId2);
+      RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+      when(attempt2.getMasterContainer()).thenReturn(container);
+      when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
+      when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
+      when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
+      when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
+
+      rm.getRMContext().getRMApps().put(appId2, app2);
+      addAppEvent =
+          new AppAddedSchedulerEvent(appId2, queueA2.getLeafName(), "user2");
+      cs.handle(addAppEvent);
+      addAttemptEvent =
+          new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
+      cs.handle(addAttemptEvent);
+
+      // add two <30G, 50> nodes, so the cluster has 60GB and 100 vcores
+      Resource newResource = Resource.newInstance(30 * GB, 50);
+      RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+      cs.handle(new NodeAddedSchedulerEvent(node));
+
+      Resource newResource2 = Resource.newInstance(30 * GB, 50);
+      RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
+      cs.handle(new NodeAddedSchedulerEvent(node2));
+
+      FiCaSchedulerApp fiCaApp1 =
+          cs.getSchedulerApplications().get(app.getApplicationId())
+              .getCurrentAppAttempt();
+
+      FiCaSchedulerApp fiCaApp2 =
+          cs.getSchedulerApplications().get(app2.getApplicationId())
+              .getCurrentAppAttempt();
+      Priority u0Priority = TestUtils.createMockPriority(1);
+      RecordFactory recordFactory =
+          RecordFactoryProvider.getRecordFactory(null);
+      CapacitySchedulerMetrics metrics = CapacitySchedulerMetrics.getMetrics();
+
+      // app2 (queue a2) asks for 30GB memory and 1 vcore: expect one commit
+      fiCaApp2.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 30 * GB, 1, true,
+              u0Priority, recordFactory)));
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+      Assert.assertEquals(1, metrics.getNumOfCommitSuccess());
+      Assert.assertEquals(0, metrics.getNumOfCommitFailure());
+
+      // app1 (queue a1) asks for 10GB memory and 1 vcore, more than a1's 5GB
+      // maximum: neither commit counter may move
+      fiCaApp1.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 10 * GB, 1, true,
+              u0Priority, recordFactory)));
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+      Assert.assertEquals(1, metrics.getNumOfCommitSuccess());
+      Assert.assertEquals(0, metrics.getNumOfCommitFailure());
+
+      // app1 asks for 1GB memory and 1 vcore, within a1's maximum: expect a
+      // second successful commit
+      fiCaApp1.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true,
+              u0Priority, recordFactory)));
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+      Assert.assertEquals(2, metrics.getNumOfCommitSuccess());
+      Assert.assertEquals(0, metrics.getNumOfCommitFailure());
+    } finally {
+      // Stop the RM even when an assertion fails, so its services do not leak
+      rm.stop();
+    }
+  }
+
   @Test
   public void testNotAssignMultiple() throws Exception {
     LOG.info("--- START: testNotAssignMultiple ---");