Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java	(revision 1610498)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java	(working copy)
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -140,7 +141,7 @@
     // Next call - nothing
     if (allocation > 0) {
       doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue).assignContainers(eq(clusterResource), eq(node));
+          when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
 
       // Mock the node's resource availability
       Resource available = node.getAvailableResource();
@@ -151,7 +152,7 @@
         return new CSAssignment(allocatedResource, type);
       }
     }).
-        when(queue).assignContainers(eq(clusterResource), eq(node));
+        when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
 
     doNothing().when(node).releaseContainer(any(Container.class));
   }
@@ -243,7 +244,6 @@
     doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
         any(ContainerStatus.class),any(RMContainerEventType.class));
 
-    // Priority priority = TestUtils.createMockPriority(1);
     ContainerAllocationExpirer expirer =
         mock(ContainerAllocationExpirer.class);
@@ -266,14 +266,14 @@
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     for(int i=0; i < 2; i++) {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }
     for(int i=0; i < 3; i++) {
@@ -281,7 +281,7 @@
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }
     for(int i=0; i < 4; i++) {
@@ -289,7 +289,7 @@
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -302,7 +302,7 @@
     for(int i=0; i < 3;i++) {
       d.completedContainer(clusterResource, app_0, node_0,
-          rmContainer, null, RMContainerEventType.KILL, null);
+          rmContainer, null, RMContainerEventType.KILL, null, true);
     }
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b,
         2*GB, clusterResource);
@@ -322,7 +322,7 @@
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -333,7 +333,7 @@
     //Release 1GB Container from A
     a.completedContainer(clusterResource, app_0, node_0,
-        rmContainer, null, RMContainerEventType.KILL, null);
+        rmContainer, null, RMContainerEventType.KILL, null, true);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -349,7 +349,7 @@
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -359,7 +359,7 @@
     //Release 1GB container resources from B
     b.completedContainer(clusterResource, app_0, node_0,
-        rmContainer, null, RMContainerEventType.KILL, null);
+        rmContainer, null, RMContainerEventType.KILL, null, true);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -375,7 +375,7 @@
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -389,12 +389,12 @@
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class));
+        any(FiCaSchedulerNode.class), anyBoolean());
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class));
+        any(FiCaSchedulerNode.class), anyBoolean());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java	(revision 0)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java	(revision 0)
@@ -0,0 +1,1179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
+ * agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestReservations {
+
+  private static final Log LOG = LogFactory.getLog(TestReservations.class);
+
+  private final RecordFactory recordFactory
+      = RecordFactoryProvider.getRecordFactory(null);
+
+  RMContext rmContext;
+  CapacityScheduler cs;
+  // CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();
+
+  CSQueue root;
+  Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+  Map<String, CSQueue> oldQueues = new HashMap<String, CSQueue>();
+
+  final static int GB = 1024;
+  final static String DEFAULT_RACK = "/default";
+
+  @Before
+  public void setUp() throws Exception {
+    CapacityScheduler spyCs = new CapacityScheduler();
+    cs = spy(spyCs);
+    rmContext = TestUtils.getMockRMContext();
+  }
+
+  private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
+
+    csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
+    final String newRoot = "root" + System.currentTimeMillis();
+    // final String newRoot = "root";
+
+    setupQueueConfiguration(csConf, newRoot);
+    YarnConfiguration conf = new YarnConfiguration();
+    cs.setConf(conf);
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(16 * GB, 12));
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 12));
+    when(csContext.getApplicationComparator()).thenReturn(
+        CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).thenReturn(
+        CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.rollMasterKey();
+    when(csContext.getContainerTokenSecretManager()).thenReturn(
+        containerTokenSecretManager);
+
+    root = CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
+
+    cs.setRMContext(rmContext);
+    cs.init(csConf);
+    cs.start();
+  }
+
+  private static final String A = "a";
+
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
+      final String newRoot) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { newRoot });
+    conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
+    conf.setAcl(CapacitySchedulerConfiguration.ROOT,
+        QueueACL.SUBMIT_APPLICATIONS, " ");
+
+    final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "."
+        + newRoot;
+    conf.setQueues(Q_newRoot, new String[] { A });
+    conf.setCapacity(Q_newRoot, 100);
+    conf.setMaximumCapacity(Q_newRoot, 100);
+    conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+
+    final String Q_A = Q_newRoot + "." + A;
+    conf.setCapacity(Q_A, 100f);
+    conf.setMaximumCapacity(Q_A, 100);
+    conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+  }
+
+  static LeafQueue stubLeafQueue(LeafQueue queue) {
+
+    // Mock some methods for ease in these unit tests
+
+    // 1.
+    // LeafQueue.createContainer to return dummy containers
+    doAnswer(new Answer<Container>() {
+      @Override
+      public Container answer(InvocationOnMock invocation) throws Throwable {
+        final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation
+            .getArguments()[0]);
+        final ContainerId containerId = TestUtils
+            .getMockContainerId(application);
+
+        Container container = TestUtils.getMockContainer(containerId,
+            ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(),
+            (Resource) (invocation.getArguments()[2]),
+            ((Priority) invocation.getArguments()[3]));
+        return container;
+      }
+    }).when(queue).createContainer(any(FiCaSchedulerApp.class),
+        any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class));
+
+    // 2. Stub out LeafQueue.parent.completedContainer
+    CSQueue parent = queue.getParent();
+    doNothing().when(parent).completedContainer(any(Resource.class),
+        any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
+        any(RMContainer.class), any(ContainerStatus.class),
+        any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
+
+    return queue;
+  }
+
+  @Test
+  public void testReservation() throws Exception {
+    // Test that we now unreserve and use a node that has space
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
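+    // Each assignContainers call below simulates one node heartbeat. The
+    // trailing boolean is the flag this patch threads through the
+    // scheduler: false is a normal allocation attempt, true means the
+    // caller must unreserve an existing reservation before it may allocate.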
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(19 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // try to assign reducer (5G on node 0 and should reserve)
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(11 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // assign reducer to node 2
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
+
+    // node_1 heartbeats and unreserves from node_0 in order
+    // to allocate on node_1
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(18 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(8 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
+  }
+
+  @Test
+  public void testReservationNoContinueLook() throws Exception {
+    // Test that with reservations-continue-look-all-nodes feature off
+    // we don't unreserve and show we could get stuck
+
+    queues = new HashMap<String, CSQueue>();
+    // test that the deadlock occurs when turned off
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(
+        "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false);
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
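+    // Same allocation sequence as testReservation, but with
+    // reservations-continue-look-all-nodes set to false the final node_1
+    // heartbeat must leave the 5GB reservation on node_0 in place.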
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(19 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // try to assign reducer (5G on node 0 and should reserve)
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(11 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // assign reducer to node 2
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
+
+    // node_1 heartbeats and won't unreserve from node_0,
+    // potentially stuck if the AM doesn't handle it
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
+  }
+
+  @Test
+  public void testAssignContainersNeedToUnreserve() throws Exception {
+    // Test that we now unreserve and use a node that has space
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
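+    // Two-node variant of testReservation: the last heartbeat passes true,
+    // so the app must give up the 5GB reservation on node_0 before it is
+    // allowed to allocate the reducer on node_1.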
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(8 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // try to assign reducer (5G on node 0 and should reserve)
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // could allocate, but is told it needs to unreserve first
+    a.assignContainers(clusterResource, node_1, true);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(8 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
+  }
+
+  @Test
+  public void testGetAppToUnreserve() throws Exception {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+    final String user_0 = "user_0";
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    FiCaSchedulerApp app_0 = new
+        FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    // Setup resource-requests
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Resource capability = Resources.createResource(2*GB, 0);
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    RMContext rmContext = mock(RMContext.class);
+    ContainerAllocationExpirer expirer =
+        mock(ContainerAllocationExpirer.class);
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        app_0.getApplicationId(), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container = TestUtils.getMockContainer(containerId,
+        node_1.getNodeID(), Resources.createResource(2*GB), priorityMap);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        node_1.getNodeID(), "user", rmContext);
+
+    Container container_1 = TestUtils.getMockContainer(containerId,
+        node_0.getNodeID(), Resources.createResource(1*GB), priorityMap);
+    RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId,
+        node_0.getNodeID(), "user", rmContext);
+
+    // no reserved containers
+    NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // no reserved containers - reserve then unreserve
+    app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
+    app_0.unreserve(node_0, priorityMap);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // no container large enough is reserved
+    app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // reserve one that is now large enough
+    app_0.reserve(node_1, priorityMap, rmContainer, container);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(node_1.getNodeID(), unreserveId);
+  }
+
+  @Test
+  public void testFindNodeToUnreserve() throws Exception {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+    final String user_0 = "user_0";
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    // Setup resource-requests
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Resource capability = Resources.createResource(2 * GB, 0);
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    RMContext rmContext = mock(RMContext.class);
+    ContainerAllocationExpirer expirer =
+        mock(ContainerAllocationExpirer.class);
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        app_0.getApplicationId(), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container = TestUtils.getMockContainer(containerId,
+        node_1.getNodeID(), Resources.createResource(2*GB), priorityMap);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        node_1.getNodeID(), "user", rmContext);
+
+    // nothing reserved
+    boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
+        node_1, app_0, priorityMap, capability);
+    assertFalse(res);
+
+    // reserved but scheduler doesn't know about that node.
+    app_0.reserve(node_1, priorityMap, rmContainer, container);
+    node_1.reserveResource(app_0, priorityMap, rmContainer);
+    res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
+        priorityMap, capability);
+    assertFalse(res);
+  }
+
+  @Test
+  public void testAssignToQueue() throws Exception {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
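+    // Build up 8GB of allocations plus a 5GB reservation, then probe
+    // assignToQueue directly: with checkReservations=true the queue may
+    // subtract reserved resources before comparing against
+    // absoluteMaxCapacity.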
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    Resource capability = Resources.createResource(32 * GB, 0);
+    boolean res = a.assignToQueue(clusterResource, capability, app_0, true);
+    assertFalse(res);
+
+    // now add in reservations and make sure it continues if config set
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    capability = Resources.createResource(5 * GB, 0);
+    res = a
+        .assignToQueue(clusterResource, capability, app_0, true);
+    assertTrue(res);
+
+    // tell it not to check reservations
+    res = a.assignToQueue(clusterResource, capability, app_0, false);
+    assertFalse(res);
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+
+    // should return false no matter what checkReservations is passed
+    // in, since the feature is off
+    res = a.assignToQueue(clusterResource, capability, app_0, false);
+    assertFalse(res);
+
+    res = a
+        .assignToQueue(clusterResource, capability, app_0, true);
+    assertFalse(res);
+  }
+
+  public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
+      CapacitySchedulerConfiguration csConf) throws Exception {
+    // before reinitialization
+    assertEquals(true, a.getReservationContinueLooking());
+    assertEquals(true,
+        ((ParentQueue) a.getParent()).getReservationContinueLooking());
+
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
+    Map<String, CSQueue> newQueues =
+        new HashMap<String, CSQueue>();
+    CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+        TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResource());
+
+    // after reinitialization
+    assertEquals(false, a.getReservationContinueLooking());
+    assertEquals(false,
+        ((ParentQueue) a.getParent()).getReservationContinueLooking());
+  }
+
+  @Test
+  public void testContinueLookingReservationsAfterQueueRefresh()
+      throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+  }
+
+  @Test
+  public void testAssignToUser() throws Exception {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
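+    // Mirrors testAssignToQueue, but exercises the per-user limit:
+    // with checkReservations=true, assignToUser may subtract the app's
+    // 5GB reservation before comparing usage against the user limit.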
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // now add in reservations and make sure it continues if config set
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
+
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // set limit so that, after subtracting reservations, it can continue
+    Resource limit = Resources.createResource(12 * GB, 0);
+    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+        true);
+    assertTrue(res);
+
+    // tell it not to check for reservations; should fail as we are already
+    // over the limit
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, false);
+    assertFalse(res);
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+
+    // should now return false since the feature is off
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, true);
+    assertFalse(res);
+  }
+
+  @Test
+  public void testReservationsNoneAvailable() throws Exception {
+    // Test that heartbeats with the unreserve flag set don't reserve when
+    // nothing is reserved, and that queue limits stop further reservations
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+    Priority priorityLast = TestUtils.createMockPriority(12);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
+            priorityLast, recordFactory)));
+
+    // Start testing...
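+    // First fill in the AM and map containers as usual, then heartbeat
+    // with the "must unreserve" flag set while nothing is reserved: no
+    // new reservation or allocation should be made. Finally fill the
+    // queue until its limits block any further reservation.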
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(19 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // try to assign reducer (5G on node 0), but tell it
+    // it has to unreserve. No room to allocate and shouldn't reserve
+    // since nothing currently reserved.
+    a.assignContainers(clusterResource, node_0, true);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // try to assign reducer (5G on node 2), but tell it
+    // it has to unreserve. Has room but shouldn't reserve
+    // since nothing currently reserved.
+    a.assignContainers(clusterResource, node_2, true);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // let it assign 5G to node_2
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(11 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+
+    // reserve 8G node_0
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(21 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+
+    // try to assign (8G on node 2). No room to allocate,
+    // continued to try due to having reservation above,
+    // but hits queue limits so can't reserve anymore.
+ a.assignContainers(clusterResource, node_2, false); + assertEquals(21 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(8 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (working copy) @@ -516,7 +516,7 @@ app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); + queue.assignContainers(clusterResource, node_0, false); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); verify(app_0_0).setHeadroom(eq(expectedHeadroom)); @@ -535,7 +535,7 @@ app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change @@ -554,7 +554,7 @@ app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes verify(app_0_0).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom)); @@ -562,7 +562,7 @@ // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes verify(app_0_0).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom)); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (working copy) @@ -22,6 
+22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -237,7 +238,7 @@ doNothing().when(parent).completedContainer( any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), - any(RMContainerEventType.class), any(CSQueue.class)); + any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); return queue; } @@ -310,7 +311,7 @@ // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -445,7 +446,7 @@ // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -455,7 +456,7 @@ // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -463,7 +464,7 @@ assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Can't allocate 3rd due to user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -472,7 +473,7 @@ // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -480,7 +481,7 @@ assertEquals(3*GB, a.getMetrics().getAllocatedMB()); // One more should work, for app_1, due to user-limit-factor - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -490,7 +491,7 @@ // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -503,7 +504,7 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(1*GB, 
a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -517,7 +518,7 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(0*GB, a.getUsedResources().getMemory()); @@ -605,19 +606,19 @@ // recordFactory))); // 1 container to user_0 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); // One more to user_0 since he is the only active user - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); @@ -690,7 +691,7 @@ 1, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -698,7 +699,7 @@ assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -714,7 +715,7 @@ // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -728,7 +729,7 @@ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @@ -798,21 +799,21 @@ */ // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); 
assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Can't allocate 3rd due to user-limit a.setUserLimit(25); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -830,7 +831,7 @@ // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -839,7 +840,7 @@ // Now allocations should goto app_0 since // user_0 is at user-limit not above it - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -849,7 +850,7 @@ // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -860,7 +861,7 @@ // Now, allocations should goto app_3 since it's under user-limit a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(7*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -868,7 +869,7 @@ assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); // Now we should assign to app_3 again since user_2 is under user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -881,7 +882,7 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -895,7 +896,7 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -909,7 +910,7 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(0*GB, 
a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -967,7 +968,7 @@ // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -977,7 +978,7 @@ // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -985,7 +986,7 @@ assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1000,8 +1001,8 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1016,8 +1017,8 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1084,7 +1085,7 @@ // Start testing... 
- a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1093,7 +1094,7 @@ assertEquals(0*GB, a.getMetrics().getAvailableMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1106,7 +1107,7 @@ // We do not need locality delay here doReturn(-1).when(a).getNodeLocalityDelay(); - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(10*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1121,8 +1122,8 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8*GB, app_1.getCurrentConsumption().getMemory()); @@ -1190,20 +1191,20 @@ // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1216,8 +1217,8 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1226,7 +1227,7 @@ assertEquals(1, app_1.getReReservations(priority)); // Re-reserve - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1235,7 
+1236,7 @@ assertEquals(2, app_1.getReReservations(priority)); // Try to schedule on node_1 now, should *move* the reservation - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(9*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1251,8 +1252,8 @@ ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - CSAssignment assignment = a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + CSAssignment assignment = a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1263,6 +1264,7 @@ } + @Test public void testLocalityScheduling() throws Exception { @@ -1322,7 +1324,7 @@ CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); @@ -1330,7 +1332,7 @@ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); @@ -1338,7 +1340,7 @@ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); @@ -1347,7 +1349,7 @@ // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset @@ -1355,7 +1357,7 @@ assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - assignment = a.assignContainers(clusterResource, node_0); + assignment = a.assignContainers(clusterResource, node_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1363,7 +1365,7 @@ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - assignment = a.assignContainers(clusterResource, node_1); + 
assignment = a.assignContainers(clusterResource, node_1, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1391,13 +1393,13 @@ doReturn(1).when(a).getNodeLocalityDelay(); // Shouldn't assign RACK_LOCAL yet - assignment = a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3, false); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now - assignment = a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3, false); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1478,7 +1480,7 @@ // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); @@ -1490,7 +1492,7 @@ // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); @@ -1501,7 +1503,7 @@ assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); @@ -1512,7 +1514,7 @@ assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1523,7 +1525,7 @@ assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1599,7 +1601,7 @@ app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 - a.assignContainers(clusterResource, node_0_0); + a.assignContainers(clusterResource, node_0_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1607,7 
+1609,7 @@ // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero @@ -1623,14 +1625,14 @@ // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 - a.assignContainers(clusterResource, node_0_1); + a.assignContainers(clusterResource, node_0_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1874,7 +1876,7 @@ // node_0_1 // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false - a.assignContainers(clusterResource, node_0_1); + a.assignContainers(clusterResource, node_0_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1896,7 +1898,7 @@ // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1926,7 +1928,7 @@ // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1954,7 +1956,7 @@ // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1980,7 +1982,7 @@ // Blacklist: < host_0_0 > <---- // Now, should allocate since RR(rack_1) = relax: true - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2010,7 +2012,7 @@ // host_1_0: 8G // host_1_1: 7G - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), 
any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2090,7 +2092,7 @@ recordFactory))); try { - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (working copy) @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -153,7 +154,7 @@ // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). - when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -164,7 +165,7 @@ return new CSAssignment(allocatedResource, type); } }). 
- when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); } private float computeQueueAbsoluteUsedCapacity(CSQueue queue, @@ -227,19 +228,19 @@ // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -247,12 +248,12 @@ // since A has 2/6G while B has 2/14G stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(b, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -260,12 +261,12 @@ // since A has 3/6G while B has 4/14G stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 4*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -273,12 +274,12 @@ // since A has 3/6G while B has 8/14G stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -439,7 +440,7 @@ stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - 
root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 0*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -451,7 +452,7 @@ stubQueueAllocation(a, clusterResource, node_1, 0*GB); stubQueueAllocation(b2, clusterResource, node_1, 4*GB); stubQueueAllocation(c, clusterResource, node_1, 0*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -462,14 +463,14 @@ stubQueueAllocation(a1, clusterResource, node_0, 1*GB); stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -488,16 +489,16 @@ stubQueueAllocation(b3, clusterResource, node_2, 1*GB); stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); - root.assignContainers(clusterResource, node_2); + root.assignContainers(clusterResource, node_2, false); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -597,7 +598,7 @@ // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -605,12 +606,12 @@ // also, B gets a scheduling opportunity since A allocates RACK_LOCAL stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + 
any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -619,12 +620,12 @@ // However, since B returns off-switch, A won't get an opportunity stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -663,7 +664,7 @@ // Simulate B3 returning a container on node_0 stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(b2, 0*GB, clusterResource); verifyQueueMetrics(b3, 1*GB, clusterResource); @@ -671,12 +672,12 @@ // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -685,12 +686,12 @@ // However, since B3 returns off-switch, B2 won't get an opportunity stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (revision 1610498) +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (working copy) @@ -243,5 +243,30 @@ currentContPreemption, Collections.singletonList(rr), allocation.getNMTokenList()); } + + synchronized public NodeId getNodeIdToUnreserve(Priority priority, + Resource capability) { + // first go-around, keep this algorithm simple and just grab the first + // reservation that has enough resources + Map<NodeId, RMContainer> reservedContainers = this.reservedContainers + .get(priority); + + if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { + for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) { + // make sure we unreserve one with at least the same amount of + // resources, otherwise could affect capacity limits + if (Resources.fitsIn(capability, entry.getValue().getContainer() + .getResource())) { + LOG.info("unreserving node with reservation size: " + + entry.getValue().getContainer().getResource() + + " in order to allocate container with size: " + capability); + return entry.getKey(); + } + } + } + return null; + } + + } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (working copy) @@ -21,10 +21,12 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -55,4 +57,6 @@ ResourceCalculator getResourceCalculator(); Comparator<CSQueue> getQueueComparator(); + + FiCaSchedulerNode getNode(NodeId nodeId); } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (working copy) @@ -82,6 +82,13 @@ @Private public static final String STATE = "state"; + + @Private + public static final String
RESERVE_CONT_LOOK_ALL_NODES = PREFIX + + "reservations-continue-look-all-nodes"; + + @Private + public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -271,6 +278,17 @@ QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING; } + /* + * Returns whether we should continue to look at all heartbeating nodes even + * after the reservation limit was hit. A node heartbeating in could + * satisfy the request, and thus be a better pick than waiting for the + * reservation to be fulfilled. This config is refreshable. + */ + public boolean getReservationContinueLook() { + return getBoolean(RESERVE_CONT_LOOK_ALL_NODES, + DEFAULT_RESERVE_CONT_LOOK_ALL_NODES); + } + private static String getAclKey(QueueACL acl) { return "acl_" + acl.toString().toLowerCase(); } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (working copy) @@ -184,10 +184,11 @@ * Assign containers to applications in the queue or its children (if any). * @param clusterResource the resource of the cluster. * @param node node on which resources are available + * @param needToUnreserve assign container only if it can unreserve one first * @return the assignment */ public CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node); + Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve); /** * A container assigned to the queue has completed. @@ -200,11 +201,13 @@ * container * @param childQueue CSQueue to reinsert in childQueues * @param event event to be sent to the container + * @param sortQueues indicates whether it should re-sort the queues */ public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer container, ContainerStatus containerStatus, - RMContainerEventType event, CSQueue childQueue); + RMContainerEventType event, CSQueue childQueue, + boolean sortQueues); /** * Get the number of applications in the queue.
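Taken together, the two CapacitySchedulerConfiguration additions above define a refreshable boolean knob, yarn.scheduler.capacity.reservations-continue-look-all-nodes, defaulting to true. A rough illustration of reading it through the new getter (a sketch, not part of the patch; setBoolean is the stock Configuration setter inherited by CapacitySchedulerConfiguration):

    // Sketch: the flag defaults to true and can be overridden before a refresh.
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    boolean defaultValue = conf.getReservationContinueLook(); // true
    conf.setBoolean(
        CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
    boolean overridden = conf.getReservationContinueLook(); // false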
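The other half of the feature is the unreserve-first lookup added to FiCaSchedulerApp further up. Because the rule is easy to lose in the hunk noise, here is a minimal self-contained sketch of the same first-fit selection, using simplified stand-ins (MemRes and String node ids are hypothetical substitutes for Resource and NodeId):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class UnreserveSketch {
      // Stand-in for Resource: memory only, as in the memory-driven tests above.
      static final class MemRes {
        final int mb;
        MemRes(int mb) { this.mb = mb; }
        boolean fitsIn(MemRes other) { return mb <= other.mb; }
      }

      // Mirrors getNodeIdToUnreserve: return the first node whose reserved
      // container is at least as large as the new ask, so trading the
      // reservation for a real allocation cannot breach capacity limits.
      static String nodeIdToUnreserve(Map<String, MemRes> reservedByNode,
          MemRes needed) {
        if (reservedByNode == null || reservedByNode.isEmpty()) {
          return null; // nothing reserved at this priority
        }
        for (Map.Entry<String, MemRes> e : reservedByNode.entrySet()) {
          if (needed.fitsIn(e.getValue())) {
            return e.getKey();
          }
        }
        return null; // no single reservation big enough to trade
      }

      public static void main(String[] args) {
        Map<String, MemRes> reserved = new LinkedHashMap<String, MemRes>();
        reserved.put("node_0:45454", new MemRes(4 * 1024)); // too small for 8G
        reserved.put("node_1:45454", new MemRes(8 * 1024)); // first fit
        // Prints node_1:45454; null would mean "skip, don't unreserve".
        System.out.println(nodeIdToUnreserve(reserved, new MemRes(8 * 1024)));
      }
    }

Roughly, when assignContainers is called with needToUnreserve set, a null from this lookup means the attempt skips the node instead of stacking another reservation.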
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (working copy) @@ -484,13 +484,13 @@ "Queue configuration missing child queue names for " + queueName); } queue = - new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(queue); } else { ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(parentQueue); @@ -824,7 +824,8 @@ node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = queue.assignContainers(clusterResource, node); + CSAssignment assignment = queue.assignContainers(clusterResource, node, + false); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -835,7 +836,7 @@ SchedulerUtils.createAbnormalContainerStatus( container.getId(), SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null); + RMContainerEventType.RELEASED, null, true); } } @@ -848,7 +849,7 @@ LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node); + root.assignContainers(clusterResource, node, false); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1029,7 +1030,7 @@ // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(clusterResource, application, node, - rmContainer, containerStatus, event, null); + rmContainer, containerStatus, event, null, true); LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node @@ -1045,7 +1046,7 @@ } @Lock(Lock.NoLock.class) - FiCaSchedulerNode getNode(NodeId nodeId) { + public FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (revision 1610498) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (working copy) @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -129,6 +130,8 @@ private final ResourceCalculator resourceCalculator; + private boolean reservationsContinueLooking; + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) { this.scheduler = cs; @@ -202,8 +205,9 @@ maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs - .getConfiguration().getNodeLocalityDelay()); + maxActiveApplications, maxActiveApplicationsPerUser, state, acls, + cs.getConfiguration().getNodeLocalityDelay(), + cs.getConfiguration().getReservationContinueLook()); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -225,7 +229,8 @@ int maxApplications, float maxAMResourcePerQueuePercent, int maxApplicationsPerUser, int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, - Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay) + Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay, + boolean continueLooking) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); @@ -257,6 +262,7 @@ this.queueInfo.setQueueState(this.state); this.nodeLocalityDelay = nodeLocalityDelay; + this.reservationsContinueLooking = continueLooking; StringBuilder aclsString = new StringBuilder(); for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) { @@ -321,7 +327,9 @@ " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + - "nodeLocalityDelay = " + nodeLocalityDelay + "\n"); + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + + "reservationsContinueLooking = " + + reservationsContinueLooking + "\n"); } @Override @@ -555,6 +563,11 @@ return nodeLocalityDelay; } + @Private + boolean getReservationContinueLooking() { + return reservationsContinueLooking; + } + public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + @@ -613,7 +626,8 @@ newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, - newlyParsedLeafQueue.getNodeLocalityDelay()); + newlyParsedLeafQueue.getNodeLocalityDelay(), + newlyParsedLeafQueue.reservationsContinueLooking); // queue metrics are updated, more resource may be available // activate the pending applications if possible @@ -800,8 +814,8 @@ private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); @Override - public synchronized CSAssignment - assignContainers(Resource clusterResource, FiCaSchedulerNode node) { + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -846,9 +860,17 @@ Resource required = anyRequest.getCapability(); // Do we need containers at this 'priority'?
- if (!needContainers(application, priority, required)) { + if (application.getTotalRequiredResources(priority) <= 0) { continue; } + if (!this.reservationsContinueLooking) { + if (!needContainers(application, priority, required)) { + if (LOG.isDebugEnabled()) { + LOG.debug("doesn't need containers based on reservation algo!"); + } + continue; + } + } // Compute user-limit & set headroom // Note: We compute both user-limit & headroom with the highest @@ -860,14 +882,14 @@ required); // Check queue max-capacity limit - if (!assignToQueue(clusterResource, required)) { + if (!assignToQueue(clusterResource, required, application, true)) { return NULL_ASSIGNMENT; } // Check user limit - if (!assignToUser( - clusterResource, application.getUser(), userLimit)) { - break; + if (!assignToUser(clusterResource, application.getUser(), userLimit, + application, true)) { + break; } // Inform the application it is about to get a scheduling opportunity @@ -876,7 +898,7 @@ // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, needToUnreserve); // Did the application skip this node? if (assignment.getSkipped()) { @@ -898,6 +920,9 @@ // otherwise the app will be delayed for each non-local assignment. // This helps apps with many off-cluster requests schedule faster. if (assignment.getType() != NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + LOG.debug("Resetting scheduling opportunities"); + } application.resetSchedulingOpportunities(priority); } @@ -933,37 +958,73 @@ // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, false); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - private synchronized boolean assignToQueue(Resource clusterResource, - Resource required) { + + @Private + protected synchronized boolean assignToQueue(Resource clusterResource, + Resource required, FiCaSchedulerApp application, + boolean checkReservations) { + + Resource potentialTotalResource = Resources.add(usedResources, required); // Check how much of the cluster's absolute capacity we are currently using...
@@ -933,37 +958,73 @@
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority,
-        rmContainer);
+        rmContainer, false);
 
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
 
-  private synchronized boolean assignToQueue(Resource clusterResource,
-      Resource required) {
+
+  @Private
+  protected synchronized boolean assignToQueue(Resource clusterResource,
+      Resource required, FiCaSchedulerApp application,
+      boolean checkReservations) {
+
+    Resource potentialTotalResource = Resources.add(usedResources, required);
     // Check how of the cluster's absolute capacity we are currently using...
-    float potentialNewCapacity =
-      Resources.divide(
-          resourceCalculator, clusterResource,
-          Resources.add(usedResources, required),
-          clusterResource);
+    float potentialNewCapacity = Resources.divide(resourceCalculator,
+        clusterResource, potentialTotalResource, clusterResource);
     if (potentialNewCapacity > absoluteMaxCapacity) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + " usedResources: " + usedResources
-            + " clusterResources: " + clusterResource
-            + " currentCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
+      // if enabled, check to see if we could potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && checkReservations) {
+
+        float potentialNewWithoutReservedCapacity = Resources.divide(
+            resourceCalculator,
+            clusterResource,
+            Resources.subtract(potentialTotalResource,
+                application.getCurrentReservation()),
+            clusterResource);
+
+        if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
+          LOG.info("try to use reserved: "
+              + getQueueName()
+              + " usedResources: "
+              + usedResources
+              + " clusterResources: "
+              + clusterResource
+              + " reservedResources: "
+              + application.getCurrentReservation()
+              + " currentCapacity "
+              + Resources.divide(resourceCalculator, clusterResource,
+                  usedResources, clusterResource) + " required " + required
+              + " potentialNewWithoutReservedCapacity: "
+              + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+              + absoluteMaxCapacity + ")");
+          // we could potentially use this node instead of reserved node
+          return true;
+        }
+
+      }
+      LOG.info(getQueueName()
+          + " usedResources: "
+          + usedResources
+          + " clusterResources: "
+          + clusterResource
+          + " currentCapacity "
+          + Resources.divide(resourceCalculator, clusterResource,
              usedResources, clusterResource) + " required " + required
-            + " potentialNewCapacity: " + potentialNewCapacity + " ( "
-            + " max-capacity: " + absoluteMaxCapacity + ")");
-      }
+          + " potentialNewCapacity: " + potentialNewCapacity + " ( "
+          + " max-capacity: " + absoluteMaxCapacity + ")");
       return false;
     }
     return true;
   }
 
+
+  @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   private Resource computeUserLimitAndSetHeadroom(
       FiCaSchedulerApp application, Resource clusterResource, Resource required) {
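The rewritten assignToQueue() admits an over-limit queue if releasing the application's current reservations would bring it back under absoluteMaxCapacity. Reduced to plain longs (memory only, no ResourceCalculator; names are illustrative), the check is roughly:

    // Sketch only: queue-limit check net of the app's reservations.
    final class QueueLimitSketch {
      // Returns true if the queue may keep scheduling on this node.
      static boolean assignToQueue(long used, long required, long cluster,
          float absoluteMaxCapacity, long appReserved,
          boolean continueLooking, boolean checkReservations) {
        float potentialNewCapacity = (float) (used + required) / cluster;
        if (potentialNewCapacity <= absoluteMaxCapacity) {
          return true;
        }
        // Over the limit, but if the application holds reservations it could
        // give back, test whether the queue would fit without them.
        if (continueLooking && checkReservations) {
          float withoutReserved =
              (float) (used + required - appReserved) / cluster;
          return withoutReserved <= absoluteMaxCapacity;
        }
        return false;
      }
    }

For example, a queue using 90 of 100 units with absoluteMaxCapacity 0.95 cannot take a 10-unit ask (1.0 > 0.95), but passes if the application already holds 10 units of reservations, since (90 + 10 - 10) / 100 = 0.9.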
@@ -1078,25 +1139,43 @@
     return limit;
   }
 
-  private synchronized boolean assignToUser(Resource clusterResource,
-      String userName, Resource limit) {
+  @Private
+  protected synchronized boolean assignToUser(Resource clusterResource,
+      String userName, Resource limit, FiCaSchedulerApp application,
+      boolean checkReservations) {
     User user = getUser(userName);
-    
+
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
-    if (Resources.greaterThan(resourceCalculator, clusterResource,
-        user.getConsumedResources(), limit)) {
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        user.getConsumedResources(), limit)) {
+
+      // if enabled, check to see if we could potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && checkReservations) {
+        if (Resources.lessThanOrEqual(
+            resourceCalculator,
+            clusterResource,
+            Resources.subtract(user.getConsumedResources(),
+                application.getCurrentReservation()), limit)) {
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + userName + " in queue " + getQueueName()
+                + " will exceed limit based on reservations - " + " consumed: "
+                + user.getConsumedResources() + " reserved: "
+                + application.getCurrentReservation() + " limit: " + limit);
+          }
+          return true;
+        }
+      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("User " + userName + " in queue " + getQueueName() +
-            " will exceed limit - " +
-            " consumed: " + user.getConsumedResources() +
-            " limit: " + limit
-        );
+        LOG.debug("User " + userName + " in queue " + getQueueName()
+            + " will exceed limit - " + " consumed: "
+            + user.getConsumedResources() + " limit: " + limit);
      }
      return false;
    }
-
    return true;
  }
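The user-limit check gets the same treatment: usage net of the application's reservations is compared against the limit before giving up. In the same simplified long-arithmetic style as the previous sketch:

    // Sketch only: user-limit check net of the app's reservations.
    final class UserLimitSketch {
      static boolean assignToUser(long consumed, long limit, long appReserved,
          boolean continueLooking, boolean checkReservations) {
        if (consumed <= limit) {
          return true;
        }
        // Over the user limit, but part of the usage may be reservations this
        // application could release; if usage net of them fits, keep going.
        if (continueLooking && checkReservations) {
          return consumed - appReserved <= limit;
        }
        return false;
      }
    }

For instance, a user consuming 8GB against a 6GB limit passes when 4GB of that consumption is reserved, since 8 - 4 = 4GB <= 6GB.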
nodeid: " + idToUnreserve); + return false; + } + LOG.info("unreserving for app: " + application.getApplicationId() + + " on nodeId: " + idToUnreserve + + " in order to replace reserved application and place it on node: " + + node.getNodeID() + " needing: " + capability); + + // headroom + Resources.addTo(application.getHeadroom(), nodeToUnreserve + .getReservedContainer().getReservedResource()); + + // Make sure to not have completedContainers sort the queues here since + // we are already inside an iterator loop for the queues and this would + // cause an concurrent modification exception. + completedContainer(clusterResource, application, nodeToUnreserve, + nodeToUnreserve.getReservedContainer(), + SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve + .getReservedContainer().getContainerId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, false); + return true; + } + + @Private + protected boolean checkLimitsToReserve(Resource clusterResource, + FiCaSchedulerApp application, Resource capability, + boolean needToUnreserve) { + if (needToUnreserve) { + LOG.info("we needed to unreserve to be able to allocate"); + return false; + } + + // we can't reserve if we got here based on the limit + // checks assuming we could unreserve!!! + Resource userLimit = computeUserLimitAndSetHeadroom(application, + clusterResource, capability); + + // Check queue max-capacity limit + if (!assignToQueue(clusterResource, capability, application, false)) { + LOG.info("was going to reserve but hit queue limit"); + return false; + } + + // Check user limit + if (!assignToUser(clusterResource, application.getUser(), userLimit, + application, false)) { + LOG.info("was going to reserve but hit user limit"); + return false; + } + return true; + } + + + private Resource assignNodeLocalContainers(Resource clusterResource, + ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer); + return assignContainer(clusterResource, node, application, priority, + nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, + needToUnreserve); } return Resources.none(); @@ -1199,11 +1348,12 @@ private Resource assignRackLocalContainers( Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer); + return assignContainer(clusterResource, node, application, priority, + rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, + needToUnreserve); } return Resources.none(); @@ -1212,11 +1362,12 @@ private Resource assignOffSwitchContainers( Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
@@ -1212,11 +1362,12 @@
   private Resource assignOffSwitchContainers(
       Resource clusterResource, ResourceRequest offSwitchResourceRequest,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
           reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
+      return assignContainer(clusterResource, node, application, priority,
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          needToUnreserve);
     }
 
     return Resources.none();
@@ -1296,14 +1447,17 @@
     return container;
   }
 
+
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, RMContainer rmContainer) {
+      ResourceRequest request, NodeType type, RMContainer rmContainer,
+      boolean needToUnreserve) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
         + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
+        + " request=" + request + " type=" + type
+        + " needToUnreserve= " + needToUnreserve);
     }
     Resource capability = request.getCapability();
     Resource available = node.getAvailableResource();
@@ -1328,6 +1482,18 @@
       return Resources.none();
     }
 
+    // default to true since if reservation continue look feature isn't on
+    // needContainers is checked earlier and we wouldn't have gotten this far
+    boolean canAllocContainer = true;
+    if (this.reservationsContinueLooking) {
+      // based on reservations can we allocate/reserve more or do we need
+      // to unreserve one first
+      canAllocContainer = needContainers(application, priority, capability);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("can alloc container is: " + canAllocContainer);
+      }
+    }
+
     // Can we allocate a container on this node?
     int availableContainers =
         resourceCalculator.computeAvailableContainers(available, capability);
@@ -1335,8 +1501,28 @@
       // Allocate...
 
       // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null){
+      if (rmContainer != null) {
         unreserve(application, priority, node, rmContainer);
+      } else if (this.reservationsContinueLooking
+          && (!canAllocContainer || needToUnreserve)) {
+        // need to unreserve some other container first
+        boolean res = findNodeToUnreserve(clusterResource, node, application,
+            priority, capability);
+        if (!res) {
+          return Resources.none();
+        }
+      } else {
+        // we got here by possibly ignoring queue capacity limits. If the
+        // parameter needToUnreserve is true it means we ignored one of those
+        // limits in the chance we could unreserve. If we are here we aren't
+        // trying to unreserve so we can't allocate anymore due to that parent
+        // limit.
+        if (needToUnreserve) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+          }
+          return Resources.none();
+        }
+      }
 
       // Inform the application
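Once the node is known to have space, the hunk above reduces to a three-way ladder: re-reservations always allocate, otherwise the scheduler may first have to trade away one of the app's existing reservations, and a needToUnreserve hint that was never cashed in means the earlier limit checks were passed on false pretenses. A compact sketch of that decision (booleans stand in for the real state; not the patch's code):

    // Sketch only: the allocate / unreserve / give-up ladder above.
    enum Decision { ALLOCATE, UNRESERVE_THEN_ALLOCATE, GIVE_UP }

    final class AllocDecisionSketch {
      static Decision decide(boolean rereservation, boolean continueLooking,
          boolean canAllocContainer, boolean needToUnreserve) {
        if (rereservation) {
          return Decision.ALLOCATE; // swap the old reservation for a container
        }
        if (continueLooking && (!canAllocContainer || needToUnreserve)) {
          return Decision.UNRESERVE_THEN_ALLOCATE; // may still fail and give up
        }
        // Limits may have been passed only on the assumption we would
        // unreserve; allocating outright now would overshoot them.
        return needToUnreserve ? Decision.GIVE_UP : Decision.ALLOCATE;
      }
    }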
@@ -1359,17 +1545,38 @@
       return container.getResource();
     } else {
-      // Reserve by 'charging' in advance...
-      reserve(application, priority, node, rmContainer, container);
 
-      LOG.info("Reserved container " +
-          " application attempt=" + application.getApplicationAttemptId() +
-          " resource=" + request.getCapability() +
-          " queue=" + this.toString() +
-          " node=" + node +
-          " clusterResource=" + clusterResource);
+      // if we are allowed to allocate but this node doesn't have space, reserve it or
+      // if this was already a reserved container, reserve it again
+      if ((canAllocContainer) || (rmContainer != null)) {
 
+        if (reservationsContinueLooking) {
+          // we got here by possibly ignoring parent queue capacity limits. If
+          // the parameter needToUnreserve is true it means we ignored one of
+          // those limits in the chance we could unreserve. If we are here
+          // we aren't trying to unreserve so we can't allocate
+          // anymore due to that parent limit
+          boolean res = checkLimitsToReserve(clusterResource, application, capability,
+              needToUnreserve);
+          if (!res) {
+            return Resources.none();
+          }
+        }
 
-      return request.getCapability();
+        // Reserve by 'charging' in advance...
+        reserve(application, priority, node, rmContainer, container);
+
+        LOG.info("Reserved container " +
+            " application=" + application.getApplicationId() +
+            " resource=" + request.getCapability() +
+            " queue=" + this.toString() +
+            " usedCapacity=" + getUsedCapacity() +
+            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
+            " used=" + usedResources +
+            " cluster=" + clusterResource);
+
+        return request.getCapability();
+      }
+      return Resources.none();
     }
   }
 
@@ -1395,8 +1602,8 @@
       node.unreserveResource(application);
 
       // Update reserved metrics
-      getMetrics().unreserveResource(
-          application.getUser(), rmContainer.getContainer().getResource());
+      getMetrics().unreserveResource(application.getUser(),
+          rmContainer.getContainer().getResource());
       return true;
     }
     return false;
@@ -1405,7 +1612,8 @@
   @Override
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
+      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
+      boolean sortQueues) {
     if (application != null) {
 
       boolean removed = false;
@@ -1442,7 +1650,7 @@
       if (removed) {
         // Inform the parent queue _outside_ of the leaf-queue lock
         getParent().completedContainer(clusterResource, application, node,
-            rmContainer, null, event, this);
+            rmContainer, null, event, this, sortQueues);
       }
     }
   }
@@ -1459,6 +1667,8 @@
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource);
+    // Note this is a bit unconventional since it gets the object and modifies
+    // it here rather than using a set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
@@ -1578,7 +1788,7 @@
 
     public synchronized void releaseContainer(Resource resource) {
       Resources.subtractFrom(consumed, resource);
-    }  
+    }
   }
 
   @Override
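Both LeafQueue and the ParentQueue diff that follows read the feature flag through cs.getConfiguration().getReservationContinueLook(). The patch does not show that accessor itself; a hypothetical sketch of what it plausibly looks like (the property name and default below are assumptions, not taken from this patch):

    // Hypothetical sketch of the configuration accessor the queues call.
    public class CapacitySchedulerConfigSketch
        extends org.apache.hadoop.conf.Configuration {
      // Assumed property name and default; the real constant lives in
      // CapacitySchedulerConfiguration and may differ.
      static final String RESERVE_CONT_LOOK_ALL_NODES =
          "yarn.scheduler.capacity.reservations-continue-look-all-nodes";

      public boolean getReservationContinueLook() {
        // Default on: keep examining nodes past existing reservations.
        return getBoolean(RESERVE_CONT_LOOK_ALL_NODES, true);
      }
    }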
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java	(revision 1610498)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java	(working copy)
@@ -100,6 +100,8 @@
       RecordFactoryProvider.getRecordFactory(null);
 
   private final ResourceCalculator resourceCalculator;
+
+  private boolean reservationsContinueLooking;
 
   public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) {
@@ -146,7 +148,8 @@
     setupQueueConfigs(cs.getClusterResource(),
         capacity, absoluteCapacity,
-        maximumCapacity, absoluteMaxCapacity, state, acls);
+        maximumCapacity, absoluteMaxCapacity, state, acls,
+        cs.getConfiguration().getReservationContinueLook());
 
     this.queueComparator = cs.getQueueComparator();
     this.childQueues = new TreeSet<CSQueue>(queueComparator);
@@ -160,7 +163,8 @@
       Resource clusterResource,
       float capacity, float absoluteCapacity,
       float maximumCapacity, float absoluteMaxCapacity,
-      QueueState state, Map<QueueACL, AccessControlList> acls
+      QueueState state, Map<QueueACL, AccessControlList> acls,
+      boolean continueLooking
   ) {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -180,6 +184,8 @@
     this.queueInfo.setMaximumCapacity(this.maximumCapacity);
     this.queueInfo.setQueueState(this.state);
 
+    this.reservationsContinueLooking = continueLooking;
+
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@@ -195,7 +201,8 @@
         ", maxCapacity=" + maximumCapacity +
         ", asboluteMaxCapacity=" + absoluteMaxCapacity +
         ", state=" + state +
-        ", acls=" + aclsString);
+        ", acls=" + aclsString +
+        ", reservationsContinueLooking=" + reservationsContinueLooking);
   }
 
   private static float PRECISION = 0.0005f; // 0.05% precision
@@ -383,7 +390,8 @@
         newlyParsedParentQueue.maximumCapacity,
         newlyParsedParentQueue.absoluteMaxCapacity,
         newlyParsedParentQueue.state,
-        newlyParsedParentQueue.acls);
+        newlyParsedParentQueue.acls,
+        newlyParsedParentQueue.reservationsContinueLooking);
 
     // Re-configure existing child queues and add new ones
     // The CS has already checked to ensure all existing child queues are present!
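The assignContainers() hunks that follow thread an unreserve hint down the queue tree: a parent that is over capacity, but would fit if reserved resources were released, keeps scheduling and tells its children they may only proceed by unreserving. A sketch of that propagation with a stand-in QueueNode interface (not the patch's code; see the real hunks below):

    // Sketch only: how the needToUnreserve hint flows parent -> children.
    interface QueueNode {
      long assign(long cluster, String node, boolean needToUnreserve);
    }

    final class ParentSketch {
      static long assign(Iterable<QueueNode> children, long cluster,
          String node, boolean overCapacity,
          boolean fitsWithoutReservations, boolean needToUnreserve) {
        boolean localNeedToUnreserve = false;
        if (overCapacity) {
          if (!fitsWithoutReservations) {
            return 0; // truly over the limit, stop here
          }
          localNeedToUnreserve = true; // proceed, but only via unreserving
        }
        for (QueueNode child : children) {
          long assigned = child.assign(cluster, node,
              localNeedToUnreserve | needToUnreserve);
          if (assigned > 0) {
            return assigned;
          }
        }
        return 0;
      }
    }

The patch's `localNeedToUnreserve | needToUnreserve` uses the non-short-circuit boolean OR, which is equivalent to `||` here since both operands are already-evaluated locals.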
@@ -551,7 +559,7 @@
   @Override
   public synchronized CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node) {
+      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment =
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -561,14 +569,19 @@
             + getQueueName());
       }
 
+      boolean localNeedToUnreserve = false;
       // Are we over maximum-capacity for this queue?
       if (!assignToQueue(clusterResource)) {
-        break;
+        // check to see if we could assign if we unreserve first
+        localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
+        if (!localNeedToUnreserve) {
+          break;
+        }
       }
 
       // Schedule
       CSAssignment assignedToChild =
-          assignContainersToChildQueues(clusterResource, node);
+          assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
       assignment.setType(assignedToChild.getType());
 
       // Done if no child-queue assigned anything
@@ -632,7 +645,38 @@
     return true;
   }
 
+
+  private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
+    if (this.reservationsContinueLooking) {
+      // check to see if we could potentially use this node instead of a reserved
+      // node
+
+      Resource reservedResources = Resources.createResource(getMetrics()
+          .getReservedMB(), getMetrics().getReservedVirtualCores());
+      float capacityWithoutReservedCapacity = Resources.divide(
+          resourceCalculator, clusterResource,
+          Resources.subtract(usedResources, reservedResources),
+          clusterResource);
+
+      if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
+        LOG.info("parent: try to use reserved: " + getQueueName()
+            + " usedResources: " + usedResources.getMemory()
+            + " clusterResources: " + clusterResource.getMemory()
+            + " reservedResources: " + reservedResources.getMemory()
+            + " currentCapacity " + ((float) usedResources.getMemory())
+            / clusterResource.getMemory()
+            + " potentialNewWithoutReservedCapacity: "
+            + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+            + absoluteMaxCapacity + ")");
+        // we could potentially use this node instead of reserved node
+        return true;
+      }
+    }
+    return false;
+  }
+
+
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) &&
         Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
@@ -640,7 +684,7 @@
   }
 
   synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
-      FiCaSchedulerNode node) {
+      FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment =
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
 
@@ -653,7 +697,7 @@
       LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
         + " stats: " + childQueue);
       }
-      assignment = childQueue.assignContainers(cluster, node);
+      assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " +
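Unlike the leaf-level check, assignToQueueIfUnreserve() above works from queue metrics (reserved MB and vcores across all applications in the subtree) rather than a single application's reservation. Reduced to memory-only arithmetic, the test is roughly:

    // Sketch only: the parent-level "would we fit without reservations" test.
    final class ParentUnreserveCheckSketch {
      static boolean assignToQueueIfUnreserve(boolean continueLooking,
          long usedMB, long reservedMB, long clusterMB,
          float absoluteMaxCapacity) {
        if (!continueLooking) {
          return false;
        }
        float capacityWithoutReserved =
            (float) (usedMB - reservedMB) / clusterMB;
        return capacityWithoutReserved <= absoluteMaxCapacity;
      }
    }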
@@ -697,7 +741,8 @@
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node,
       RMContainer rmContainer, ContainerStatus containerStatus,
-      RMContainerEventType event, CSQueue completedChildQueue) {
+      RMContainerEventType event, CSQueue completedChildQueue,
+      boolean sortQueues) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
@@ -713,16 +758,21 @@
           " cluster=" + clusterResource);
       }
 
-      // reinsert the updated queue
-      for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
-        CSQueue csqueue = iter.next();
-        if(csqueue.equals(completedChildQueue))
-        {
-          iter.remove();
-          LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
-              " stats: " + csqueue);
-          childQueues.add(csqueue);
-          break;
+      // Note that this is using an iterator on the childQueues so this can't
+      // be called if already within an iterator for the childQueues. Like
+      // from assignContainersToChildQueues.
+      if (sortQueues) {
+        // reinsert the updated queue
+        for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
+          CSQueue csqueue = iter.next();
+          if(csqueue.equals(completedChildQueue))
+          {
+            iter.remove();
+            LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
+                " stats: " + csqueue);
+            childQueues.add(csqueue);
+            break;
+          }
         }
       }
 
@@ -730,10 +780,15 @@
       if (parent != null) {
         // complete my parent
         parent.completedContainer(clusterResource, application,
-            node, rmContainer, null, event, this);
+            node, rmContainer, null, event, this, sortQueues);
       }
     }
   }
+
+  @Private
+  boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
 
   synchronized void allocateResource(Resource clusterResource,
       Resource resource) {
Index: hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
===================================================================
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml	(revision 1610498)
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml	(working copy)
@@ -331,4 +331,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
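The guarded re-sort in ParentQueue.completedContainer() above exists because a TreeSet orders its elements only at insertion time: once a queue's usage changes, it must be removed and re-added for the comparator to re-rank it, and doing that while another frame is iterating the same set would fail. A self-contained sketch of the pattern the new sortQueues flag protects (illustrative, not the patch's code):

    // Sketch only: re-sorting one element of a TreeSet after it changes.
    import java.util.Iterator;
    import java.util.TreeSet;

    final class ResortSketch {
      static <Q extends Comparable<Q>> void resort(TreeSet<Q> queues,
          Q updated, boolean sortQueues) {
        if (!sortQueues) {
          return; // caller is already inside an iterator over 'queues'
        }
        for (Iterator<Q> iter = queues.iterator(); iter.hasNext();) {
          Q q = iter.next();
          if (q.equals(updated)) {
            iter.remove();          // safe removal via the iterator
            queues.add(q);          // re-insert so the comparator re-ranks it
            break;
          }
        }
      }
    }

This is why findNodeToUnreserve() in LeafQueue passes sortQueues=false: it runs while ParentQueue is still iterating childQueues, so deferring the re-sort avoids a ConcurrentModificationException at the cost of a briefly stale ordering.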