diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5ddb9a4..e3cdeae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -961,31 +961,51 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } + Resource updateBlackList4App(FiCaSchedulerApp application, + Resource clusterResource){ + int updatedMem = clusterResource.getMemory(); + int updatedVcore = clusterResource.getVirtualCores(); + if(updatedMem == 0 || updatedVcore == 0){ + return clusterResource; + } + + for(NodeId nodeId: scheduler.getRMContext().getRMNodes().keySet()){ + if(application.isBlacklisted(nodeId.getHost())){ + updatedMem -= scheduler.getRMContext().getRMNodes() + .get(nodeId).getTotalCapability().getMemory(); + updatedVcore -= scheduler.getRMContext().getRMNodes() + .get(nodeId).getTotalCapability().getVirtualCores(); + } + } + return Resource.newInstance(updatedMem, updatedVcore); + } + @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); - + Resource resourceNoBlacklistNode = + updateBlackList4App(application, clusterResource); + /** * Headroom is min((userLimit, queue-max-cap) - consumed) */ Resource userLimit = // User limit - computeUserLimit(application, clusterResource, required); - + computeUserLimit(application, resourceNoBlacklistNode, required); Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, + resourceCalculator, + resourceNoBlacklistNode, absoluteMaxCapacity, minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); Resource headroom = Resources.subtract( - Resources.min(resourceCalculator, clusterResource, + Resources.min(resourceCalculator, resourceNoBlacklistNode, userLimit, queueMaxCap), userConsumed); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a9a9975..d11cf85 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -63,7 +63,7 @@ final static int GB = 1024; LeafQueue queue; - + private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); @Before @@ -73,7 +73,6 @@ public void setUp() throws IOException { YarnConfiguration conf = new YarnConfiguration(); setupQueueConfiguration(csConf); - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -295,7 +294,7 @@ public void testLimitsComputation() throws Exception { (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor()); assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser()); } - + @Test public void testActiveApplicationLimits() throws Exception { final String user_0 = "user_0"; @@ -462,7 +461,8 @@ public void testHeadroom() throws Exception { csConf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 25); setupQueueConfiguration(csConf); YarnConfiguration conf = new YarnConfiguration(); - + RMContext rmContext = TestUtils.getMockRMContext(); + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -475,7 +475,8 @@ public void testHeadroom() throws Exception { when(csContext.getQueueComparator()). thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - + when(csContext.getRMContext()).thenReturn(rmContext); + // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); when(csContext.getClusterResource()).thenReturn(clusterResource); @@ -490,13 +491,13 @@ public void testHeadroom() throws Exception { String host_0 = "host_0"; String rack_0 = "rack_0"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); + rmContext.getRMNodes().put(node_0.getNodeID(), node_0.getRMNode()); final String user_0 = "user_0"; final String user_1 = "user_1"; RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - RMContext rmContext = TestUtils.getMockRMContext(); Priority priority_1 = TestUtils.createMockPriority(1); @@ -567,6 +568,8 @@ public void testHeadroom() throws Exception { verify(app_0_0).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom)); verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + + csContext.getRMContext().getRMNodes().clear(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 2a26d30..7c71804 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -135,6 +135,7 @@ public void setUp() throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + when(csContext.getRMContext()).thenReturn(rmContext); root = CapacityScheduler.parseQueue(csContext, csConf, null, @@ -210,7 +211,7 @@ public Container answer(InvocationOnMock invocation) throws Throwable { final FiCaSchedulerApp application = (FiCaSchedulerApp)(invocation.getArguments()[0]); - final ContainerId containerId = + final ContainerId containerId = TestUtils.getMockContainerId(application); Container container = TestUtils.getMockContainer( @@ -223,11 +224,11 @@ public Container answer(InvocationOnMock invocation) } ). when(queue).createContainer( - any(FiCaSchedulerApp.class), - any(FiCaSchedulerNode.class), - any(Resource.class), - any(Priority.class) - ); + any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), + any(Resource.class), + any(Priority.class) + ); // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); @@ -235,10 +236,24 @@ public Container answer(InvocationOnMock invocation) any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class), any(CSQueue.class)); - + return queue; } - + + public void addNodes2RMContext(FiCaSchedulerNode nodes[]){ + if(nodes.length ==0 ){ + return; + } + RMContext rmContext = csContext.getRMContext(); + for(FiCaSchedulerNode node : nodes){ + rmContext.getRMNodes().put(node.getNodeID(), node.getRMNode()); + } + } + + public void clearNodes(){ + csContext.getRMContext().getRMNodes().clear(); + } + @Test public void testInitializeQueue() throws Exception { final float epsilon = 1e-5f; @@ -297,6 +312,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -311,6 +327,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); + clearNodes(); } @Test @@ -428,6 +445,7 @@ public void testSingleQueueWithOneUser() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -518,8 +536,39 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(0*GB, a.getMetrics().getAllocatedMB()); assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemory()), a.getMetrics().getAvailableMB()); + clearNodes(); } - + + @Test + public void testUpdateBlackList4App() { + + LeafQueue x = new LeafQueue(cs, queues.get(C1).getQueueName(), + queues.get(C), root); + x.setMaxCapacity(1.0f); + final String user_0 = "user_0"; + + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, x, + x.getActiveUsersManager(), rmContext); + app_0.updateBlacklist(Collections.singletonList(host_0), null); + Assert.assertEquals(8*GB, + x.updateBlackList4App(app_0, clusterResource).getMemory()); + clearNodes(); + } + @Test public void testUserLimits() throws Exception { // Mock the queue @@ -563,8 +612,9 @@ public void testUserLimits() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - - // Setup resource-requests + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + + // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, @@ -612,6 +662,7 @@ public void testUserLimits() throws Exception { assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + clearNodes(); } @Test @@ -656,7 +707,8 @@ public void testHeadroomWithMaxCap() throws Exception { final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -720,7 +772,8 @@ public void testHeadroomWithMaxCap() throws Exception { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1); - assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + clearNodes(); } @Test @@ -773,7 +826,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -898,6 +951,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + clearNodes(); } @Test @@ -935,7 +989,7 @@ public void testReservation() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -1001,6 +1055,7 @@ public void testReservation() throws Exception { assertEquals(4*GB, node_0.getUsedResource().getMemory()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(4*GB, a.getMetrics().getAllocatedMB()); + clearNodes(); } @Test @@ -1039,6 +1094,7 @@ public void testStolenReservedContainer() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -1103,6 +1159,8 @@ public void testStolenReservedContainer() throws Exception { assertEquals(4*GB, node_0.getUsedResource().getMemory()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(8*GB, a.getMetrics().getAllocatedMB()); + + clearNodes(); } @Test @@ -1143,6 +1201,8 @@ public void testReservationExchange() throws Exception { final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(4*GB, 16)); @@ -1227,6 +1287,7 @@ public void testReservationExchange() throws Exception { assertEquals(0*GB, node_0.getUsedResource().getMemory()); assertEquals(4*GB, assignment.getExcessReservation().getContainer().getResource().getMemory()); + clearNodes(); } @@ -1264,7 +1325,8 @@ public void testLocalityScheduling() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1, node_2}); + // Setup resource-requests and submit Priority priority = TestUtils.createMockPriority(1); List app_0_requests_0 = new ArrayList(); @@ -1353,7 +1415,7 @@ public void testLocalityScheduling() throws Exception { String host_3 = "127.0.0.4"; // on rack_1 FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_3}); // Rack-delay doReturn(1).when(a).getNodeLocalityDelay(); @@ -1370,6 +1432,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.RACK_LOCAL, assignment.getType()); + + clearNodes(); } @Test @@ -1405,7 +1469,7 @@ public void testApplicationPriorityScheduling() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1, node_2}); // Setup resource-requests and submit List app_0_requests_0 = new ArrayList(); @@ -1500,6 +1564,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getSchedulingOpportunities(priority_2)); assertEquals(0, app_0.getTotalRequiredResources(priority_2)); + clearNodes(); } @Test @@ -1530,7 +1595,7 @@ public void testSchedulingConstraints() throws Exception { String host_1_0 = "127.0.0.3"; String rack_1 = "rack_1"; FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0_0, node_1_0, node_0_1}); final int numNodes = 3; Resource clusterResource = Resources.createResource( numNodes * (8*GB), numNodes * 16); @@ -1603,6 +1668,7 @@ public void testSchedulingConstraints() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + clearNodes(); } @Test (timeout = 30000) @@ -1796,7 +1862,8 @@ public void testLocalityConstraints() throws Exception { FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); String host_1_1 = "127.0.0.4"; FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_1_0, node_0_1, node_1_1}); + final int numNodes = 4; Resource clusterResource = Resources.createResource( numNodes * (8*GB), numNodes * 1); @@ -1983,6 +2050,7 @@ public void testLocalityConstraints() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(0, app_0.getTotalRequiredResources(priority)); + clearNodes(); } @Test @@ -2043,6 +2111,7 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() String host_0 = "127.0.0.1"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); final int numNodes = 1; Resource clusterResource = @@ -2062,6 +2131,8 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); } + + clearNodes(); } private CapacitySchedulerContext mockCSContext(