diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 2e8a7c14ebe..fba4510235f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -36,6 +36,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -449,7 +451,16 @@ public boolean updateResourceRequests( writeLock.lock(); try { if (!isStopped) { - return appSchedulingInfo.updateResourceRequests(requests, false); + boolean updated = + appSchedulingInfo.updateResourceRequests(requests, false); + if (updated) { + requests.stream() + .filter(r -> ResourceRequest.ANY.equals(r.getResourceName()) + && r.getNumContainers() == 0) + .forEach(r -> killExcessReserveContainers( + SchedulerRequestKey.create(r))); + } + return updated; } return false; } finally { @@ -466,7 +477,15 @@ public boolean updateSchedulingRequests( writeLock.lock(); try { if (!isStopped) { - return appSchedulingInfo.updateSchedulingRequests(requests, false); + boolean updated = 
+ appSchedulingInfo.updateSchedulingRequests(requests, false); + if (updated) { + requests.stream() + .filter(r -> r.getResourceSizing().getNumAllocations() == 0) + .forEach(r -> killExcessReserveContainers( + SchedulerRequestKey.create(r))); + } + return updated; } return false; } finally { @@ -1468,4 +1487,18 @@ public String getDiagnosticMessage() { public Map<String, String> getApplicationSchedulingEnvs() { return this.applicationSchedulingEnvs; } + + public void killExcessReserveContainers(SchedulerRequestKey schedulerKey) { + Map<NodeId, RMContainer> excessReservedContainers = + reservedContainers.get(schedulerKey); + if (excessReservedContainers != null) { + LOG.info("Killing excess reserved containers {} with scheduler key {} " + + "which has no pending resource.", excessReservedContainers, + schedulerKey); + excessReservedContainers.values().stream().forEach( + c -> rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(getApplicationAttemptId(), c, + SchedulerEventType.KILL_RESERVED_CONTAINER))); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 71aa865ccfc..662a7cf7e8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -561,9 +561,9 @@ public boolean apply(Resource cluster, ResourceCommitRequest updateResourceRequestFunc = + (am, numContainers) -> { + try { + 
am.allocate("*", 7 * GB, numContainers, new ArrayList<>()); + } catch (Exception e1) { + e1.printStackTrace(); + } + return null; + }; + testExcessReservationWillBeUnreserved(updateResourceRequestFunc); + //TODO enable test case for scheduling request after supporting multi-nodes +// BiFunction updateSchedulingRequestFunc = +// (am, numContainers) -> { +// try { +// SchedulingRequest schedulingRequest = +// SchedulingRequest.newBuilder().resourceSizing( +// ResourceSizing.newInstance(numContainers, +// Resource.newInstance(7 * GB, 1))).build(); +// am.addSchedulingRequest(ImmutableList.of(schedulingRequest)); +// am.schedule(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// return null; +// }; +// testExcessReservationWillBeUnreserved(updateSchedulingRequestFunc); + } + + public void testExcessReservationWillBeUnreserved( + BiFunction updateRequestFunc) throws Exception { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based.sorting-interval.ms", 0); + newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 1.0f); + MockRM rm1 = new MockRM(newConf); + + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // launch another app to queue, AM container should be launched in nm2 + RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = 
rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = + cs.getApplicationAttempt(am2.getApplicationAttemptId()); + + /* + * Verify that reserved container will be unreserved + * after its ask has been cancelled. + */ + // Request a container with 7GB memory size for app1, + // nm1 will reserve a container for app1 + updateRequestFunc.apply(am1, 1); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check if a container allocated and a container reserved for app1, + // and a container allocated for app2 + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Cancel asks of app1 + // am1.allocate("*", 7 * GB, 0, new ArrayList<>()); + updateRequestFunc.apply(am1, 0); + rm1.drainEvents(); + + // App1's reservation will be cancelled + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(2 * GB, + cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + cs.getRootQueue().getQueueResourceUsage().getUsed().getMemorySize()); + Assert.assertEquals(0, + cs.getRootQueue().getQueueResourceUsage().getReserved() + .getMemorySize()); + Assert.assertEquals(0, + leafQueue.getQueueResourceUsage().getReserved().getMemorySize()); + + /* + * Verify that reserved container will be unreserved + * after its ask has been satisfied. 
+ */ + // Request a container with 7GB memory size for app1, + // nm1 will reserve container for app1 + // am1.allocate("*", 7 * GB, 1, new ArrayList<>()); + updateRequestFunc.apply(am1, 1); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check if a container allocated and a container reserved for app1, + // and a container allocated for app2 + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Kill app2 to make room for app1 + rm1.killApp(am2.getApplicationAttemptId().getApplicationId()); + + // Do node heartbeats once, ask of app1 will be satisfied on nm2 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + rm1.drainEvents(); + + // App1's reservation will be cancelled + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(2 * GB, + cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize()); + Assert.assertEquals(7 * GB, + cs.getNode(nm2.getNodeId()).getAllocatedResource().getMemorySize()); + Assert.assertEquals(9 * GB, + cs.getRootQueue().getQueueResourceUsage().getUsed().getMemorySize()); + Assert.assertEquals(0, + cs.getRootQueue().getQueueResourceUsage().getReserved() + .getMemorySize()); + Assert.assertEquals(0, + leafQueue.getQueueResourceUsage().getReserved().getMemorySize()); + + rm1.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 22b311db4dd..f9459980029 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -416,7 +416,7 @@ public void testExcessReservationWillBeUnreserved() throws Exception { // Cancel asks of app2 and re-kick RM am2.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + rm1.drainEvents(); // App2's reservation will be cancelled Assert.assertTrue(schedulerApp2.getReservedContainers().size() == 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 9cfddd66f66..d6c632e5d10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -536,7 +536,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // Cancel asks of app2 and re-kick RM am1.allocate("*", 
4 * GB, 0, new ArrayList<ContainerId>()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + rm1.drainEvents(); Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage() .getUsed("x").getMemorySize());