diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 820d2fa58bc..3b1471c0d25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -43,6 +43,8 @@ private boolean allowPreempt = false; + private boolean reReserveEnabled = true; + public ResourceLimits(Resource limit) { this(limit, Resources.none()); } @@ -105,4 +107,12 @@ public Resource getNetLimit() { } return limit; } + + public boolean isReReserveEnabled() { + return reReserveEnabled; + } + + public void setReReserveEnabled(boolean reReserveEnabled) { + this.reReserveEnabled = reReserveEnabled; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java index d3d4d9b4d33..b282c4e43ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java @@ -77,4 +77,5 @@ public final static String NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED = "Node can't find a container to be unreserved when needed"; + public final static String NODE_HAS_BEEN_RESERVED = "Node has been reserved"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 57ee69026f6..f275538f67c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -3181,4 +3181,9 @@ private LeafQueue autoCreateLeafQueue( public void resetSchedulerMetrics() { CapacitySchedulerMetrics.destroy(); } + + @Override + public boolean isMultiNodePlacementEnabled() { + return multiNodePlacementEnabled; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index ae74989a726..09353ca5280 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -102,4 +102,5 @@ */ Clock getClock(); + boolean isMultiNodePlacementEnabled(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ca61dc6bad9..84c0171aaee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -1037,10 +1038,22 @@ private CSAssignment allocateFromReservedContainer(Resource clusterResource, if (null != application) { ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node, SystemClock.getInstance().getTime(), application); + if (scheduler.isMultiNodePlacementEnabled()) { + currentResourceLimits.setReReserveEnabled(false); + } CSAssignment assignment = 
application.assignContainers( - clusterResource, candidates, currentResourceLimits, - schedulingMode, reservedContainer); - return assignment; + clusterResource, new SimpleCandidateNodeSet(node), + currentResourceLimits, schedulingMode, reservedContainer); + // Return only on progress: an allocation, a reservation (re-reserving is + // off under multi-node placement), an excess reservation or containers to kill + if (assignment.getAssignmentInformation().getNumAllocations() > 0 + || assignment.getAssignmentInformation().getNumReservations() > 0 + || assignment.getExcessReservation() != null || ( + assignment.getContainersToKill() != null && !assignment + .getContainersToKill().isEmpty())) { + return assignment; + } + continue; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 1f9f6eb7ee7..68b601e60ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -628,11 +628,13 @@ private ContainerAllocation assignContainer(Resource clusterResource, } } - ContainerAllocation result = new ContainerAllocation(null, - pendingAsk.getPerAllocationResource(), AllocationState.RESERVED); - result.containerNodeType = type; - result.setToKillContainers(null); - return result; + if (rmContainer == null || currentResoureLimits.isReReserveEnabled()) { + ContainerAllocation result = new 
ContainerAllocation(null, + pendingAsk.getPerAllocationResource(), AllocationState.RESERVED); + result.containerNodeType = type; + result.setToKillContainers(null); + return result; + } } // Skip the locality request ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( @@ -835,6 +837,16 @@ private ContainerAllocation allocate(Resource clusterResource, FiCaSchedulerNode node = iter.next(); if (reservedContainer == null) { + // Do not schedule if there are any reservations to fulfill on the node + if (node.getReservedContainer() != null) { + LOG.debug("Skipping scheduling on node {} since it has already been" + + " reserved by {}", node.getNodeID(), + node.getReservedContainer().getContainerId()); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey, + ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED); + continue; + } result = preCheckForNodeCandidateSet(clusterResource, node, schedulingMode, resourceLimits, schedulerKey); if (null != result) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index 0e295765e84..25d5fa4c3c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -302,4 +302,57 @@ public void testAllocateForReservedContainer() throws Exception { 
rm1.close(); } + + @Test(timeout=30000) + public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based.sorting-interval.ms", 0); + newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 1.0f); + MockRM rm1 = new MockRM(newConf); + + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(5 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // launch another app to queue, AM container should be launched in nm2 + RMApp app2 = rm1.submitApp(5 * GB, "app", "user", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = + cs.getApplicationAttempt(am2.getApplicationAttemptId()); + + // Ask a container with 4GB memory size for app1, + // nm1 will reserve a container for app1 + am1.allocate("*", 4 * GB, 1, new ArrayList<>()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check containers of app1 and app2. 
+ Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + + // Kill app2 to release resource on nm2. + rm1.killApp(app2.getApplicationId()); + + // Trigger scheduling so the nm1 reservation is fulfilled on nm2. + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + + rm1.close(); + } }