diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 971edb8..cd560e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -961,31 +961,51 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } + Resource updateBlackList4App(FiCaSchedulerApp application, + Resource clusterResource){ + int updatedMem = clusterResource.getMemory(); + int updatedVcore = clusterResource.getVirtualCores(); + if(updatedMem == 0 || updatedVcore == 0){ + return clusterResource; + } + + for(NodeId nodeId: scheduler.getRMContext().getRMNodes().keySet()){ + if(application.isBlacklisted(nodeId.getHost())){ + updatedMem -= scheduler.getRMContext().getRMNodes() + .get(nodeId).getTotalCapability().getMemory(); + updatedVcore -= scheduler.getRMContext().getRMNodes() + .get(nodeId).getTotalCapability().getVirtualCores(); + } + } + return Resource.newInstance(updatedMem, updatedVcore); + } + @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); - + Resource resourceNoBlacklistNode = + updateBlackList4App(application, clusterResource); + /** * Headroom is min((userLimit, queue-max-cap) - consumed) */ Resource userLimit = // User limit - computeUserLimit(application, clusterResource, required); - + computeUserLimit(application, resourceNoBlacklistNode, required); Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, + resourceCalculator, + resourceNoBlacklistNode, absoluteMaxCapacity, minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); Resource headroom = Resources.subtract( - Resources.min(resourceCalculator, clusterResource, + Resources.min(resourceCalculator, resourceNoBlacklistNode, userLimit, queueMaxCap), userConsumed); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 13cdcf0..2af47a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -135,6 +128,7 @@ public void setUp() throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + when(csContext.getRMContext()).thenReturn(rmContext); root = CapacityScheduler.parseQueue(csContext, csConf, null, @@ -210,7 +204,7 @@ public Container answer(InvocationOnMock invocation) throws Throwable { final FiCaSchedulerApp application = (FiCaSchedulerApp)(invocation.getArguments()[0]); - final ContainerId containerId = + final ContainerId containerId = TestUtils.getMockContainerId(application); Container container = TestUtils.getMockContainer( @@ -223,11 +217,11 @@ public Container answer(InvocationOnMock invocation) } ). when(queue).createContainer( - any(FiCaSchedulerApp.class), - any(FiCaSchedulerNode.class), - any(Resource.class), - any(Priority.class) - ); + any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), + any(Resource.class), + any(Priority.class) + ); // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); @@ -235,10 +229,24 @@ public Container answer(InvocationOnMock invocation) any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class), any(CSQueue.class)); - + return queue; } - + + public void addNodes2RMContext(FiCaSchedulerNode nodes[]){ + if(nodes.length ==0 ){ + return; + } + RMContext rmContext = csContext.getRMContext(); + for(FiCaSchedulerNode node : nodes){ + rmContext.getRMNodes().put(node.getNodeID(), node.getRMNode()); + } + } + + public void clearNodes(){ + csContext.getRMContext().getRMNodes().clear(); + } + @Test public void testInitializeQueue() throws Exception { final float epsilon = 1e-5f; @@ -297,6 +305,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -311,6 +320,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); + clearNodes(); } @Test @@ -429,6 +439,7 @@ public void testSingleQueueWithOneUser() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -519,8 +530,39 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(0*GB, a.getMetrics().getAllocatedMB()); assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemory()), a.getMetrics().getAvailableMB()); + clearNodes(); } - + + @Test + public void testUpdateBlackList4App() { + + LeafQueue x = new LeafQueue(cs, queues.get(C1).getQueueName(), + queues.get(C), root); + x.setMaxCapacity(1.0f); + final String user_0 = "user_0"; + + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, x, + x.getActiveUsersManager(), rmContext); + app_0.updateBlacklist(Collections.singletonList(host_0), null); + Assert.assertEquals(8*GB, + x.updateBlackList4App(app_0, clusterResource).getMemory()); + clearNodes(); + } + @Test public void testUserLimits() throws Exception { // Mock the queue @@ -564,8 +606,9 @@ public void testUserLimits() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - - // Setup resource-requests + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + + // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, @@ -613,6 +656,7 @@ public void testUserLimits() throws Exception { assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + clearNodes(); } @Test @@ -657,7 +701,8 @@ public void testHeadroomWithMaxCap() throws Exception { final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -721,7 +766,8 @@ public void testHeadroomWithMaxCap() throws Exception { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1); - assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + clearNodes(); } @Test @@ -774,7 +820,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -899,6 +945,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + clearNodes(); } @Test @@ -936,7 +983,7 @@ public void testReservation() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -1002,6 +1049,7 @@ public void testReservation() throws Exception { assertEquals(4*GB, node_0.getUsedResource().getMemory()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(4*GB, a.getMetrics().getAllocatedMB()); + clearNodes(); } @Test @@ -1040,6 +1088,7 @@ public void testStolenReservedContainer() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); @@ -1104,6 +1153,8 @@ public void testStolenReservedContainer() throws Exception { assertEquals(4*GB, node_0.getUsedResource().getMemory()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(8*GB, a.getMetrics().getAllocatedMB()); + + clearNodes(); } @Test @@ -1144,6 +1195,8 @@ public void testReservationExchange() throws Exception { final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB), numNodes * 16); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1}); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(4*GB, 16)); @@ -1228,6 +1281,7 @@ public void testReservationExchange() throws Exception { assertEquals(0*GB, node_0.getUsedResource().getMemory()); assertEquals(4*GB, assignment.getExcessReservation().getContainer().getResource().getMemory()); + clearNodes(); } @@ -1265,7 +1319,8 @@ public void testLocalityScheduling() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1, node_2}); + // Setup resource-requests and submit Priority priority = TestUtils.createMockPriority(1); List app_0_requests_0 = new ArrayList(); @@ -1354,7 +1409,7 @@ public void testLocalityScheduling() throws Exception { String host_3 = "127.0.0.4"; // on rack_1 FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_3}); // Rack-delay doReturn(1).when(a).getNodeLocalityDelay(); @@ -1371,6 +1426,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.RACK_LOCAL, assignment.getType()); + + clearNodes(); } @Test @@ -1406,7 +1463,7 @@ public void testApplicationPriorityScheduling() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0, node_1, node_2}); // Setup resource-requests and submit List app_0_requests_0 = new ArrayList(); @@ -1501,6 +1558,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getSchedulingOpportunities(priority_2)); assertEquals(0, app_0.getTotalRequiredResources(priority_2)); + clearNodes(); } @Test @@ -1531,7 +1589,7 @@ public void testSchedulingConstraints() throws Exception { String host_1_0 = "127.0.0.3"; String rack_1 = "rack_1"; FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_0_0, node_1_0, node_0_1}); final int numNodes = 3; Resource clusterResource = Resources.createResource( numNodes * (8*GB), numNodes * 16); @@ -1604,6 +1662,7 @@ public void testSchedulingConstraints() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + clearNodes(); } @Test (timeout = 30000) @@ -1797,7 +1856,8 @@ public void testLocalityConstraints() throws Exception { FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); String host_1_1 = "127.0.0.4"; FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB); - + addNodes2RMContext(new FiCaSchedulerNode[]{node_1_0, node_0_1, node_1_1}); + final int numNodes = 4; Resource clusterResource = Resources.createResource( numNodes * (8*GB), numNodes * 1); @@ -1984,6 +2044,7 @@ public void testLocalityConstraints() throws Exception { assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(0, app_0.getTotalRequiredResources(priority)); + clearNodes(); } @Test @@ -2044,6 +2105,7 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() String host_0 = "127.0.0.1"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB); + addNodes2RMContext(new FiCaSchedulerNode[]{node_0}); final int numNodes = 1; Resource clusterResource = @@ -2063,6 +2125,8 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); } + + clearNodes(); } private CapacitySchedulerContext mockCSContext( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index db28dca..c2cd580 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class TestUtils { private static final Log LOG = LogFactory.getLog(TestUtils.class); @@ -92,7 +95,7 @@ public EventHandler getEventHandler() { new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), writer); - + return rmContext; }