diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d3186da5b4..35085ade21 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -93,11 +93,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -162,6 +160,9 @@
 
   private int offswitchPerHeartbeatLimit;
 
+  private boolean assignMultipleEnabled;
+
+  private int maxAssignPerHeartbeat;
 
   @Override
   public void setConf(Configuration conf) {
@@ -307,6 +308,9 @@ void initScheduler(Configuration configuration) throws
     asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
         DEFAULT_ASYNC_SCHEDULER_INTERVAL);
 
+    this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
+    this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
+
     // number of threads for async scheduling
     int maxAsyncSchedulingThreads = this.conf.getInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
@@ -1086,17 +1090,22 @@ private void updateSchedulerHealth(long now, NodeId nodeId,
         .getAssignmentInformation().getReserved());
   }
 
-  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
-    if (null != assignment && Resources.greaterThan(getResourceCalculator(),
-        getClusterResource(), assignment.getResource(), Resources.none())
-        && offswitchCount < offswitchPerHeartbeatLimit) {
-      // And it should not be a reserved container
-      if (assignment.getAssignmentInformation().getNumReservations() == 0) {
-        return true;
-      }
+  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
+      int assignedContainers) {
+    // Current assignment shouldn't be empty
+    if (assignment == null
+        || Resources.equals(assignment.getResource(), Resources.none())) {
+      return false;
+    }
+
+    // offswitch assignment should be under threshold
+    if (offswitchCount >= offswitchPerHeartbeatLimit) {
+      return false;
     }
-    return false;
+
+    // assignMultipleEnabled should be ON, and assignedContainers should be under threshold
+    return assignMultipleEnabled
+        && (maxAssignPerHeartbeat == -1 || assignedContainers < maxAssignPerHeartbeat);
   }
 
   /**
@@ -1108,6 +1117,7 @@ private void allocateContainersToNode(NodeId nodeId,
     FiCaSchedulerNode node = getNode(nodeId);
     if (null != node) {
       int offswitchCount = 0;
+      int assignedContainers = 0;
 
       PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
       CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
@@ -1118,7 +1128,12 @@ private void allocateContainersToNode(NodeId nodeId,
         offswitchCount++;
       }
 
-      while (canAllocateMore(assignment, offswitchCount)) {
+      if (Resources.greaterThan(calculator, getClusterResource(),
+          assignment.getResource(), Resources.none())) {
+        assignedContainers++;
+      }
+
+      while (canAllocateMore(assignment, offswitchCount, assignedContainers)) {
         // Try to see if it is possible to allocate multiple container for
         // the same node heartbeat
         assignment = allocateContainersToNode(ps, true);
@@ -1127,6 +1142,12 @@ private void allocateContainersToNode(NodeId nodeId,
             && assignment.getType() == NodeType.OFF_SWITCH) {
           offswitchCount++;
         }
+
+        if (null != assignment
+            && Resources.greaterThan(calculator, getClusterResource(),
+                assignment.getResource(), Resources.none())) {
+          assignedContainers++;
+        }
       }
 
       if (offswitchCount >= offswitchPerHeartbeatLimit) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 90a7e65ddc..439e3efb3b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,21 @@
 
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  @Private
+  public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+      + "per-node-heartbeat.multiple-assignments-enabled";
+
+  @Private
+  public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
+
+  /** Maximum number of containers to assign on each check-in. */
+  @Private
+  public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+      + "per-node-heartbeat.maximum-container-assignments";
+
+  @Private
+  public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
+
   AppPriorityACLConfigurationParser priorityACLConfig =
       new AppPriorityACLConfigurationParser();
 
   public CapacitySchedulerConfiguration() {
@@ -1446,4 +1461,12 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
     }
     return userWeights;
   }
+
+  public boolean getAssignMultipleEnabled() {
+    return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
+  }
+
+  public int getMaxAssignPerHeartbeat() {
+    return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 3c6e6dfb67..d81635e0b6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -90,7 +90,6 @@
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -228,6 +227,19 @@ public void tearDown() throws Exception {
     }
   }
 
+  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
+      registerNode(ResourceManager rm, String hostName, int containerManagerPort,
+      int httpPort, String rackName, Resource capability) throws IOException,
+      YarnException {
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
+        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
+            containerManagerPort, httpPort, rackName, capability, rm);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    rm.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
@@ -395,6 +407,218 @@ public void testCapacityScheduler() throws Exception {
     LOG.info("--- END: testCapacityScheduler ---");
   }
 
+  @Test
+  public void testNotAssignMultiple() throws Exception {
+    LOG.info("--- START: testNotAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(rm, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority_0 = Priority.newInstance(0);
+    Priority priority_1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application_0 = new Application("user_0", "a1", rm);
+    application_0.submit();
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_0, new String[] { host_0 });
+    Task task_0_1 =
+        new Task(application_0, priority_1, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+    application_0.addTask(task_0_1);
+
+    // Submit another application
+    Application application_1 = new Application("user_1", "b2", rm);
+    application_1.submit();
+    application_1.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_1_0 = Resources.createResource(3 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(4 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_0, new String[] { host_0 });
+    Task task_1_1 =
+        new Task(application_1, priority_1, new String[] { host_0 });
+    application_1.addTask(task_1_0);
+    application_1.addTask(task_1_1);
+
+    // Send resource requests to the scheduler
+    application_0.schedule();
+
+    application_1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // task_1_0, used=3G
+    nodeUpdate(rm, nm_0);
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+    application_1.schedule();
+    // Only 1 task should be scheduled per heartbeat
+    checkNodeResourceUsage(3 * GB, nm_0); // task_1_0 (3G)
+    checkApplicationResourceUsage(0 * GB, application_0);
+    checkApplicationResourceUsage(3 * GB, application_1);
+
+    // Another heartbeat, which allocates task_0_0 (1G)
+    nodeUpdate(rm, nm_0);
+    application_0.schedule();
+    checkApplicationResourceUsage(1 * GB, application_0);
+    application_1.schedule();
+    checkApplicationResourceUsage(3 * GB, application_1);
+    checkNodeResourceUsage(4 * GB, nm_0);
+    LOG.info("--- END: testNotAssignMultiple ---");
+  }
+
+  @Test
+  public void testAssignMultiple() throws Exception {
+    LOG.info("--- START: testAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
+    // Each heartbeat will assign 2 containers at most
+    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(rm, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority_0 = Priority.newInstance(0);
+    Priority priority_1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application_0 = new Application("user_0", "a1", rm);
+    application_0.submit();
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_0, new String[] { host_0 });
+    Task task_0_1 =
+        new Task(application_0, priority_1, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+    application_0.addTask(task_0_1);
+
+    // Submit another application
+    Application application_1 = new Application("user_1", "b2", rm);
+    application_1.submit();
+    application_1.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_1_0 = Resources.createResource(3 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(4 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_0, new String[] { host_0 });
+    Task task_1_1 =
+        new Task(application_1, priority_1, new String[] { host_0 });
+    application_1.addTask(task_1_0);
+    application_1.addTask(task_1_1);
+
+    // Send resource requests to the scheduler
+    application_0.schedule();
+
+    application_1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // task_0_0 (1G) and task_1_0 (3G) are both allocated, used=4G
+    nodeUpdate(rm, nm_0);
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+    application_1.schedule();
+    // At most 2 tasks should be scheduled per heartbeat
+    checkNodeResourceUsage(4 * GB, nm_0); // task_0_0 (1G) + task_1_0 (3G)
+    checkApplicationResourceUsage(1 * GB, application_0);
+    checkApplicationResourceUsage(3 * GB, application_1);
+
+    // Another heartbeat, which allocates task_0_1 (2G) and task_1_1 (4G)
+    nodeUpdate(rm, nm_0);
+    application_0.schedule();
+    checkApplicationResourceUsage(3 * GB, application_0);
+    application_1.schedule();
+    checkApplicationResourceUsage(7 * GB, application_1);
+    checkNodeResourceUsage(10 * GB, nm_0);
+    LOG.info("--- END: testAssignMultiple ---");
+  }
+
+  private void nodeUpdate(ResourceManager rm,
+      org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
+    RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    rm.getResourceScheduler().handle(nodeUpdate);
+  }
+
   private void nodeUpdate(
       org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
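
Illustrative note (not part of the patch): the sketch below shows how the two new knobs added in this change might be set programmatically, using the constants the patch introduces; with PREFIX resolving to "yarn.scheduler.capacity.", the keys become yarn.scheduler.capacity.per-node-heartbeat.multiple-assignments-enabled and yarn.scheduler.capacity.per-node-heartbeat.maximum-container-assignments. The class name MultiAssignConfigExample is hypothetical.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class MultiAssignConfigExample {
  public static void main(String[] args) {
    // Hypothetical example, mirroring what the new tests do.
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();

    // Keep multiple container assignments per node heartbeat enabled
    // (true is the default, per DEFAULT_ASSIGN_MULTIPLE_ENABLED).
    csConf.setBoolean(
        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);

    // Cap each heartbeat at 2 container assignments; the default of -1 means
    // no cap beyond the existing off-switch-per-heartbeat limit.
    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);

    System.out.println(csConf.getAssignMultipleEnabled()); // true
    System.out.println(csConf.getMaxAssignPerHeartbeat()); // 2
  }
}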