diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 8d22a36d99d..e275787bb26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -25,10 +25,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -202,6 +204,61 @@ protected void setupConfigurableCapacities(
         parent == null ? null : parent.getQueueCapacities());
   }
 
+  /**
+   * Reads the weight configured for a queue from its default-partition
+   * capacity property, with the {@code w:} prefix removed.
+   *
+   * @param q queue whose configured weight is read
+   * @param csConf configuration that holds the capacity property
+   * @return the configured weight
+   */
+  protected float getWeightValue(CSQueue q, CapacitySchedulerConfiguration csConf) {
+    // todo throw exception if parse error
+    // fill default value 1
+    return Float.parseFloat(csConf.get(csConf.getQueuePrefix(q.getQueuePath()) +
+        csConf.CAPACITY).replace("w:",""));
+  }
+
+  /**
+   * Converts the weights of the given child queues into capacities, once for
+   * every node label configured on this queue. Each child gets
+   * {@code weight / sum(weights)}: written to the configuration as a
+   * percentage and to the child's queue capacities as a fraction.
+   *
+   * @param csConf configuration to read weights from and write capacities to
+   * @param childQueues child queues whose weights are normalized
+   */
+  protected void updateWeightToCapacities(CapacitySchedulerConfiguration csConf,
+      Collection<CSQueue> childQueues) {
+    Set<String> configuredNodelabels =
+        csConf.getConfiguredNodeLabels(queuePath);
+    // todo throw exception if mixed with capacity
+    for (String label : configuredNodelabels) {
+      if (hasChildQueues()) {
+        // NOTE(review): weights are always read from the default-partition
+        // capacity key, even when label is a non-default partition - confirm.
+        float weightTotal = 0f;
+        for (CSQueue q : childQueues) {
+          weightTotal += getWeightValue(q, csConf);
+        }
+        for (CSQueue q : childQueues) {
+          float capacity = getWeightValue(q, csConf) / weightTotal;
+          // getNodeLabelPrefix() returns the plain queue prefix for NO_LABEL,
+          // so one key expression serves both the default and labelled cases.
+          csConf.setFloat(csConf.getNodeLabelPrefix(q.getQueuePath(), label)
+              + csConf.CAPACITY, 100 * capacity);
+          if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+            q.getQueueCapacities().setCapacity(capacity);
+          } else {
+            // Keep the same scales as the default partition: a fraction in
+            // the queue capacities, a percentage in the configuration.
+            q.getQueueCapacities().setCapacity(label, capacity);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public String getQueuePath() {
     return queuePath;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index d0ee25df300..5e7351e5785 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -354,6 +354,13 @@
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  @Private
+  public static final String WEIGHT_MODE_ENABLED =
+      PREFIX + "weight-mode-enabled";
+
+  @Private
+  public static final boolean DEFAULT_WEIGHT_MODE_ENABLED = false;
+
   @Private
   public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
       + "per-node-heartbeat.multiple-assignments-enabled";
@@ -445,7 +452,7 @@ static String getUserPrefix(String user) {
     return PREFIX + "user." + user + DOT;
   }
 
-  private String getNodeLabelPrefix(String queue, String label) {
+  public String getNodeLabelPrefix(String queue, String label) {
     if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
       return getQueuePrefix(queue);
     }
@@ -504,9 +511,12 @@ public float getNonLabeledQueueCapacity(String queue) {
       return queue.equals("root") ? 100.0f : 0f;
     }
 
+    // Return 0 if in weight mode;
+    // the weight mode resource will be parsed and updated later.
+    // todo need more validity checks for weight mode.
     float capacity = queue.equals("root")
         ? 100.0f
-        : (configuredCapacity == null)
+        : (configuredCapacity == null || configuredCapacity.trim().startsWith("w:"))
             ? 0f
             : Float.parseFloat(configuredCapacity);
     if (capacity < MINIMUM_CAPACITY_VALUE
@@ -531,6 +541,16 @@ public void setCapacity(String queue, float capacity) {
 
   }
 
+  public void setWeightModeCapacity(String queue, float weight) {
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set capacity, root queue has a fixed capacity of 100.0f");
+    }
+    set(getQueuePrefix(queue) + CAPACITY, "w:" + weight);
+    LOG.debug("CSConf - setWeightModeCapacity: queuePrefix={}, weight={}",
+        getQueuePrefix(queue), weight);
+  }
+
   @VisibleForTesting
   public void setCapacity(String queue, String absoluteResourceCapacity) {
     if (queue.equals("root")) {
@@ -584,6 +604,12 @@ public void setCapacityByLabel(String queue, String label,
     set(getNodeLabelPrefix(queue, label) + CAPACITY, absoluteResourceCapacity);
   }
 
+  public void setWeightModeCapacityByLabel(String queue, String label,
+      float weight) {
+    set(getNodeLabelPrefix(queue, label) + CAPACITY,
+        "w:" + weight);
+  }
+
   public void setMaximumCapacityByLabel(String queue, String label,
       float capacity) {
     setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
@@ -1521,6 +1547,14 @@ public void setOrderingPolicyParameter(String queue,
         parameterValue);
   }
 
+  public boolean getWeightModeEnabled() {
+    return getBoolean(WEIGHT_MODE_ENABLED, DEFAULT_WEIGHT_MODE_ENABLED);
+  }
+
+  public void setWeightModeEnabled(boolean enabled) {
+    setBoolean(WEIGHT_MODE_ENABLED, enabled);
+  }
+
   public boolean getLazyPreemptionEnabled() {
     return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
@@ -2195,6 +2229,16 @@ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
     setMaximumCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
   }
 
+  @Private
+  @VisibleForTesting
+  // todo support weight mode capacity
+  public void setAutoCreatedLeafQueueTemplateWeightModeCapacity(String queuePath,
+      String label, float weight) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setWeightModeCapacityByLabel(leafQueueConfPrefix, label, weight);
+  }
+
   @VisibleForTesting
   @Private
   public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
@@ -2304,6 +2348,31 @@ private void updateMinMaxResourceToConf(String label, String queue,
     set(prefix, resourceString.toString());
   }
 
+  @VisibleForTesting
+  public void setWeightModeResourceRequirement(String label, String queue,
+      float weight) {
+    updateWeightModeResourceToConf(label, queue, weight, CAPACITY);
+  }
+
+  private void updateWeightModeResourceToConf(String label, String queue,
+      float weight, String type) {
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set resource, root queue will take 100% of cluster capacity");
+    }
+
+    StringBuilder resourceString = new StringBuilder();
+    resourceString
+        .append("w:").append(weight);
+
+    String prefix = getQueuePrefix(queue) + type;
+    if (!label.isEmpty()) {
+      prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
+          + DOT + type;
+    }
+    set(prefix, resourceString.toString());
+  }
+
   public boolean checkConfigTypeIsAbsoluteResource(String label, String queue,
       Set resourceTypes) {
     String propertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7d82faeeef4..bfa9087ef40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -175,6 +175,10 @@ protected void setupQueueConfigs(Resource clusterResource)
   void setChildQueues(Collection<CSQueue> childQueues) {
     writeLock.lock();
     try {
+      if (csContext.getConfiguration().getWeightModeEnabled()) {
+        updateWeightToCapacities(csContext.getConfiguration(),
+            childQueues);
+      }
       // Validate
       float childCapacities = 0;
       Resource minResDefaultLabel = Resources.createResource(0, 0);
@@ -198,7 +202,9 @@ void setChildQueues(Collection<CSQueue> childQueues) {
 
       float delta = Math.abs(1.0f - childCapacities); // crude way to check
 
-      if (allowZeroCapacitySum) {
+      // In weight mode queueCapacities.getCapacity() == 0 at this point,
+      // so only childCapacities needs to be checked.
+      if (allowZeroCapacitySum || csContext.getConfiguration().getWeightModeEnabled()) {
         // If we allow zero capacity for children, only fail if:
         // Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
         //
@@ -252,7 +258,9 @@ void setChildQueues(Collection<CSQueue> childQueues) {
 
         float labelDelta = Math.abs(1.0f - sum);
 
-        if (allowZeroCapacitySum) {
+        // In weight mode queueCapacities.getCapacity() == 0 at this point,
+        // so only childCapacities needs to be checked.
+        if (allowZeroCapacitySum || csContext.getConfiguration().getWeightModeEnabled()) {
           // Similar to above, we only throw exception if
           // Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
           if (minResDefaultLabel.equals(Resources.none())
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
index 229bb0f0a74..1f03cdc65ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -38,6 +38,8 @@
   protected static final String B3 = B + ".b3";
   protected static float A_CAPACITY = 10.5f;
   protected static float B_CAPACITY = 89.5f;
+  protected static float A_CAPACITY_WEIGHT = 1.05f;
+  protected static float B_CAPACITY_WEIGHT = 8.95f;
   protected static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
   protected static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
   protected static final String X1 = P1 + ".x1";
@@ -46,9 +48,14 @@
   protected static final String Y2 = P2 + ".y2";
   protected static float A1_CAPACITY = 30;
   protected static float A2_CAPACITY = 70;
+  protected static float A1_CAPACITY_WEIGHT = 3;
+  protected static float A2_CAPACITY_WEIGHT = 7;
   protected static float B1_CAPACITY = 79.2f;
   protected static float B2_CAPACITY = 0.8f;
   protected static float B3_CAPACITY = 20;
+  protected static float B1_CAPACITY_WEIGHT = 7.92f;
+  protected static float B2_CAPACITY_WEIGHT = 0.08f;
+  protected static float B3_CAPACITY_WEIGHT = 2;
 
 
   @SuppressWarnings("unchecked")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d5d89cae866..7341dc3e1f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -681,6 +681,36 @@ private CapacitySchedulerConfiguration setupQueueConfiguration(
     return conf;
   }
 
+  private CapacitySchedulerConfiguration setupQueueConfigurationWithWeightMode(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    conf.setWeightModeCapacity(A, A_CAPACITY_WEIGHT);
+    conf.setWeightModeCapacity(B, B_CAPACITY_WEIGHT);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setWeightModeCapacity(A1, A1_CAPACITY_WEIGHT);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setWeightModeCapacity(A2, A2_CAPACITY_WEIGHT);
+    conf.setUserLimitFactor(A2, 100.0f);
+
+    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+    conf.setWeightModeCapacity(B1, B1_CAPACITY_WEIGHT);
+    conf.setUserLimitFactor(B1, 100.0f);
+    conf.setWeightModeCapacity(B2, B2_CAPACITY_WEIGHT);
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setWeightModeCapacity(B3, B3_CAPACITY_WEIGHT);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    conf.setWeightModeEnabled(true);
+
+    LOG.info("Setup top-level queues a and b");
+    return conf;
+  }
+
   /**
    * @param conf, to be modified
    * @return, CS configuration which has deleted all childred of queue(b)
@@ -933,6 +963,31 @@ public void testRefreshQueues() throws Exception {
     cs.stop();
   }
 
+  @Test
+  public void testRefreshQueuesWithWeightMode() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfigurationWithWeightMode(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    conf.setWeightModeCapacity(A, 8f);
+    conf.setWeightModeCapacity(B, 2f);
+    cs.reinitialize(conf, mockContext);
+    checkQueueCapacities(cs, 80f, 20f);
+    cs.stop();
+  }
+
+
+
   void checkQueueCapacities(CapacityScheduler cs,
       float capacityA, float capacityB) {
     CSQueue rootQueue = cs.getRootQueue();
@@ -1028,6 +1083,30 @@ public void testParseQueue() throws IOException {
         new ClientToAMTokenSecretManagerInRM(), null));
   }
 
+  /** Test that parseQueue throws an exception when two leaf queues have the
+   *  same name
+   * @throws IOException
+   */
+  @Test(expected=IOException.class)
+  public void testParseQueueWithWeightMode() throws IOException {
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithWeightMode(conf);
+    cs.init(conf);
+    cs.start();
+
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
+    conf.setWeightModeCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+    conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+
+    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null));
+  }
+
   @Test
   public void testParseQueueWithAbsoluteResource() {
     String childQueue = "testQueue";