diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index e9de05227ec..add44a39103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -55,12 +55,16 @@ public static ResourceOption newInstance(Resource resource,
    * Get timeout for tolerant of resource over-commitment
    * Note: negative value means no timeout so that allocated containers will
    * keep running until the end even under resource over-commitment cases.
-   * @return overCommitTimeout of the ResourceOption
+   * @return overCommitTimeout of the ResourceOption in milliseconds
    */
   @Private
   @Evolving
   public abstract int getOverCommitTimeout();
-  
+
+  /**
+   * Set the over commit timeout.
+   * @param overCommitTimeout Timeout in ms. Negative means no timeout.
+   */
   @Private
   @Evolving
   protected abstract void setOverCommitTimeout(int overCommitTimeout);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index a798b97af5f..a2f69549f6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -104,6 +104,8 @@
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
@@ -830,6 +832,7 @@ public void updateNodeResource(RMNode nm,
       writeLock.lock();
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
+      final int timeout = resourceOption.getOverCommitTimeout();
       Resource oldResource = node.getTotalResource();
       if (!oldResource.equals(newResource)) {
         // Notify NodeLabelsManager about this change
@@ -838,7 +841,7 @@ public void updateNodeResource(RMNode nm,
 
         // Log resource change
         LOG.info("Update resource on node: " + node.getNodeName() + " from: "
-            + oldResource + ", to: " + newResource);
+            + oldResource + ", to: " + newResource + " in " + timeout + " ms");
 
         nodeTracker.removeNode(nm.getNodeID());
 
@@ -846,6 +849,27 @@ public void updateNodeResource(RMNode nm,
         node.updateTotalResource(newResource);
 
         nodeTracker.addNode((N) node);
+
+        // TODO this should be done properly
+        long remaining = node.getUnallocatedResource().getMemorySize();
+        if (remaining < 0) {
+          new Thread(() -> {
+            // Wait for the over commit timeout
+            try {
+              Thread.sleep(timeout);
+            } catch (Exception e) {
+              LOG.error("Cannot wait");
+            }
+            List<RMContainer> containers = node.getRunningContainersWithAMsAtTheEnd();
+            long curRemaining = node.getUnallocatedResource().getMemorySize();
+            while (curRemaining < 0) {
+              RMContainer container = containers.remove(0);
+              curRemaining += container.getAllocatedResource().getMemorySize();
+              LOG.info("Killing a container to free up resources: " + curRemaining);
+              killContainer(container);
+            }
+          }).start();
+        }
       } else{
         // Log resource change
         LOG.warn("Update resource on node: " + node.getNodeName()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index aac7f15a5a5..11b67122dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -178,9 +179,12 @@
 import com.google.common.collect.ImmutableSet;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestCapacityScheduler extends CapacitySchedulerTestBase {
-  private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCapacityScheduler.class);
   private final static ContainerUpdates NULL_UPDATE_REQUESTS =
       new ContainerUpdates();
   private ResourceManager resourceManager = null;
@@ -1309,27 +1313,29 @@ public void testAllocateReorder() throws Exception {
 
   @Test
   public void testResourceOverCommit() throws Exception {
-    int waitCount;
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
+    @SuppressWarnings("resource")
     MockRM rm = new MockRM(conf);
     rm.start();
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    AdminService admin = rm.getAdminService();
 
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    NodeId nmId1 = nm1.getNodeId();
     RMApp app1 = rm.submitApp(2048);
     // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
     nm1.nodeHeartbeat(true);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
-        nm1.getNodeId());
+    SchedulerNodeReport report_nm1 = scheduler.getNodeReport(nmId1);
 
     // check node report, 2 GB used and 2 GB available
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
 
-    // add request for containers
+    // add request for 1 container of 2 GB
     am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
     AllocateResponse alloc1Response = am1.schedule(); // send the request
@@ -1342,77 +1348,123 @@ public void testResourceOverCommit() throws Exception {
     }
 
     List<Container> allocated1 = alloc1Response.getAllocatedContainers();
-    Assert.assertEquals(1, allocated1.size());
-    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
-    Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
+    assertEquals(1, allocated1.size());
+    Container c1 = allocated1.get(0);
+    assertEquals(2 * GB, c1.getResource().getMemorySize());
+    assertEquals(nmId1, c1.getNodeId());
 
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    report_nm1 = scheduler.getNodeReport(nmId1);
     // check node report, 4 GB used and 0 GB available
-    Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
+    assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
 
-    // check container is assigned with 2 GB.
-    Container c1 = allocated1.get(0);
-    Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
 
     // update node resource to 2 GB, so resource is over-consumed.
-    Map<NodeId, ResourceOption> nodeResourceMap =
-        new HashMap<NodeId, ResourceOption>();
-    nodeResourceMap.put(nm1.getNodeId(),
-        ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
-    UpdateNodeResourceRequest request =
-        UpdateNodeResourceRequest.newInstance(nodeResourceMap);
-    AdminService as = ((MockRM)rm).getAdminService();
-    as.updateNodeResource(request);
-
-    waitCount = 0;
-    while (waitCount++ != 20) {
-      report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-      if (report_nm1.getAvailableResource().getMemorySize() != 0) {
-        break;
-      }
-      LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
-          + waitCount + " times already..");
-      Thread.sleep(1000);
-    }
+    admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+        Collections.singletonMap(nmId1, ResourceOption.newInstance(
+            Resource.newInstance(2 * GB, 1), -1))));
+
+    LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled...");
+    GenericTestUtils.waitFor(() -> {
+      SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+      return nmReport.getAvailableResource().getMemorySize() != 0;
+    }, 100, 10 * 1000);
+
     // Now, the used resource is still 4 GB, and available resource is minus value.
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
 
     // Check container can complete successfully in case of resource over-commitment.
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
     nm1.containerStatus(containerStatus);
-    waitCount = 0;
-    while (attempt1.getJustFinishedContainers().size() < 1
-        && waitCount++ != 20) {
-      LOG.info("Waiting for containers to be finished for app 1... Tried "
-          + waitCount + " times already..");
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
-    Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, attempt1.getJustFinishedContainers().size());
+    assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
     // As container return 2 GB back, the available resource becomes 0 again.
-    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
+    assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
 
     // Verify no NPE is trigger in schedule after resource is updated.
     am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
-    alloc1Response = am1.schedule();
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
-    int times = 0;
+    AllocateResponse allocResponse2 = am1.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
     // try 10 times as scheduling is async process.
-    while (alloc1Response.getAllocatedContainers().size() < 1
-        && times++ < 10) {
+    for (int i = 0; i < 10; i++) {
       LOG.info("Waiting for containers to be allocated for app 1... Tried "
-          + times + " times already..");
+          + "{} times already..", i);
+      Thread.sleep(100);
+      allocResponse2 = am1.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
+    }
+
+
+    // Increase the resources again to 5 GB to schedule the 3GB container
+    admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+        Collections.singletonMap(nmId1, ResourceOption.newInstance(
+            Resource.newInstance(5 * GB, 1), -1))));
+    GenericTestUtils.waitFor(() -> {
+      SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+      return nmReport.getAvailableResource().getMemorySize() == 3 * GB;
+    }, 100, 5 * 1000);
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(3 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+    nm1.nodeHeartbeat(true);
+    while (allocResponse2.getAllocatedContainers().isEmpty()) {
+      LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(100);
+      allocResponse2 = am1.schedule();
     }
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId1, c2.getNodeId());
+
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(5 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+
+    // Reduce the resources and wait for the container to be preempted
+    admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+        Collections.singletonMap(nmId1, ResourceOption.newInstance(
+            Resource.newInstance(3 * GB, 1), 2000))));
+    // The container should be running for a couple seconds until timing out
+    GenericTestUtils.waitFor(() -> {
+      SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+      return nmReport.getAvailableResource().getMemorySize() < 0;
+    }, 100, 5 * 1000);
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(5 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+    // Wait until the scheduler preempts the containers for some resources
+    GenericTestUtils.waitFor(() -> {
+      SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+      return nmReport.getAvailableResource().getMemorySize() > 0;
+    }, 100, 5 * 1000);
+    report_nm1 = scheduler.getNodeReport(nmId1);
+    assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+    assertEquals(1 * GB, report_nm1.getAvailableResource().getMemorySize());
+    AllocateResponse allocResponse3 = am1.schedule();
+    List<ContainerStatus> completedContainers =
+        allocResponse3.getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertEquals(c2.getId(), c2status.getContainerId());
+    assertEquals(ContainerState.COMPLETE, c2status.getState());
+    assertEquals(ContainerExitStatus.PREEMPTED, c2status.getExitStatus());
+    assertEquals("Container preempted by scheduler", c2status.getDiagnostics());
+
     rm.stop();
   }
 