diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index e9de05227ec..add44a39103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -55,12 +55,16 @@ public static ResourceOption newInstance(Resource resource,
* Get timeout for tolerant of resource over-commitment
* Note: negative value means no timeout so that allocated containers will
* keep running until the end even under resource over-commitment cases.
- * @return overCommitTimeout of the ResourceOption
+ * @return overCommitTimeout of the ResourceOption in milliseconds
*/
@Private
@Evolving
public abstract int getOverCommitTimeout();
-
+
+ /**
+   * Set the over-commit timeout.
+   * @param overCommitTimeout Timeout in milliseconds; negative means no
+   * timeout.
+ */
@Private
@Evolving
protected abstract void setOverCommitTimeout(int overCommitTimeout);
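+
+  // For example (illustration only):
+  //   ResourceOption.newInstance(Resource.newInstance(2048, 1), 2000)
+  // asks the scheduler to preempt over-committed containers after 2000 ms,
+  // while a timeout of -1 lets them keep running.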
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index a798b97af5f..a2f69549f6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -104,6 +104,8 @@
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
@@ -830,6 +832,7 @@ public void updateNodeResource(RMNode nm,
writeLock.lock();
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
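+    // How long (ms) over-committed containers may keep running before being
+    // preempted; negative disables preemption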
+ final int timeout = resourceOption.getOverCommitTimeout();
Resource oldResource = node.getTotalResource();
if (!oldResource.equals(newResource)) {
// Notify NodeLabelsManager about this change
@@ -838,7 +841,7 @@ public void updateNodeResource(RMNode nm,
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName() + " from: "
- + oldResource + ", to: " + newResource);
+          + oldResource + ", to: " + newResource
+          + " with over-commit timeout " + timeout + " ms");
nodeTracker.removeNode(nm.getNodeID());
@@ -846,6 +849,27 @@ public void updateNodeResource(RMNode nm,
node.updateTotalResource(newResource);
nodeTracker.addNode((N) node);
+
+      // TODO this should be done properly
+      // If the node is now over-committed, wait for the over-commit timeout
+      // and then kill running containers (AM containers last) until the node
+      // fits within its new capacity.
+      long remaining = node.getUnallocatedResource().getMemorySize();
+      if (remaining < 0 && timeout >= 0) {
+        new Thread(() -> {
+          // Wait for the over-commit timeout
+          try {
+            Thread.sleep(timeout);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted while waiting for the over-commit timeout",
+                e);
+            Thread.currentThread().interrupt();
+          }
+          List<RMContainer> containers =
+              node.getRunningContainersWithAMsAtTheEnd();
+          long curRemaining = node.getUnallocatedResource().getMemorySize();
+          while (curRemaining < 0 && !containers.isEmpty()) {
+            RMContainer container = containers.remove(0);
+            curRemaining += container.getAllocatedResource().getMemorySize();
+            LOG.info("Killing a container to free up resources: " + curRemaining);
+            killContainer(container);
+          }
+        }).start();
+      }
} else{
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index aac7f15a5a5..11b67122dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -178,9 +179,12 @@
import com.google.common.collect.ImmutableSet;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestCapacityScheduler extends CapacitySchedulerTestBase {
- private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCapacityScheduler.class);
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
private ResourceManager resourceManager = null;
@@ -1309,27 +1313,29 @@ public void testAllocateReorder() throws Exception {
@Test
public void testResourceOverCommit() throws Exception {
- int waitCount;
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
+ @SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.start();
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ AdminService admin = rm.getAdminService();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ NodeId nmId1 = nm1.getNodeId();
RMApp app1 = rm.submitApp(2048);
// kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
- nm1.getNodeId());
+ SchedulerNodeReport report_nm1 = scheduler.getNodeReport(nmId1);
// check node report, 2 GB used and 2 GB available
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
- // add request for containers
+ // add request for 1 container of 2 GB
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
@@ -1342,77 +1348,123 @@ public void testResourceOverCommit() throws Exception {
}
List allocated1 = alloc1Response.getAllocatedContainers();
- Assert.assertEquals(1, allocated1.size());
- Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
- Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
+ assertEquals(1, allocated1.size());
+ Container c1 = allocated1.get(0);
+ assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(nmId1, c1.getNodeId());
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ report_nm1 = scheduler.getNodeReport(nmId1);
// check node report, 4 GB used and 0 GB available
- Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
+ assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
- // check container is assigned with 2 GB.
- Container c1 = allocated1.get(0);
- Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
// update node resource to 2 GB, so resource is over-consumed.
- Map nodeResourceMap =
- new HashMap();
- nodeResourceMap.put(nm1.getNodeId(),
- ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
- UpdateNodeResourceRequest request =
- UpdateNodeResourceRequest.newInstance(nodeResourceMap);
- AdminService as = ((MockRM)rm).getAdminService();
- as.updateNodeResource(request);
-
- waitCount = 0;
- while (waitCount++ != 20) {
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- if (report_nm1.getAvailableResource().getMemorySize() != 0) {
- break;
- }
- LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
- + waitCount + " times already..");
- Thread.sleep(1000);
- }
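+    // An over-commit timeout of -1 means the running containers are not
+    // preempted when the node shrinks; they keep running over-committed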
+ admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+ Collections.singletonMap(nmId1, ResourceOption.newInstance(
+ Resource.newInstance(2 * GB, 1), -1))));
+
+ LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled...");
+ GenericTestUtils.waitFor(() -> {
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+ return nmReport.getAvailableResource().getMemorySize() != 0;
+ }, 100, 10 * 1000);
+
// Now, the used resource is still 4 GB, and available resource is minus value.
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
// Check container can complete successfully in case of resource over-commitment.
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
nm1.containerStatus(containerStatus);
- waitCount = 0;
- while (attempt1.getJustFinishedContainers().size() < 1
- && waitCount++ != 20) {
- LOG.info("Waiting for containers to be finished for app 1... Tried "
- + waitCount + " times already..");
- Thread.sleep(100);
- }
- Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
- Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+
+ LOG.info("Waiting for containers to be finished for app 1...");
+ GenericTestUtils.waitFor(
+ () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+ assertEquals(1, attempt1.getJustFinishedContainers().size());
+ assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
// As container return 2 GB back, the available resource becomes 0 again.
- Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
+ assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
// Verify no NPE is trigger in schedule after resource is updated.
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
- alloc1Response = am1.schedule();
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
- int times = 0;
+ AllocateResponse allocResponse2 = am1.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
// try 10 times as scheduling is async process.
- while (alloc1Response.getAllocatedContainers().size() < 1
- && times++ < 10) {
+ for (int i = 0; i < 10; i++) {
LOG.info("Waiting for containers to be allocated for app 1... Tried "
- + times + " times already..");
+ + "{} times already..", i);
+ Thread.sleep(100);
+ allocResponse2 = am1.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ }
+
+    // Increase the resources again to 5 GB to schedule the 3 GB container
+ admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+ Collections.singletonMap(nmId1, ResourceOption.newInstance(
+ Resource.newInstance(5 * GB, 1), -1))));
+ GenericTestUtils.waitFor(() -> {
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+ return nmReport.getAvailableResource().getMemorySize() == 3 * GB;
+ }, 100, 5 * 1000);
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(3 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+ nm1.nodeHeartbeat(true);
+ while (allocResponse2.getAllocatedContainers().isEmpty()) {
+ LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
+ allocResponse2 = am1.schedule();
}
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
+ assertEquals(1, allocResponse2.getAllocatedContainers().size());
+ Container c2 = allocResponse2.getAllocatedContainers().get(0);
+ assertEquals(3 * GB, c2.getResource().getMemorySize());
+ assertEquals(nmId1, c2.getNodeId());
+
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(5 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+ // Reduce the resources and wait for the container to be preempted
+ admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
+ Collections.singletonMap(nmId1, ResourceOption.newInstance(
+ Resource.newInstance(3 * GB, 1), 2000))));
+    // The container should keep running for a couple of seconds until the
+    // over-commit timeout expires
+ GenericTestUtils.waitFor(() -> {
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+ return nmReport.getAvailableResource().getMemorySize() < 0;
+ }, 100, 5 * 1000);
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(5 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+
+    // Wait until the scheduler preempts containers to free up resources
+ GenericTestUtils.waitFor(() -> {
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId1);
+ return nmReport.getAvailableResource().getMemorySize() > 0;
+ }, 100, 5 * 1000);
+ report_nm1 = scheduler.getNodeReport(nmId1);
+ assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
+ assertEquals(1 * GB, report_nm1.getAvailableResource().getMemorySize());
+ AllocateResponse allocResponse3 = am1.schedule();
+    List<ContainerStatus> completedContainers =
+        allocResponse3.getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus c2status = completedContainers.get(0);
+ assertEquals(c2.getId(), c2status.getContainerId());
+ assertEquals(ContainerState.COMPLETE, c2status.getState());
+ assertEquals(ContainerExitStatus.PREEMPTED, c2status.getExitStatus());
+ assertEquals("Container preempted by scheduler", c2status.getDiagnostics());
+
rm.stop();
}