diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index e9de05227ec..add44a39103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -55,12 +55,16 @@ public static ResourceOption newInstance(Resource resource,
* Get timeout for tolerant of resource over-commitment
* Note: negative value means no timeout so that allocated containers will
* keep running until the end even under resource over-commitment cases.
- * @return overCommitTimeout of the ResourceOption
+ * @return overCommitTimeout of the ResourceOption in milliseconds
*/
@Private
@Evolving
public abstract int getOverCommitTimeout();
-
+
+ /**
+ * Set the over commit timeout.
+ * @param overCommitTimeout Timeout in ms. Negative means no timeout.
+ */
@Private
@Evolving
protected abstract void setOverCommitTimeout(int overCommitTimeout);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index c50950bbd9a..e82cd3cff15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -677,6 +677,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (capability != null) {
nodeHeartBeatResponse.setResource(capability);
}
+ // Check if we got an event (AdminService) that updated the resources
+ if (rmNode.isUpdatedCapability()) {
+ nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
+ rmNode.resetUpdatedCapability();
+ }
// 7. Send Container Queuing Limits back to the Node. This will be used by
// the node to truncate the number of Containers queued for execution.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index c77d29c89ae..d3b515e8241 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -104,6 +104,17 @@
*/
public Resource getTotalCapability();
+ /**
+ * If the total available resources has been updated.
+ * @return If the capability has been updated.
+ */
+ boolean isUpdatedCapability();
+
+ /**
+ * Mark that the updated event has been processed.
+ */
+ void resetUpdatedCapability();
+
/**
* the aggregated resource utilization of the containers.
* @return the aggregated resource utilization of the containers.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d33ee44de4d..3b4ccc60524 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -125,6 +125,7 @@
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
private volatile Resource totalCapability;
+ private volatile boolean updatedCapability = false;
private final Node node;
private String healthReport;
@@ -455,6 +456,16 @@ public Resource getTotalCapability() {
return this.totalCapability;
}
+ @Override
+ public boolean isUpdatedCapability() {
+ return this.updatedCapability;
+ }
+
+ @Override
+ public void resetUpdatedCapability() {
+ this.updatedCapability = false;
+ }
+
@Override
public String getRackName() {
return node.getNetworkLocation();
@@ -813,11 +824,12 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
}
- private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
- RMNodeResourceUpdateEvent event){
- ResourceOption resourceOption = event.getResourceOption();
- // Set resource on RMNode
- rmNode.totalCapability = resourceOption.getResource();
+ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
+ RMNodeResourceUpdateEvent event){
+ ResourceOption resourceOption = event.getResourceOption();
+ // Set resource on RMNode
+ rmNode.totalCapability = resourceOption.getResource();
+ rmNode.updatedCapability = true;
}
private static NodeHealthStatus updateRMNodeFromStatusEvents(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index a798b97af5f..55df40e879e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -92,16 +92,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-
-
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -118,6 +118,8 @@
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
protected final ClusterNodeTracker nodeTracker =
new ClusterNodeTracker<>();
@@ -830,6 +832,7 @@ public void updateNodeResource(RMNode nm,
writeLock.lock();
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
+ final int timeout = resourceOption.getOverCommitTimeout();
Resource oldResource = node.getTotalResource();
if (!oldResource.equals(newResource)) {
// Notify NodeLabelsManager about this change
@@ -838,14 +841,17 @@ public void updateNodeResource(RMNode nm,
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName() + " from: "
- + oldResource + ", to: " + newResource);
+ + oldResource + ", to: " + newResource + " in " + timeout + " ms");
nodeTracker.removeNode(nm.getNodeID());
// update resource to node
node.updateTotalResource(newResource);
+ node.setOvercommitTimeOut(timeout);
+ markContainersIfOvercommitted(node);
nodeTracker.addNode((N) node);
+
} else{
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
@@ -1188,6 +1194,10 @@ protected void nodeUpdate(RMNode nm) {
updateNodeResourceUtilization(nm, schedulerNode);
}
+ if (schedulerNode != null) {
+ markContainersIfOvercommitted(schedulerNode);
+ }
+
// Now node data structures are up-to-date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug(
@@ -1197,6 +1207,52 @@ protected void nodeUpdate(RMNode nm) {
}
}
+ /**
+ * Check if the node is over committed and needs to remove containers. It
+ * allows marking them for PREEMPT (notify the AM) or KILL.
+ * @param schedulerNode The node to check whether is over committed.
+ * @param kill If the container should be killed or just notify the AM.
+ */
+ private void markContainersIfOvercommitted(SchedulerNode schedulerNode) {
+
+ // If there is no time out, we don't mark anything
+ if (!schedulerNode.isOvercommitTimeOutSet()) {
+ return;
+ }
+
+ // Check if the node is over committed (negative resources)
+ ResourceCalculator rc = getResourceCalculator();
+ Resource unallocated = Resource.newInstance(
+ schedulerNode.getUnallocatedResource());
+ if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+ return;
+ }
+
+ // Only send KILL requests if we passed the time out
+ SchedulerEventType eventType =
+ SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+ if (schedulerNode.isOvercommitTimedOut()) {
+ eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+ }
+
+ LOG.debug(schedulerNode.getNodeID() + " is over committed (" +
+ schedulerNode.getUnallocatedResource() + "), free up resources");
+ for (RMContainer container : schedulerNode.getContainersToKill()) {
+ LOG.info("Send " + eventType + " to " + container.getContainerId() +
+ " to free up " + container.getAllocatedResource());
+ ApplicationAttemptId appId = container.getApplicationAttemptId();
+ ContainerPreemptEvent event =
+ new ContainerPreemptEvent(appId, container, eventType);
+ this.rmContext.getDispatcher().getEventHandler().handle(event);
+ Resources.addTo(unallocated, container.getAllocatedResource());
+
+ if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+ LOG.debug("Enough free resources " + unallocated);
+ break;
+ }
+ }
+ }
+
@Override
public Resource getNormalizedResource(Resource requestedResource,
Resource maxResourceCapability) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index b35aeba83b3..ba5b99e64dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.List;
@@ -26,6 +28,8 @@
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -60,6 +64,8 @@
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
private Resource unallocatedResource = Resource.newInstance(0, 0);
private Resource allocatedResource = Resource.newInstance(0, 0);
private Resource totalResource;
@@ -69,6 +75,8 @@
ResourceUtilization.newInstance(0, 0, 0f);
private volatile ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
+ /** Time stamp for over committed resources to time out. */
+ private long overcommitTimeout = -1;
/* set of containers that are allocated containers */
private final Map launchedContainers =
@@ -118,6 +126,38 @@ public synchronized void updateTotalResource(Resource resource){
this.allocatedResource);
}
+ /**
+ * Set the timeout for the node to stop over committing the resources. After
+ * this time the scheduler will start killing containers until the resources
+ * are not over committed anymore. This may reset a previous timeout.
+ * @param timeOut Time out in milliseconds.
+ */
+ public synchronized void setOvercommitTimeOut(long timeOut) {
+ if (timeOut >= 0) {
+ if (this.overcommitTimeout != -1) {
+ LOG.debug("The over commit timeout for " + getNodeID() +
+ " was already set to " + this.overcommitTimeout);
+ }
+ this.overcommitTimeout = Time.now() + timeOut;
+ }
+ }
+
+ /**
+ * Check if the time out has passed.
+ * @return If the node is over committed.
+ */
+ public synchronized boolean isOvercommitTimedOut() {
+ return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout;
+ }
+
+ /**
+ * Check if the node has a time out for over commit resources.
+ * @return If the node has a time out for over commit resources.
+ */
+ public synchronized boolean isOvercommitTimeOutSet() {
+ return this.overcommitTimeout >= 0;
+ }
+
/**
* Get the ID of the node which contains both its hostname and port.
* @return The ID of the node.
@@ -371,6 +411,36 @@ public int getNumContainers() {
return result;
}
+ /**
+ * Get the containers running on the node ordered by which to kill first. It
+ * tries to kill AMs last, then GUARANTEED containers, and it kills
+ * OPPORTUNISTIC first. If the same time, it uses the creation time.
+ * @return A copy of the running containers ordered by which to kill first.
+ */
+ public List getContainersToKill() {
+ List result = getLaunchedContainers();
+ Collections.sort(result, (c1, c2) -> {
+ return new CompareToBuilder()
+ .append(c1.isAMContainer(), c2.isAMContainer())
+ .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
+ .append(c2.getCreationTime(), c1.getCreationTime()) // reversed
+ .toComparison();
+ });
+ return result;
+ }
+
+ /**
+ * Get the containers launched in the node.
+ * @return List of containers launched.
+ */
+ protected synchronized List getLaunchedContainers() {
+ List result = new ArrayList<>();
+ for (ContainerInfo info : launchedContainers.values()) {
+ result.add(info.container);
+ }
+ return result;
+ }
+
/**
* Get the container for the specified container ID.
* @param containerId The container ID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index c0af0413a0f..3b72ca1c0e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -190,6 +190,15 @@ public Resource getTotalCapability() {
return this.perNode;
}
+ @Override
+ public boolean isUpdatedCapability() {
+ return false;
+ }
+
+ @Override
+ public void resetUpdatedCapability() {
+ }
+
@Override
public String getRackName() {
return this.rackName;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ba409b1386b..0aa2e9eba86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
@@ -1018,4 +1021,95 @@ public void testContainerRecoveredByNode() throws Exception {
System.out.println("Stopping testContainerRecoveredByNode");
}
}
+
+ /**
+ * Test the order we get the containers to kill. It should respect the order
+ * described in {@link SchedulerNode#getContainersToKill()}.
+ */
+ @Test
+ public void testGetRunningContainersToKill() {
+ final SchedulerNode node = new MockSchedulerNode();
+ assertEquals(Collections.emptyList(), node.getContainersToKill());
+
+ // AM
+ RMContainer am0 = newMockRMContainer(
+ true, ExecutionType.GUARANTEED, "AM0");
+ node.allocateContainer(am0);
+ assertEquals(Arrays.asList(am0), node.getContainersToKill());
+
+ // OPPORTUNISTIC, AM0
+ RMContainer opp0 = newMockRMContainer(
+ false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0");
+ node.allocateContainer(opp0);
+ assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill());
+
+ // OPPORTUNISTIC, GUARANTEED, AM0
+ RMContainer regular0 = newMockRMContainer(
+ false, ExecutionType.GUARANTEED, "GUARANTEED0");
+ node.allocateContainer(regular0);
+ assertEquals(Arrays.asList(opp0, regular0, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0
+ RMContainer opp1 = newMockRMContainer(
+ false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1");
+ node.allocateContainer(opp1);
+ assertEquals(Arrays.asList(opp1, opp0, regular0, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0
+ RMContainer am1 = newMockRMContainer(
+ true, ExecutionType.GUARANTEED, "AM1");
+ node.allocateContainer(am1);
+ assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0),
+ node.getContainersToKill());
+
+ // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0
+ RMContainer regular1 = newMockRMContainer(
+ false, ExecutionType.GUARANTEED, "GUARANTEED0");
+ node.allocateContainer(regular1);
+ assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0),
+ node.getContainersToKill());
+ }
+
+ private static RMContainer newMockRMContainer(boolean isAMContainer,
+ ExecutionType executionType, String name) {
+ RMContainer container = mock(RMContainer.class);
+ when(container.isAMContainer()).thenReturn(isAMContainer);
+ when(container.getExecutionType()).thenReturn(executionType);
+ when(container.getCreationTime()).thenReturn(Time.now());
+ when(container.toString()).thenReturn(name);
+ return container;
+ }
+
+ /**
+ * SchedulerNode mock to test launching containers.
+ */
+ class MockSchedulerNode extends SchedulerNode {
+ final List containers = new ArrayList<>();
+
+ public MockSchedulerNode() {
+ super(MockNodes.newNodeInfo(
+ 0, Resource.newInstance(1, 1), 1, "127.0.0.1"), false);
+ }
+
+ @Override
+ protected List getLaunchedContainers() {
+ return containers;
+ }
+
+ @Override
+ public void allocateContainer(RMContainer rmContainer) {
+ containers.add(rmContainer);
+ // Shuffle for testing
+ Collections.shuffle(containers);
+ }
+
+ @Override
+ public void reserveResource(SchedulerApplicationAttempt attempt,
+ SchedulerRequestKey schedulerKey, RMContainer container) {}
+
+ @Override
+ public void unreserveResource(SchedulerApplicationAttempt attempt) {}
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
index 60e25ed83ac..92d866ed722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -27,7 +27,7 @@
import java.util.Set;
public class CapacitySchedulerTestBase {
- protected final int GB = 1024;
+ protected final static int GB = 1024;
protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index aac7f15a5a5..8e53d6dd37a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,12 +42,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeoutException;
import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetworkTopology;
@@ -57,6 +58,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -68,6 +70,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -76,6 +79,8 @@
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -178,9 +183,12 @@
import com.google.common.collect.ImmutableSet;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestCapacityScheduler extends CapacitySchedulerTestBase {
- private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCapacityScheduler.class);
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
private ResourceManager resourceManager = null;
@@ -1309,110 +1317,155 @@ public void testAllocateReorder() throws Exception {
@Test
public void testResourceOverCommit() throws Exception {
- int waitCount;
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
+ ResourceScheduler scheduler = rm.getResourceScheduler();
- MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
- RMApp app1 = rm.submitApp(2048);
- // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
- nm1.nodeHeartbeat(true);
- RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
- MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
- nm1.getNodeId());
- // check node report, 2 GB used and 2 GB available
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ NodeId nmId = nm.getNodeId();
+ RMApp app = rm.submitApp(2048);
+ // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm
+ nm.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am.registerAppAttempt();
+ assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
- // add request for containers
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
- AllocateResponse alloc1Response = am1.schedule(); // send the request
+ // add request for 1 container of 2 GB
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
+ AllocateResponse alloc1Response = am.schedule(); // send the request
// kick the scheduler, 2 GB given to AM1, resource remaining 0
- nm1.nodeHeartbeat(true);
- while (alloc1Response.getAllocatedContainers().size() < 1) {
+ nm.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().isEmpty()) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
- alloc1Response = am1.schedule();
+ alloc1Response = am.schedule();
}
List allocated1 = alloc1Response.getAllocatedContainers();
- Assert.assertEquals(1, allocated1.size());
- Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
- Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- // check node report, 4 GB used and 0 GB available
- Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-
- // check container is assigned with 2 GB.
+ assertEquals(1, allocated1.size());
Container c1 = allocated1.get(0);
- Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
-
- // update node resource to 2 GB, so resource is over-consumed.
- Map nodeResourceMap =
- new HashMap();
- nodeResourceMap.put(nm1.getNodeId(),
- ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
- UpdateNodeResourceRequest request =
- UpdateNodeResourceRequest.newInstance(nodeResourceMap);
- AdminService as = ((MockRM)rm).getAdminService();
- as.updateNodeResource(request);
-
- waitCount = 0;
- while (waitCount++ != 20) {
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- if (report_nm1.getAvailableResource().getMemorySize() != 0) {
- break;
- }
- LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
- + waitCount + " times already..");
- Thread.sleep(1000);
- }
- // Now, the used resource is still 4 GB, and available resource is minus value.
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(nmId, c1.getNodeId());
- // Check container can complete successfully in case of resource over-commitment.
+ // check node report, 4 GB used and 0 GB available
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+ nm.nodeHeartbeat(true);
+ assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+ // update node resource to 2 GB, so resource is over-consumed
+ updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+ // the used resource should still 4 GB and negative available resource
+ waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+ // check that we did not get a preemption requests
+ assertNull(am.schedule().getPreemptionMessage());
+
+ // check that the NM got the updated resources
+ nm.nodeHeartbeat(true);
+ assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+ // check container can complete successfully with resource over-commitment
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
- nm1.containerStatus(containerStatus);
- waitCount = 0;
- while (attempt1.getJustFinishedContainers().size() < 1
- && waitCount++ != 20) {
- LOG.info("Waiting for containers to be finished for app 1... Tried "
- + waitCount + " times already..");
+ nm.containerStatus(containerStatus);
+
+ LOG.info("Waiting for containers to be finished for app 1...");
+ GenericTestUtils.waitFor(
+ () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+ assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+ assertMemory(scheduler, nmId, 2 * GB, 0);
+
+ // verify no NPE is trigger in schedule after resource is updated
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+ AllocateResponse allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
+ // try 10 times as scheduling is an async process
+ for (int i = 0; i < 10; i++) {
Thread.sleep(100);
+ allocResponse2 = am.schedule();
+ assertTrue("Shouldn't have enough resource to allocate containers",
+ allocResponse2.getAllocatedContainers().isEmpty());
}
- Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
- Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- // As container return 2 GB back, the available resource becomes 0 again.
- Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
-
- // Verify no NPE is trigger in schedule after resource is updated.
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
- alloc1Response = am1.schedule();
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
- int times = 0;
- // try 10 times as scheduling is async process.
- while (alloc1Response.getAllocatedContainers().size() < 1
- && times++ < 10) {
- LOG.info("Waiting for containers to be allocated for app 1... Tried "
- + times + " times already..");
+
+ // increase the resources again to 5 GB to schedule the 3GB container
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+ // kick the scheduling and check it took effect
+ nm.nodeHeartbeat(true);
+ while (allocResponse2.getAllocatedContainers().isEmpty()) {
+ LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
+ allocResponse2 = am.schedule();
}
- Assert.assertEquals("Shouldn't have enough resource to allocate containers",
- 0, alloc1Response.getAllocatedContainers().size());
+ assertEquals(1, allocResponse2.getAllocatedContainers().size());
+ Container c2 = allocResponse2.getAllocatedContainers().get(0);
+ assertEquals(3 * GB, c2.getResource().getMemorySize());
+ assertEquals(nmId, c2.getNodeId());
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+ // reduce the resources and trigger a preempt request to the AM for c2
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+ assertNotNull(preemptMsg);
+ Set preemptContainers =
+ preemptMsg.getContract().getContainers();
+ assertEquals(1, preemptContainers.size());
+ PreemptionContainer preemptContainer = preemptContainers.iterator().next();
+ assertEquals(c2.getId(), preemptContainer.getId());
+
+ // increasing the resources again, should stop killing the containers
+ updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+ waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+ Thread.sleep(3 * 1000);
+ assertMemory(scheduler, nmId, 5 * GB, 0);
+
+ // reduce the resources again to trigger a preempt request to the AM for c2
+ long t0 = Time.now();
+ updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ preemptMsg = am.schedule().getPreemptionMessage();
+ assertNotNull(preemptMsg);
+ preemptContainers = preemptMsg.getContract().getContainers();
+ assertEquals(1, preemptContainers.size());
+ preemptContainer = preemptContainers.iterator().next();
+ assertEquals(c2.getId(), preemptContainer.getId());
+
+ // wait until the scheduler kills the container
+ GenericTestUtils.waitFor(() -> {
+ try {
+ nm.nodeHeartbeat(true); // trigger preemption in the NM
+ } catch (Exception e) {
+ LOG.error("Cannot heartbeat", e);
+ }
+ SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+ return report.getAvailableResource().getMemorySize() > 0;
+ }, 200, 5 * 1000);
+ assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+ List completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus c2status = completedContainers.get(0);
+ assertEquals(c2.getId(), c2status.getContainerId());
+ assertEquals(ContainerState.COMPLETE, c2status.getState());
+ assertEquals(ContainerExitStatus.PREEMPTED, c2status.getExitStatus());
+ assertEquals("Container preempted by scheduler", c2status.getDiagnostics());
+
+ long timeToKill = Time.now() - t0;
+ assertTrue("Took too short to kill: " + timeToKill + "ms",
+ timeToKill > 2000);
+ assertTrue("Took too long to kill: " + timeToKill + "ms",
+ timeToKill < 2500);
+
rm.stop();
}