diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 6836758..1f363f5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -35,6 +35,7 @@
public class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+ private static final double RESOURCE_USAGE_EPSILON = 0.0001;
/**
* Compute fair share of the given schedulables.Fair share is an allocation of
@@ -139,16 +140,48 @@ private static void computeSharesInternal(
getResourceValue(totalResources, type));
double rMax = 1.0;
- while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
- < totalResource) {
+ boolean firstTime = true;
+ // calculate the start value based on minShare to make sure
+ // currentRU will change for every iteration.
+ for (Schedulable sched : schedulables) {
+ double weight = sched.getWeights().getWeight(type);
+ if (weight > 0) {
+ int minShare = getResourceValue(sched.getMinShare(), type);
+ if (weight >= minShare) {
+ rMax = 1.0;
+ break;
+ } else {
+ double t = (double)minShare / weight;
+ // get the minimum ratio.
+ if (firstTime) {
+ rMax = t;
+ firstTime = false;
+ } else {
+ rMax = Math.min(rMax, t);
+ }
+ }
+ }
+ }
+
+ double previousRU = 0.0;
+ while (true) {
+ // Continue as long as we are making progress towards finding a ratio
+ // that would lead to exhausting all available resources
+ double currentRU =
+ resourceUsedWithWeightToResourceRatio(rMax, schedulables, type);
+ if ((currentRU - previousRU < RESOURCE_USAGE_EPSILON) ||
+ (currentRU >= totalResource)) {
+ break;
+ }
rMax *= 2.0;
+ previousRU = currentRU;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
- int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
+ int plannedResourceUsed = (int)resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
@@ -162,11 +195,11 @@ private static void computeSharesInternal(
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
- setResourceValue(computeShare(sched, right, type),
+ setResourceValue((int)computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
- setResourceValue(
- computeShare(sched, right, type), sched.getFairShare(), type);
+ setResourceValue((int)computeShare(sched, right, type),
+ sched.getFairShare(), type);
}
}
}
@@ -175,12 +208,11 @@ private static void computeSharesInternal(
* Compute the resources that would be used given a weight-to-resource ratio
* w2rRatio, for use in the computeFairShares algorithm as described in #
*/
- private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
+ private static double resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection extends Schedulable> schedulables, ResourceType type) {
- int resourcesTaken = 0;
+ double resourcesTaken = 0.0;
for (Schedulable sched : schedulables) {
- int share = computeShare(sched, w2rRatio, type);
- resourcesTaken += share;
+ resourcesTaken += computeShare(sched, w2rRatio, type);
}
return resourcesTaken;
}
@@ -189,12 +221,12 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
* Compute the resources assigned to a Schedulable given a particular
* weight-to-resource ratio w2rRatio.
*/
- private static int computeShare(Schedulable sched, double w2rRatio,
+ private static double computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
- return (int) share;
+ return share;
}
private static int getResourceValue(Resource resource, ResourceType type) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 05b1925..1a3da55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -301,13 +301,156 @@ public void testSimpleFairShareCalculation() throws IOException {
// Divided three ways - between the two queues and the default queue
for (FSLeafQueue p : queues) {
- assertEquals(3414, p.getFairShare().getMemory());
- assertEquals(3414, p.getMetrics().getFairShareMB());
- assertEquals(3414, p.getSteadyFairShare().getMemory());
- assertEquals(3414, p.getMetrics().getSteadyFairShareMB());
+ assertEquals(3413, p.getFairShare().getMemory());
+ assertEquals(3413, p.getMetrics().getFairShareMB());
+ assertEquals(3413, p.getSteadyFairShare().getMemory());
+ assertEquals(3413, p.getMetrics().getSteadyFairShareMB());
}
}
-
+
+ @Test
+ public void testFairShareWithZeroWeight() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // set queueA and queueB weight zero.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0.0");
+ out.println("");
+ out.println("");
+ out.println("0.0");
+ out.println("");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 2 * 1024.
+ createSchedulingRequest(2 * 1024, "queueA", "user1");
+ // Queue B wants 6 * 1024
+ createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+ "queueA", false);
+ // queueA's weight is 0.0, so its fair share should be 0.
+ assertEquals(0, queue.getFairShare().getMemory());
+ // queueB's weight is 0.0, so its fair share should be 0.
+ queue = scheduler.getQueueManager().getLeafQueue(
+ "queueB", false);
+ assertEquals(0, queue.getFairShare().getMemory());
+ }
+
+ @Test
+ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // set queueA and queueB weight zero.
+ // set queueA and queueB minResources 1.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("1 mb 1 vcores");
+ out.println("0.0");
+ out.println("");
+ out.println("");
+ out.println("1 mb 1 vcores");
+ out.println("0.0");
+ out.println("");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 2 * 1024.
+ createSchedulingRequest(2 * 1024, "queueA", "user1");
+ // Queue B wants 6 * 1024
+ createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+ "queueA", false);
+ // queueA's weight is 0.0 and minResources is 1,
+ // so its fair share should be 1 (minShare).
+ assertEquals(1, queue.getFairShare().getMemory());
+ // queueB's weight is 0.0 and minResources is 1,
+ // so its fair share should be 1 (minShare).
+ queue = scheduler.getQueueManager().getLeafQueue(
+ "queueB", false);
+ assertEquals(1, queue.getFairShare().getMemory());
+ }
+
+ @Test
+ public void testFairShareWithNoneZeroWeightNoneZeroMinRes()
+ throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // set queueA and queueB weight 0.5.
+ // set queueA and queueB minResources 1024.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("1024 mb 1 vcores");
+ out.println("0.5");
+ out.println("");
+ out.println("");
+ out.println("1024 mb 1 vcores");
+ out.println("0.5");
+ out.println("");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 4 * 1024.
+ createSchedulingRequest(4 * 1024, "queueA", "user1");
+ // Queue B wants 4 * 1024
+ createSchedulingRequest(4 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+ "queueA", false);
+ // queueA's weight is 0.5 and minResources is 1024,
+ // so its fair share should be 4096.
+ assertEquals(4096, queue.getFairShare().getMemory());
+ // queueB's weight is 0.5 and minResources is 1024,
+ // so its fair share should be 4096.
+ queue = scheduler.getQueueManager().getLeafQueue(
+ "queueB", false);
+ assertEquals(4096, queue.getFairShare().getMemory());
+ }
+
@Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
scheduler.init(conf);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
index ab8fcbc..f4ae335 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
@@ -319,6 +319,12 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
Collection leafQueues = scheduler.getQueueManager()
.getLeafQueues();
+ assertEquals(
+ 1,
+ scheduler.getQueueManager()
+ .getParentQueue("root.parentB", false).getSteadyFairShare()
+ .getVirtualCores());
+
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0.2,
@@ -329,7 +335,7 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0.05,
(double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
- assertEquals(0.1,
+ assertEquals(0.0,
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
0.001);
}