diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a18ef7c..77548da 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -180,6 +180,28 @@ private static void addDeprecatedKeys() {
YARN_PREFIX + "scheduler.maximum-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
+ /**
+ * Minimum container size for which a reservation may be made, expressed as
+ * a multiple of the increment allocation.
+ */
+ public static final String
+ RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE =
+ YARN_PREFIX + "scheduler.reservation-threshold.increment-multiple";
+ /**
+ * Misspelled alias ("INCERMENT") retained so existing references still
+ * compile.
+ *
+ * @deprecated use
+ * {@link #RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE} instead.
+ */
+ @Deprecated
+ public static final String
+ RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE =
+ RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE;
+ /** Default multiple of the increment allocation used as the threshold. */
+ public static final float
+ DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f;
+
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
RM_PREFIX + "scheduler.client.thread-count";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 62ba599..8fe5b97 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2249,5 +2249,14 @@
yarn.nodemanager.log-aggregation.policy.parameters
+
+
+
+ Defines the threshold at or above which a container is allowed to
+ reserve a node. This is defined as a multiple of increment allocation.
+
+ yarn.scheduler.reservation-threshold.increment-multiple
+ 2f
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index cfec915..7af1891 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -543,10 +543,31 @@ private Resource assignContainer(
return container.getResource();
}
- // The desired container won't fit here, so reserve
- reserve(request.getPriority(), node, container, reserved);
+ if (isReservable(container)) {
+ // The desired container won't fit here, so reserve
+ reserve(request.getPriority(), node, container, reserved);
- return FairScheduler.CONTAINER_RESERVED;
+ return FairScheduler.CONTAINER_RESERVED;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not creating reservation as container " + container.getId()
+ + " is not reservable");
+ }
+ return Resources.none();
+ }
+ }
+
+ /**
+ * Decides whether a container that does not fit on a node may reserve it.
+ *
+ * @param container the container being considered for a reservation
+ * @return whether the scheduler reports the container's resource as at
+ * least the reservation threshold, using this queue policy's calculator
+ */
+ private boolean isReservable(Container container) {
+ Resource requested = container.getResource();
+ return scheduler.isAtLeastReservationThreshold(
+ getQueue().getPolicy().getResourceCalculator(), requested);
}
private boolean hasNodeOrRackLocalRequests(Priority priority) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f..911b209 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -194,7 +194,11 @@
private AllocationFileLoaderService allocsLoader;
@VisibleForTesting
AllocationConfiguration allocConf;
-
+
+ // Container size threshold for making a reservation.
+ @VisibleForTesting
+ Resource reservationThreshold;
+
public FairScheduler() {
super(FairScheduler.class.getName());
clock = new SystemClock();
@@ -203,6 +207,21 @@ public FairScheduler() {
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
+ /**
+ * Tests a resource against the container reservation threshold.
+ *
+ * @param resourceCalculator calculator used for the comparison
+ * @param resource the resource to compare with the threshold
+ * @return true if {@code resource} is greater than or equal to
+ * {@code reservationThreshold} per the given calculator
+ */
+ public boolean isAtLeastReservationThreshold(
+ ResourceCalculator resourceCalculator, Resource resource) {
+ boolean meetsThreshold = Resources.greaterThanOrEqual(
+ resourceCalculator, clusterResource, resource, reservationThreshold);
+ return meetsThreshold;
+ }
+
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
int minMem = conf.getInt(
@@ -1311,6 +1321,7 @@ private void initScheduler(Configuration conf) throws IOException {
minimumAllocation = this.conf.getMinimumAllocation();
initMaximumResourceCapability(this.conf.getMaximumAllocation());
incrAllocation = this.conf.getIncrementAllocation();
+ updateReservationThreshold();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
@@ -1377,6 +1388,16 @@ private void initScheduler(Configuration conf) throws IOException {
}
}
+ /**
+ * Sets {@code reservationThreshold} to the increment resource capability
+ * multiplied by the configured reservation-threshold multiple.
+ */
+ private void updateReservationThreshold() {
+ Resource increment = getIncrementResourceCapability();
+ float multiple = this.conf.getReservationThresholdIncrementMultiple();
+ reservationThreshold = Resources.multiply(increment, multiple);
+ }
+
private synchronized void startSchedulerThreads() {
Preconditions.checkNotNull(updateThread, "updateThread is null");
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e477e6e..c94618a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -166,7 +164,19 @@ public Resource getIncrementAllocation() {
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
return Resources.createResource(incrementMemory, incrementCores);
}
-
+
+ /**
+ * Reads the reservation threshold multiple from the configuration.
+ *
+ * @return the value obtained from {@code getFloat} for the threshold key,
+ * with the YarnConfiguration default constant supplied as the default
+ */
+ public float getReservationThresholdIncrementMultiple() {
+ float multiple = getFloat(
+ YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE);
+ return multiple;
+ }
+
public float getLocalityThresholdNode() {
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea..210404d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -64,6 +64,7 @@
protected Configuration conf;
protected FairScheduler scheduler;
protected ResourceManager resourceManager;
+ public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
// Helper methods
public Configuration createConfiguration() {
@@ -76,6 +77,10 @@ public Configuration createConfiguration() {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+ conf.setFloat(
+ YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+ TEST_RESERVATION_THRESHOLD);
return conf;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index c352cc9..cae30b8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -707,9 +707,10 @@ public void testSimpleContainerAllocation() throws IOException {
scheduler.handle(updateEvent);
// Asked for less than increment allocation.
- assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ assertEquals(
+ FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemory());
+ getResourceUsage().getMemory());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
@@ -761,7 +762,7 @@ public void testSimpleContainerReservation() throws Exception {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemory());
+ getResourceUsage().getMemory());
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());
// Now another node checks in with capacity
@@ -936,8 +937,88 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
getResourceUsage().getMemory());
}
-
+ @Test
+ public void testReservationThresholdGatesReservations() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("drf" +
+ "");
+ out.println("");
+ out.close();
+ // Set threshold to 2 * 1024 ==> 2048 MB & 2 * 1 ==> 2 vcores (test will
+ // use vcores)
+ conf.setFloat(
+ YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+ 2f);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4096 MB and 4 vcores
+ RMNode node1 =
+ MockNodes
+ .newNodeInfo(1, Resources.createResource(4096, 4), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue 1 requests the full capacity of the node
+ createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1);
+ scheduler.update();
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+
+ scheduler.handle(updateEvent);
+
+ // Make sure queue 1 is allocated the full node memory
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getResourceUsage().getMemory());
+
+ // Now queue 2 requests a container below the reservation threshold
+ ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // Make sure queue 2 got no allocation and holds no reservation
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getResourceUsage().getMemory());
+ assertEquals(0,
+ scheduler.getSchedulerApp(attId).getReservedContainers().size());
+
+ // Now the same queue 2 app requests 3 vcores, above the 2-vcore threshold
+ createSchedulingRequestExistingApplication(1024, 3, 1, attId);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // Make sure queue 2 is waiting with a reservation
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getResourceUsage().getMemory());
+ assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
+ .getVirtualCores());
+
+ // Now another node checks in with capacity
+ RMNode node2 =
+ MockNodes
+ .newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+ scheduler.handle(updateEvent2);
+
+ // Make sure the 3-vcore container is allocated to queue 2 on the new node
+ assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
+ getResourceUsage().getVirtualCores());
+
+ // The old reservation should still be there...
+ assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
+ .getVirtualCores());
+ // ... but it should disappear when we update the first node.
+ scheduler.handle(updateEvent);
+ assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation()
+ .getVirtualCores());
+ }
@Test
public void testEmptyQueueName() throws Exception {