diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 99d03681040..ac912c95197 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1138,7 +1138,7 @@ protected synchronized boolean assignToUser(Resource clusterResource, if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getUsed(), application.getCurrentReservation()), + Resources.subtract(user.getUsed(label), application.getCurrentReservation()), limit)) { if (LOG.isDebugEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 29e2a880a07..c710681737d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -32,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -102,17 +106,22 @@ public void setUp() throws Exception { } private void setup(CapacitySchedulerConfiguration csConf) throws Exception { - setup(csConf, false); + setup(csConf, false, CommonNodeLabelsManager.NO_LABEL); + } + + private void setup(CapacitySchedulerConfiguration csConf, String label) + throws Exception { + setup(csConf, false, label); } private void setup(CapacitySchedulerConfiguration csConf, - boolean addUserLimits) throws Exception { + boolean addUserLimits, String label) throws Exception { csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); final String newRoot = "root" + System.currentTimeMillis(); // final String newRoot = "root"; - setupQueueConfiguration(csConf, newRoot, addUserLimits); + setupQueueConfiguration(csConf, newRoot, addUserLimits, label); YarnConfiguration conf = new YarnConfiguration(); cs.setConf(conf); @@ -153,7 +162,7 @@ private void setup(CapacitySchedulerConfiguration csConf, private static final String A = "a"; private void setupQueueConfiguration(CapacitySchedulerConfiguration conf, - final String newRoot, boolean addUserLimits) { + final String newRoot, boolean addUserLimits, String label) { // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, @@ -161,6 +170,11 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf, conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); + if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) { + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, label, 100); + conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT, + label, 100); + } final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; @@ -168,10 +182,18 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf, conf.setCapacity(Q_newRoot, 100); conf.setMaximumCapacity(Q_newRoot, 100); conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); + if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) { + conf.setCapacityByLabel(Q_newRoot, label, 100); + conf.setMaximumCapacityByLabel(Q_newRoot, label, 100); + } final String Q_A = Q_newRoot + "." + A; conf.setCapacity(Q_A, 100f); conf.setMaximumCapacity(Q_A, 100); + if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) { + conf.setCapacityByLabel(Q_A, label, 100); + conf.setMaximumCapacityByLabel(Q_A, label, 100); + } conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); if (addUserLimits) { @@ -368,7 +390,7 @@ public void testReservation() throws Exception { @Test public void testReservationLimitOtherUsers() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - setup(csConf, true); + setup(csConf, true, CommonNodeLabelsManager.NO_LABEL); // Manipulate queue 'a' LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); @@ -1234,6 +1256,88 @@ public void testAssignToUser() throws Exception { } @Test + public void testAssignToUserWithPartition() throws Exception { + + String label = "x"; + Set labelSingleton = Collections.singleton(label); + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + node_1.updateLabels(labelSingleton); + + RMNodeLabelsManager mgr = mock(RMNodeLabelsManager.class); + when(mgr.getResourceByLabel(eq(label), isA(Resource.class))) + .thenReturn(Resources.createResource(8 * GB)); + when(mgr.getResourceByLabel(eq(CommonNodeLabelsManager.NO_LABEL), + isA(Resource.class))).thenReturn(Resources.createResource(8 * GB)); + when(mgr.getClusterNodeLabels()).thenReturn(labelSingleton); + rmContext.setNodeLabelManager(mgr); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf, label); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + a.submitApplicationAttempt(app_1, user_0); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + + Resource clusterResource = Resources.createResource(16 * GB); + when(csContext.getNumClusterNodes()).thenReturn(2); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + + ResourceRequest reqAM = TestUtils.createResourceRequest( + ResourceRequest.ANY, 3 * GB, 1, true, priorityAM, recordFactory); + reqAM.setNodeLabelExpression(label); + app_0.updateResourceRequests(Collections.singletonList(reqAM)); + ResourceRequest reqMap = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priorityMap, recordFactory); + reqMap.setNodeLabelExpression(label); + app_0.updateResourceRequests(Collections.singletonList(reqMap)); + + // Allocate AM on node_1 (label x) + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource)); + assertEquals(0 * GB, a.getQueueResourceUsage().getUsed().getMemory()); + assertEquals(3 * GB, a.getQueueResourceUsage().getUsed(label).getMemory()); + assertEquals(0 * GB, a.getUser(user_0).getUsed().getMemory()); + assertEquals(3 * GB, a.getUser(user_0).getUsed(label).getMemory()); + assertEquals(0 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + + Resource limit = Resources.createResource(2 * GB, 0); + ResourceLimits userResourceLimits = new ResourceLimits(clusterResource); + // User limit is 2 GB, user's used resources on partition x exceeds this + // so allocation should fail + boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, + labelSingleton, userResourceLimits); + assertFalse(res); + } + + @Test public void testReservationsNoneAvailable() throws Exception { // Test that we now unreserve and use a node that has space