diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 661caa7..60f2e98 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -492,6 +492,10 @@ private void loadQueue(String parentName, Element element,
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
+ if (val < 0) {
+ throw new AllocationConfigurationException("Weights " +
+ "cannot be negative");
+ }
queueWeights.put(queueName, new ResourceWeights((float)val));
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index f4fad32..b83a769 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -119,6 +119,9 @@ private static void computeSharesInternal(
if (schedulables.isEmpty()) {
return;
}
+
+ boolean allWeightsZero = checkIfAllWeightsZero(schedulables, type);
+
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
@@ -137,8 +140,8 @@ private static void computeSharesInternal(
totalResource = Math.min(totalMaxShare, totalResource);
double rMax = 1.0;
- while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
- < totalResource) {
+ while (totalResource > resourceUsedWithWeightToResourceRatio(
+ rMax, schedulables, type, allWeightsZero)) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
@@ -147,7 +150,7 @@ private static void computeSharesInternal(
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
- mid, schedulables, type);
+ mid, schedulables, type, allWeightsZero);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
@@ -160,13 +163,23 @@ private static void computeSharesInternal(
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
- setResourceValue(computeShare(sched, right, type),
+ setResourceValue(computeShare(sched, right, type, false),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
- setResourceValue(
- computeShare(sched, right, type), sched.getFairShare(), type);
+ setResourceValue(computeShare(sched, right, type, allWeightsZero),
+ sched.getFairShare(), type);
+ }
+ }
+ }
+
+ private static boolean checkIfAllWeightsZero(
+ Collection extends Schedulable> schedulables, ResourceType type) {
+ for (Schedulable sched : schedulables) {
+ if (sched.getWeights().getWeight(type) > 0) {
+ return false;
}
}
+ return true;
}
/**
@@ -174,10 +187,11 @@ private static void computeSharesInternal(
* w2rRatio, for use in the computeFairShares algorithm as described in #
*/
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
- Collection extends Schedulable> schedulables, ResourceType type) {
+ Collection extends Schedulable> schedulables, ResourceType type,
+ boolean allWeightsZero) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
- int share = computeShare(sched, w2rRatio, type);
+ int share = computeShare(sched, w2rRatio, type, allWeightsZero);
resourcesTaken += share;
}
return resourcesTaken;
@@ -188,8 +202,17 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
* weight-to-resource ratio w2rRatio.
*/
private static int computeShare(Schedulable sched, double w2rRatio,
- ResourceType type) {
- double share = sched.getWeights().getWeight(type) * w2rRatio;
+ ResourceType type, boolean forceWeightOne) {
+ // If all active Schedulable weights are 0.0,
+ // use 1 as the weight to avoid zero share.
+ float weight;
+ if (forceWeightOne && sched.getWeights().getWeight(type) == 0.0) {
+ weight = 1;
+ } else {
+ weight = sched.getWeights().getWeight(type);
+ }
+
+ double share = weight * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
@@ -226,8 +249,9 @@ private static int handleFixedFairShares(
/**
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
*
- * The fairshare is fixed if either the maxShare is 0, weight is 0,
- * or the Schedulable is not active for instantaneous fairshare.
+ * The fairshare is fixed if either the maxShare is 0, weight is 0 while
+ * calculating steady share, or the Schedulable is not active for
+ * instantaneous fairshare.
*/
private static int getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
@@ -243,8 +267,8 @@ private static int getFairShareIfFixed(Schedulable sched,
return 0;
}
- // Check if weight is 0
- if (sched.getWeights().getWeight(type) <= 0) {
+ // Check if weight is 0 while calculating steady share
+ if (isSteadyShare && sched.getWeights().getWeight(type) == 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index cc91ef9..42845d7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -681,6 +681,31 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue()
allocLoader.reloadAllocations();
}
+ /**
+ * Verify a queue can't have a negative weight.
+ */
+ @Test (expected = AllocationConfigurationException.class)
+ public void testNegativeWeightQueue()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("-1");
+ out.println("");
+ out.println("");
+ out.close();
+
+ AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ allocLoader.init(conf);
+ ReloadListener confHolder = new ReloadListener();
+ allocLoader.setReloadListener(confHolder);
+ allocLoader.reloadAllocations();
+ }
+
private class ReloadListener implements AllocationFileLoaderService.Listener {
public AllocationConfiguration allocConf;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5690fa4..ea1fae3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -406,14 +406,14 @@ public void testFairShareWithZeroWeight() throws IOException {
scheduler.update();
+ // all queue weight is 0.0 and no active Non-Zero weight queue,
+ // so its fair share should be 4096.
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
- // queueA's weight is 0.0, so its fair share should be 0.
- assertEquals(0, queue.getFairShare().getMemory());
- // queueB's weight is 0.0, so its fair share should be 0.
+ assertEquals(4096, queue.getFairShare().getMemory());
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
- assertEquals(0, queue.getFairShare().getMemory());
+ assertEquals(4096, queue.getFairShare().getMemory());
}
@Test
@@ -425,13 +425,16 @@ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
out.println("");
out.println("");
out.println("");
- out.println("1 mb 1 vcores");
+ out.println("5120 mb 1 vcores");
out.println("0.0");
out.println("");
out.println("");
- out.println("1 mb 1 vcores");
+ out.println("5120 mb 1 vcores");
out.println("0.0");
out.println("");
+ out.println("");
+ out.println("1.0");
+ out.println("");
out.println("");
out.close();
@@ -455,14 +458,14 @@ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
- // queueA's weight is 0.0 and minResources is 1,
- // so its fair share should be 1 (minShare).
- assertEquals(1, queue.getFairShare().getMemory());
- // queueB's weight is 0.0 and minResources is 1,
- // so its fair share should be 1 (minShare).
+ // queueA's weight is 0.0 and minResources is 5120,
+ // so its fair share should be 5120 (minShare).
+ assertEquals(5 * 1024, queue.getFairShare().getMemory());
+ // queueB's weight is 0.0 and minResources is 5120,
+ // so its fair share should be 5120 (minShare).
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
- assertEquals(1, queue.getFairShare().getMemory());
+ assertEquals(5 * 1024, queue.getFairShare().getMemory());
}
@Test
@@ -3378,6 +3381,53 @@ public void testQueueMaxAMShareDefault() throws Exception {
}
/**
+ * The test verifies that if there is no non-zero weight active queue,
+ * zero weight queues can get resources.
+ */
+ @Test
+ public void testRequestAMResourceInZeroWeightQueue() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0.0");
+ out.println("");
+ out.println("");
+ out.println("2.0");
+ out.println("");
+ out.println("fair" +
+ "");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ RMNode node =
+ MockNodes.newNodeInfo(1, Resources.createResource(8192, 20),
+ 0, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+ scheduler.update();
+
+ // Create a Managed AM
+ Resource amResource = Resource.newInstance(1024, 1);
+ int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+ ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+ createApplicationWithAMResource(attId1, "root.queue1", "user1", amResource);
+ createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application 1 should be running",
+ 1, app1.getLiveContainers().size());
+ }
+
+ /**
* The test verifies container gets reserved when not over maxAMShare,
* reserved container gets unreserved when over maxAMShare,
* container doesn't get reserved when over maxAMShare,