diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 23bf381..4fa0b52 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -180,6 +180,19 @@
   public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
 
   @Private
+  public static final String ASSIGN_MULTIPLE = PREFIX + "assignmultiple";
+
+  @Private
+  public static final boolean DEFAULT_ASSIGN_MULTIPLE = true;
+
+  /** Maximum containers to assign per node heartbeat; -1 means no limit. */
+  @Private
+  public static final String MAX_ASSIGN = PREFIX + "max.assign";
+
+  @Private
+  public static final int DEFAULT_MAX_ASSIGN = -1;
+
+  @Private
   public static class QueueMapping {
 
     public enum MappingType {
@@ -635,6 +648,13 @@ public boolean getOverrideWithQueueMappings() {
         DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
   }
 
+  public boolean getAssignMultiple() {
+    return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
+  }
+
+  public int getMaxAssignContainers() {
+    return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
+  }
   /**
    * Returns a collection of strings, trimming leading and trailing whitespace
    * on each value
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6ffaf4c..aa4f401 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,8 @@
   private final boolean rootQueue;
   final Comparator<CSQueue> queueComparator;
   volatile int numApplications;
+  boolean assignMultiple;
+  int maxAllocationPerNode;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -117,7 +119,8 @@ public ParentQueue(CapacitySchedulerContext cs,
         cs.getConfiguration().getReservationContinueLook());
 
     this.childQueues = new TreeSet<CSQueue>(queueComparator);
-
+    this.assignMultiple = cs.getConfiguration().getAssignMultiple();
+    this.maxAllocationPerNode = cs.getConfiguration().getMaxAssignContainers();
     LOG.info("Initialized parent-queue " + queueName +
         " name=" + queueName +
         ", fullname=" + getQueuePath());
@@ -416,26 +419,30 @@ private synchronized void removeApplication(ApplicationId applicationId,
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
-    CSAssignment assignment =
+  public synchronized CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("START assignContainers for node " + node.getNodeName());
+    }
+
+    CSAssignment assignment =
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
+
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, labelManager
+        .getLabelsOnNode(node.getNodeID()))) {
       return assignment;
     }
-
+
+    int assignedContainers = 0; // counts across the whole heartbeat
     while (canAssign(clusterResource, node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
-          + getQueueName());
+            + getQueueName());
       }
-
+
       boolean localNeedToUnreserve = false;
-      Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
-
+      Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
+
       // Are we over maximum-capacity for this queue?
       if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
         // check to see if we could if we unreserve first
@@ -444,39 +451,55 @@ public synchronized CSAssignment assignContainers(
           break;
         }
       }
-
+
       // Schedule
-      CSAssignment assignedToChild =
-          assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
+      CSAssignment assignedToChild =
+          assignContainersToChildQueues(clusterResource, node,
+              localNeedToUnreserve | needToUnreserve);
       assignment.setType(assignedToChild.getType());
-
+
       // Done if no child-queue assigned anything
-      if (Resources.greaterThan(
-          resourceCalculator, clusterResource,
-          assignedToChild.getResource(), Resources.none())) {
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          assignedToChild.getResource(), Resources.none())) {
         // Track resource utilization for the parent-queue
         super.allocateResource(clusterResource, assignedToChild.getResource(),
-            nodeLabels);
-
-        // Track resource utilization in this pass of the scheduler
-        Resources.addTo(assignment.getResource(), assignedToChild.getResource());
-
-        LOG.info("assignedContainer" +
-            " queue=" + getQueueName() +
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources +
-            " cluster=" + clusterResource);
+            nodeLabels);
+        // Track resource utilization in this pass of the scheduler
+        Resources
+            .addTo(assignment.getResource(), assignedToChild.getResource());
+        ++assignedContainers;
+        LOG.info("assignedContainer" + " queue=" + getQueueName()
+            + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+            + getAbsoluteUsedCapacity() + " used=" + usedResources
+            + " cluster=" + clusterResource);
+
+        // Can we assign more than one container to this node per heartbeat?
+        if (!assignMultiple) {
+          LOG.debug("assignMultiple is false; cannot assign more than one"
+              + " container per heartbeat");
+          break;
+        }
+        // Have we reached the per-node, per-heartbeat assignment limit?
+        if (maxAllocationPerNode > 0
+            && maxAllocationPerNode <= assignedContainers) {
+          LOG.debug("Reached the maximum of " + maxAllocationPerNode
+              + " container assignments for node " + node.getNodeName()
+              + " in this heartbeat");
+          break;
+        }
       } else {
         break;
       }
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("ParentQ=" + getQueueName()
-          + " assignedSoFarInThisIteration=" + assignment.getResource()
-          + " usedCapacity=" + getUsedCapacity()
-          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
+            + " assignedSoFarInThisIteration=" + assignment.getResource()
+            + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+            + getAbsoluteUsedCapacity());
       }
 
       // Do not assign more than one container if this isn't the root queue
@@ -484,14 +507,14 @@ public synchronized CSAssignment assignContainers(
       if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
         if (LOG.isDebugEnabled()) {
           if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
-            LOG.debug("Not assigning more than one off-switch container," +
-                " assignments so far: " + assignment);
+            LOG.debug("Not assigning more than one off-switch container,"
+                + " assignments so far: " + assignment);
           }
         }
         break;
       }
-    }
-
+    }
+
     return assignment;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 2aa57a0..2436430 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -148,6 +148,7 @@
 
   private ResourceManager resourceManager = null;
   private RMContext mockContext;
+  private YarnConfiguration conf;
 
   @Before
   public void setUp() throws Exception {
@@ -166,12 +167,13 @@ protected RMNodeLabelsManager createNodeLabelManager() {
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
     resourceManager.init(conf);
+    this.conf = conf;
     resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
     resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
     mockContext = mock(RMContext.class);
     when(mockContext.getConfigurationProvider()).thenReturn(
-        new LocalConfigurationProvider());
+        new LocalConfigurationProvider());
   }
 
   @After
@@ -229,6 +231,20 @@ public void testConfValidation() throws Exception {
     resourceManager.getResourceScheduler().handle(nodeAddEvent1);
     return nm;
   }
+
+  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
+      ResourceManager rm, String hostName, int containerManagerPort,
+      int httpPort, String rackName, Resource capability) throws IOException,
+      YarnException {
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
+        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
+            containerManagerPort, httpPort, rackName, capability, rm);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes().get(
+            nm.getNodeId()));
+    rm.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
 
   @Test
   public void testCapacityScheduler() throws Exception {
@@ -350,6 +366,113 @@ public void testCapacityScheduler() throws Exception {
     LOG.info("--- END: testCapacityScheduler ---");
   }
 
+  @Test
+  public void testNotAssignMultiple() throws Exception {
+    LOG.info("--- START: testNotAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(CapacitySchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    this.conf = conf;
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(rm, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
+        registerNode(rm, host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit an application
+    Application application_0 = new Application("user_0", "a1", rm);
+    application_0.submit();
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit another application
+    Application application_1 = new Application("user_1", "b2", rm);
+    application_1.submit();
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule();
+    application_1.schedule();
+
+    // Send heartbeats to kick the tires on the scheduler
+    LOG.info("Kick!");
+
+    // With assignmultiple=false each heartbeat assigns at most one
+    // container, so each node ends up with exactly one of the two tasks
+    nodeUpdate(rm, nm_0);
+    nodeUpdate(rm, nm_1);
+
+    // Get allocations from the scheduler: one 1G task per application
+    application_0.schedule();
+    checkApplicationResourceUsage(1 * GB, application_0);
+    application_1.schedule();
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    // And exactly one 1G container on each node
+    checkNodeResourceUsage(1 * GB, nm_0);
+    checkNodeResourceUsage(1 * GB, nm_1);
+
LOG.info("--- START: testNotAssignMultiple ---"); + } + private void nodeUpdate( org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -357,7 +480,15 @@ private void nodeUpdate( NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); resourceManager.getResourceScheduler().handle(nodeUpdate); } - + + private void nodeUpdate(ResourceManager rm, + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) { + RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + rm.getResourceScheduler().handle(nodeUpdate); + } + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { // Define top-level queues