diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 68c7acf..f07becd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -380,6 +380,11 @@ public static boolean isAclEnabled(Configuration conf) {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
+ /** The global max overallocation per node in terms of their capacity */
+ public static final String PER_NODE_MAX_OVERALLOCATION_RATIO =
+ RM_PREFIX + "overallocation.per-node-max-ratio";
+ public static final float DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO = 4.0f;
+
/** Setting that controls whether opportunistic container allocation
* is enabled or not. */
@Unstable
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 95a8949..def64d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3097,6 +3097,16 @@
+ The maximum amount of resources, specified as a ratio to node capacity,
+ that can be allocated to opportunistic containers on any given node in
+ the cluster.
+
+ yarn.resourcemanager.overallocation.per-node-max-ratio
+ 4.0
+
+
+
+
Frequency for computing least loaded NMs.
yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 33079f7..d727131 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -116,6 +116,7 @@
new ClusterNodeTracker<>();
protected Resource minimumAllocation;
+ protected float maxOverAllocationRatioPerNode;
protected volatile RMContext rmContext;
@@ -195,6 +196,9 @@ public void serviceInit(Configuration conf) throws Exception {
nodeTracker.setConfiguredMaxAllocationWaitTime(
configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
+ maxOverAllocationRatioPerNode = conf.getFloat(
+ YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+ YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
createReleaseCache();
autoUpdateContainers =
conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index e942981..b10cac5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -90,10 +90,14 @@
protected Resource resourceAllocatedPendingLaunch =
Resource.newInstance(0, 0);
+ // The max amount of resources that can be allocated to opportunistic
+ // containers on the node, specified as a ratio to its capacity
+ private final float maxOverAllocationRatio;
+
private volatile Set labels = null;
public SchedulerNode(RMNode node, boolean usePortForNodeName,
- Set labels) {
+ Set labels, float maxOverAllocationRatio) {
this.rmNode = node;
this.unallocatedResource = Resources.clone(node.getTotalCapability());
this.capacity = Resources.clone(node.getTotalCapability());
@@ -103,10 +107,24 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName,
nodeName = rmNode.getHostName();
}
this.labels = ImmutableSet.copyOf(labels);
+ this.maxOverAllocationRatio = maxOverAllocationRatio;
+ }
+
+ public SchedulerNode(RMNode node, boolean usePortForNodeName,
+ Set labels) {
+ this(node, usePortForNodeName, labels,
+ YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+ }
+
+ public SchedulerNode(RMNode node, boolean usePortForNodeName,
+ float maxOverAllocationRatio) {
+ this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+ maxOverAllocationRatio);
}
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
- this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
+ this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+ YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
}
public RMNode getRMNode() {
@@ -604,9 +622,11 @@ public ResourceUtilization getNodeUtilization() {
/**
* Get the amount of resources that can be allocated to opportunistic
- * containers in the case of overallocation. It is calculated as
+ * containers in the case of overallocation, calculated as
* node capacity - (node utilization + resources of allocated-yet-not-started
- * containers).
+ * containers), subject to the maximum amount of resources that can be
+ * allocated to opportunistic containers on the node specified as a ratio to
+ * its capacity.
* @return the amount of resources that are available to be allocated to
* opportunistic containers
*/
@@ -639,11 +659,20 @@ public synchronized Resource allowedResourceForOverAllocation() {
Resource resourceAllowedForOpportunisticContainers =
Resources.createResource(allowedMemory, allowedCpu);
- // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
- // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+ // cap the total amount of resources allocated to OPPORTUNISTIC containers
+ Resource maxOverallocation = getMaxOverallocationAllowed();
+ Resources.subtractFrom(maxOverallocation, allocatedResourceOpportunistic);
+ resourceAllowedForOpportunisticContainers = Resources.componentwiseMin(
+ maxOverallocation, resourceAllowedForOpportunisticContainers);
+
return resourceAllowedForOpportunisticContainers;
}
+ private Resource getMaxOverallocationAllowed() {
+ long maxMemory = (long) (capacity.getMemorySize() * maxOverAllocationRatio);
+ int maxVcore = (int) (capacity.getVirtualCores() * maxOverAllocationRatio);
+ return Resource.newInstance(maxMemory, maxVcore);
+ }
private static class ContainerInfo {
private final RMContainer container;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 95490f5..dec8790 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -66,10 +66,15 @@
// slated for preemption
private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
+ @VisibleForTesting
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
}
+ public FSSchedulerNode(RMNode node, boolean usePortForNodeName,
+ float maxOverallocationRatio) {
+ super(node, usePortForNodeName, maxOverallocationRatio);
+ }
/**
* Total amount of reserved resources including reservations and preempted
* containers.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index c64bf53..d5c673b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -719,7 +719,7 @@ private void addNode(List containerReports,
try {
writeLock.lock();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
- usePortForNodeName);
+ usePortForNodeName, maxOverAllocationRatioPerNode);
nodeTracker.addNode(schedulerNode);
triggerUpdate();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e70053c..19c2943 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3058,6 +3058,113 @@ public void testAllocateOpportunisticContainersWithGuaranteedOnes()
}
}
+ /**
+ * Test that max overallocation per node is enforced by Fair Scheduler.
+ * @throws Exception
+ */
+ @Test
+ public void testMaxOverallocationPerNode() throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+ float maxOverallocationRatio = conf.getFloat(
+ YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+ YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+ conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.5f);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 1G of memory and 1 vcores and an overallocation threshold
+ // of 1.0f and 1.0f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(1f, 1f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(1024, 1), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that takes up the whole node
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(1024, "queue1", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is zero after the container runs
+ ContainerStatus containerStatus1 = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus1),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(0, 0, 0.0f));
+
+ // create a scheduling request that should get allocated an OPPORTUNISTIC
+ // container because the node utilization is zero
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ List allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers2.get(0).getExecutionType());
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+
+ // node utilization is still zero after the container runs
+ ContainerStatus containerStatus2 = ContainerStatus.newInstance(
+ allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus2),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(0, 0, 0.0f));
+
+ // create another scheduling request that should not get any allocation
+ // because of the max overallocation on the node will be exceeded.
+ ApplicationAttemptId appAttempt3 =
+ createSchedulingRequest(1024, "queue3", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+ List allocatedContainers3 =
+ scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers3.size() == 0);
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+ maxOverallocationRatio);
+ }
+ }
+
@Test
public void testAclSubmitApplication() throws Exception {
// Set acl's