Fair Scheduler: add a container-size threshold for reservations.

Introduces a configurable ratio of the maximum allocation; FSAppAttempt only
reserves a node for a container that does not fit when the container is at
least that large.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9832729..ba4e5e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -180,6 +180,13 @@ private static void addDeprecatedKeys() {
       YARN_PREFIX + "scheduler.maximum-allocation-vcores";
   public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;

+  /** Ratio of the maximum allocation that a container request must reach
+   * before the Fair Scheduler will make a reservation for it. */
+  public static final String RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO =
+      YARN_PREFIX + "scheduler.reservation-threshold.maximum-ratio";
+  public static final float DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO
+      = 0.5f;
+
   /** Number of threads to handle scheduler interface.*/
   public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
       RM_PREFIX + "scheduler.client.thread-count";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index cfec915..7af1891 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -543,10 +543,23 @@ private
Resource assignContainer(
       return container.getResource();
     }

-    // The desired container won't fit here, so reserve
-    reserve(request.getPriority(), node, container, reserved);
+    if (isReservable(container)) {
+      // The desired container won't fit here, so reserve
+      reserve(request.getPriority(), node, container, reserved);

-    return FairScheduler.CONTAINER_RESERVED;
+      return FairScheduler.CONTAINER_RESERVED;
+    } else { // below the scheduler's reservation threshold: do not reserve
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not creating reservation as container " + container.getId()
+            + " is not reservable");
+      }
+      return Resources.none(); // nothing assigned and nothing reserved
+    }
+  }
+
+  private boolean isReservable(Container container) {
+    return scheduler.isAtLeastReservationThreshold(
+      getQueue().getPolicy().getResourceCalculator(), container.getResource());
   }

   private boolean hasNodeOrRackLocalRequests(Priority priority) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f..6a67f7e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -194,7 +195,11 @@
   private AllocationFileLoaderService allocsLoader;
   @VisibleForTesting
   AllocationConfiguration allocConf;
-  
+
+  // Minimum container size for a reservation: max allocation * config ratio.
+  @VisibleForTesting
+  Resource reservationThreshold;
+
   public FairScheduler() {
     super(FairScheduler.class.getName());
     clock = new SystemClock();
@@ -203,6 +208,12 @@ public FairScheduler() {
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }

+  public boolean isAtLeastReservationThreshold(
+      ResourceCalculator resourceCalculator, Resource resource) {
+    return Resources.greaterThanOrEqual(
+        resourceCalculator, clusterResource, resource, reservationThreshold);
+  }
+
   private void validateConf(Configuration conf) {
     // validate scheduler memory allocation setting
     int minMem = conf.getInt(
@@ -1310,6 +1321,7 @@ private void initScheduler(Configuration conf) throws IOException {
       validateConf(this.conf);
       minimumAllocation = this.conf.getMinimumAllocation();
       initMaximumResourceCapability(this.conf.getMaximumAllocation());
+      updateReservationThreshold(); // derived from the maximum allocation
       incrAllocation = this.conf.getIncrementAllocation();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =
@@ -1377,6 +1389,26 @@ private void initScheduler(Configuration conf) throws IOException {
     }
   }

+  private void updateReservationThreshold() {
+    Resource newThreshold = Resources.multiply(
+        getMaximumResourceCapability(),
+        this.conf.getReservationThresholdToMaxRatio());
+
+    reservationThreshold = newThreshold;
+  }
+
+  @Override
+  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
+    super.updateMaximumAllocation(node, add);
+    updateReservationThreshold();
+  }
+
+  @Override
+  protected void refreshMaximumAllocation(Resource newMaxAlloc) {
+    super.refreshMaximumAllocation(newMaxAlloc);
+    updateReservationThreshold();
+  }
+
   private synchronized void startSchedulerThreads() {
     Preconditions.checkNotNull(updateThread, "updateThread is null");
     Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e477e6e..c2a53e9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -166,7 +166,13 @@ public Resource getIncrementAllocation() {
       DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
     return Resources.createResource(incrementMemory, incrementCores);
   }
-  
+
+  public float getReservationThresholdToMaxRatio() {
+    return getFloat(
+        YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO);
+  }
+
   public float getLocalityThresholdNode() {
     return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea..3de13db 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -64,6 +64,7 @@
   protected Configuration conf;
   protected FairScheduler scheduler;
   protected ResourceManager resourceManager;
+  public static final float TEST_RESERVATION_THRESHOLD = 0.09f; // set by createConfiguration()

   // Helper methods
   public Configuration createConfiguration() {
@@ -76,6 +77,10 @@ public Configuration createConfiguration() {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+    conf.setFloat(
+        YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO,
+        TEST_RESERVATION_THRESHOLD); // reservation threshold as a ratio of max allocation
     return conf;
   }

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index c352cc9..78bed3b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -707,9 +707,10 @@ public void testSimpleContainerAllocation() throws IOException {
     scheduler.handle(updateEvent);

     // Asked for less than increment allocation.
-    assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+    assertEquals(
+        FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
         scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemory());
+            getResourceUsage().getMemory());

     NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
@@ -761,7 +762,7 @@ public void testSimpleContainerReservation() throws Exception {

     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-      getResourceUsage().getMemory());
+        getResourceUsage().getMemory());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());

     // Now another node checks in with capacity
@@ -936,8 +937,113 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
         getResourceUsage().getMemory());

   }
-  
+  @Test
+  public void testReservationThresholdGatesReservations() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<defaultQueueSchedulingPolicy>drf" +
+        "</defaultQueueSchedulingPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    // Set threshold to 0.2 * 10240 ==> 2048 MB
+    conf.setFloat(
+        YarnConfiguration.RM_SCHEDULER_RESERVATION_THRESHOLD_MAX_RATIO, 0.2f);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(2048, 2), 1, "127.0.0.1");
+
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue 1 requests full capacity of node
+    createSchedulingRequest(2048, 2, "queue1", "user1", 1, 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is allocated app capacity
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Now queue 2 requests below threshold
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 2 has no reservation
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+    assertEquals(0,
+        scheduler.getSchedulerApp(attId).getReservedContainers().size());
+
+    // Now queue requests CPU above threshold
+    createSchedulingRequestExistingApplication(1024, 2, 1, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);

+    // Make sure queue 2 is waiting with a reservation
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+    assertEquals(2, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+
+    // Now another node checks in with capacity
+    RMNode node2 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(1024, 2), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    scheduler.handle(updateEvent2);
+
+    // Make sure this goes to queue 2
+    assertEquals(2, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getVirtualCores());
+
+    // The old reservation should still be there...
+    assertEquals(2, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+    // ... but it should disappear when we update the first node.
+    scheduler.handle(updateEvent);
+    assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation()
+        .getVirtualCores());
+  }
+
+  @Test
+  public void testReservationThresholdUpdatesWithMaxResource()
+      throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Assert.assertEquals((int)(TEST_RESERVATION_THRESHOLD * 10240),
+        scheduler.reservationThreshold.getMemory());
+    Assert.assertEquals((int)(TEST_RESERVATION_THRESHOLD * 4),
+        scheduler.reservationThreshold.getVirtualCores());
+
+    final int nodeMemory = 1024;
+    final int nodeCpu = 5;
+    Resource fullResource = Resource.newInstance(nodeMemory, nodeCpu);
+    SchedulerNode mockNode1 = mock(SchedulerNode.class);
+    when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
+    when(mockNode1.getAvailableResource()).thenReturn(fullResource);
+    when(mockNode1.getTotalResource()).thenReturn(fullResource);
+    scheduler.updateMaximumAllocation(mockNode1, true);
+
+    Assert.assertEquals((int)(TEST_RESERVATION_THRESHOLD * nodeMemory),
+        scheduler.reservationThreshold.getMemory());
+    Assert.assertEquals((int)(TEST_RESERVATION_THRESHOLD * nodeCpu),
+        scheduler.reservationThreshold.getVirtualCores());
+  }

   @Test
   public void testEmptyQueueName() throws Exception {