diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5dfef73..2d62be5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -1174,11 +1174,18 @@ boolean isStarved() { VisitedResourceRequestTracker visitedRRs = new VisitedResourceRequestTracker(scheduler.getNodeTracker()); + // Try to return at least the configured minimum number of RRs. This + // won't happen if the application has fewer RRs than this number that + // fit in the app's starvation. + int minRRsToReturn = scheduler.minResourceRequestsForPreemption; + // Start with current starvation and track the pending amount - Resource pending = getStarvation(); + Resource starvation = getStarvation(); + Resource pending = Resources.clone(starvation); + for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) { - if (Resources.isNone(pending)) { - // Found enough RRs to match the starvation + if (Resources.isNone(pending) && ret.size() >= minRRsToReturn) { + // Found enough RRs to match the starvation and returning at least the configured minimum break; } @@ -1188,12 +1195,13 @@ boolean isStarved() { } // A RR can have multiple containers of a capability. We need to - // compute the number of containers that fit in "pending". + // compute the number of containers that fit in "starvation". 
int numContainersThatFit = (int) Math.floor( Resources.ratio(scheduler.getResourceCalculator(), - pending, rr.getCapability())); + starvation, rr.getCapability())); + if (numContainersThatFit == 0) { - // This RR's capability is too large to fit in pending + // This RR's capability is too large to fit in starvation continue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index db02bab..32d992a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -189,6 +189,8 @@ private FairSchedulerEventLog eventLog; // Machine-readable event log protected boolean assignMultiple; // Allocate multiple containers per // heartbeat + protected int minResourceRequestsForPreemption; + @VisibleForTesting boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat @@ -1290,6 +1292,7 @@ private void initScheduler(Configuration conf) throws IOException { sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); + minResourceRequestsForPreemption = this.conf.getMinResourceRequestsForPreemption(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 8e8e37b..e6ae9b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -162,6 +162,11 @@ CONF_PREFIX + "reservable-nodes"; public static final float RESERVABLE_NODES_DEFAULT = 0.05f; + /** Minimum number of ResourceRequest instances that will be considered for preemption. */ + public static final String MIN_RESOURCE_REQUESTS_FOR_PREEMPTION = + CONF_PREFIX + "preemption-min-resource-requests"; + public static final int MIN_RESOURCE_REQUESTS_FOR_PREEMPTION_DEFAULT = 10; + public FairSchedulerConfiguration() { super(); } @@ -281,6 +286,10 @@ public float getReservableNodes() { return getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT); } + public int getMinResourceRequestsForPreemption() { + return getInt(MIN_RESOURCE_REQUESTS_FOR_PREEMPTION, MIN_RESOURCE_REQUESTS_FOR_PREEMPTION_DEFAULT); + } + /** * Parses a resource config value of a form like "1024", "1024 mb", * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd..efa8999 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -30,11 +30,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -42,8 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; - - +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -51,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; - import org.junit.Assert; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,6 +92,7 @@ public Configuration createConfiguration() { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + conf.setInt(FairSchedulerConfiguration.MIN_RESOURCE_REQUESTS_FOR_PREEMPTION, 3); conf.setFloat( FairSchedulerConfiguration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 46187d9..0c3d551 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -21,32 +21,31 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; 
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; - +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; - +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; 
@@ -341,6 +340,40 @@ public void testHeadroomWithBlackListedNodes() {
     assertEquals(clusterResource, spyApp.getHeadroom());
   }
 
+  /**
+   * A starved app must get back at least the configured minimum number of
+   * RRs, even when a single RR is already enough to cover its starvation.
+   */
+  @Test
+  public void testReturnMinResourceRequestsForPreemption() {
+    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queue", true);
+    RMContext rmContext = resourceManager.getRMContext();
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    ActiveUsersManager usersManager = Mockito.mock(ActiveUsersManager.class);
+    Mockito.doNothing().when(usersManager).activateApplication(Mockito.anyString(), Mockito.anyObject());
+    FSAppAttempt schedulerApp =
+        new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue, usersManager, rmContext);
+
+    schedulerApp.setMinshareStarvation(Resource.newInstance(100, 10));
+    assertTrue(schedulerApp.isStarved());
+    assertEquals(Resource.newInstance(100, 10), schedulerApp.getStarvation());
+
+    // Create three RRs, each of which is enough to satisfy the app's starvation.
+    List<ResourceRequest> allRrs = new ArrayList<>();
+    allRrs.add(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resource.newInstance(10, 1), 10));
+    allRrs.add(ResourceRequest.newInstance(
+        Priority.newInstance(2), "*", Resource.newInstance(10, 1), 10));
+    allRrs.add(ResourceRequest.newInstance(
+        Priority.newInstance(3), "*", Resource.newInstance(10, 1), 10));
+    schedulerApp.updateResourceRequests(allRrs);
+    schedulerApp.updateDemand();
+
+    // Both should be equal to 3.
+    assertEquals(scheduler.minResourceRequestsForPreemption,
+        schedulerApp.getStarvedResourceRequests().size());
+  }
+
   private static long min(long value1, long value2, long value3) {
     return Math.min(Math.min(value1, value2), value3);
   }