diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index e664725af31..787d64bbb46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Lock;
@@ -141,7 +142,30 @@ private PreemptableContainers identifyContainersToPreemptForOneContainer(
     PreemptableContainers bestContainers = null;
     int maxAMContainers = Integer.MAX_VALUE;
 
-    for (FSSchedulerNode node : potentialNodes) {
+    int preemptionBatchSize = scheduler.getConf().getPreemptionBatchSize();
+    int size = potentialNodes.size();
+
+    /*
+     * Scanning every candidate node is too expensive, for example when the
+     * requested resource name is ANY, so optionally examine only one batch
+     * of nodes per call. With a batch size of 100 and 350 nodes the scan
+     * covers one of 0-99, 100-199, 200-299, or 300-349 then 0-49 (wrapping).
+     * A batch size that is non-positive, or not smaller than the number of
+     * nodes, scans every node.
+     */
+    int start = 0;
+    int nodesToCheck = size;
+    if (preemptionBatchSize > 0 && preemptionBatchSize < size) {
+      // Ceiling division keeps every start index inside the list, even
+      // when size is an exact multiple of the batch size.
+      int numBatches = (size + preemptionBatchSize - 1) / preemptionBatchSize;
+      start = new Random().nextInt(numBatches) * preemptionBatchSize;
+      nodesToCheck = preemptionBatchSize;
+    }
+
+    // A counted loop always terminates and never indexes an empty list.
+    for (int i = 0; i < nodesToCheck; i++) {
+      FSSchedulerNode node = potentialNodes.get((start + i) % size);
       PreemptableContainers preemptableContainers =
           identifyContainersToPreemptOnNode(
               rr.getCapability(), node, maxAMContainers);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e6b1de4e7f0..0535c751b95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -161,6 +161,14 @@
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
 
+  /**
+   * Maximum number of nodes examined when identifying containers to preempt
+   * for one starved container; a non-positive value examines all nodes.
+   */
+  protected static final String PREEMPTION_BATCHSIZE =
+      CONF_PREFIX + "preemption.batchSize";
+  protected static final int DEFAULT_PREEMPTION_BATCHSIZE = -1;
+
   protected static final String WAIT_TIME_BEFORE_KILL =
       CONF_PREFIX + "waitTimeBeforeKill";
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
@@ -376,6 +384,10 @@ public float getPreemptionUtilizationThreshold() {
     return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
   }
 
+  public int getPreemptionBatchSize() {
+    return getInt(PREEMPTION_BATCHSIZE, DEFAULT_PREEMPTION_BATCHSIZE);
+  }
+
   public boolean getAssignMultiple() {
     return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md
index 5f9e7796636..57e43042ab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md
@@ -66,6 +66,7 @@ Customizing the Fair Scheduler typically involves altering two files. First, sch
 | `yarn.scheduler.fair.user-as-default-queue` | Whether to use the username associated with the allocation as the default queue name, in the event that a queue name is not specified. If this is set to "false" or unset, all jobs have a shared default queue, named "default". Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored. |
 | `yarn.scheduler.fair.preemption` | Whether to use preemption. Defaults to false. |
 | `yarn.scheduler.fair.preemption.cluster-utilization-threshold` | The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f. |
+| `yarn.scheduler.fair.preemption.batchSize` | The maximum number of nodes examined when identifying containers to preempt for one starved container. Limiting the search to a subset of the nodes can improve preemption performance in a large cluster. A value of zero or less disables batching, so all nodes are examined. Defaults to -1. |
 | `yarn.scheduler.fair.sizebasedweight` | Whether to assign shares to individual apps based on their size, rather than providing an equal share to all apps regardless of size. When set to true, apps are weighted by the natural logarithm of one plus the app's total requested memory, divided by the natural logarithm of 2. Defaults to false. |
 | `yarn.scheduler.fair.assignmultiple` | Whether to allow multiple container assignments in one heartbeat. Defaults to false. |
 | `yarn.scheduler.fair.dynamic.max.assign` | If assignmultiple is true, whether to dynamically determine the amount of resources that can be assigned in one heartbeat. When turned on, about half of the un-allocated resources on the node are allocated to containers in a single heartbeat. Defaults to true. |