diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3ae22ad0ebd..ddd70889105 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -129,6 +129,9 @@ protected ActivitiesManager activitiesManager; + private String appOrderingPolicyType; + private Map appOrderingPolicyConfs; + protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; @@ -425,6 +428,27 @@ protected void setupQueueConfigs(Resource clusterResource, configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath())); this.userWeights = getUserWeightsFromHierarchy(configuration); + + // Inherit app ordering policy and its configs from parent + this.appOrderingPolicyType = + configuration.getConfiguredAppOrderingPolicy(getQueuePath()); + if (this.appOrderingPolicyType == null) { + if (getParent() == null) { + this.appOrderingPolicyType + = CapacitySchedulerConfiguration.DEFAULT_APP_ORDERING_POLICY; + } else { + this.appOrderingPolicyType = getParent().getAppOrderingPolicyType(); + } + } + Map appOrderingPolicyConfs = getParent() == null + ? new HashMap<>() + : getParent().getAppOrderingPolicyConfs(); + for (Map.Entry entry + : configuration.getConfiguredAppOrderingPolicyConfs( + getQueuePath()).entrySet()) { + appOrderingPolicyConfs.put(entry.getKey(), entry.getValue()); + } + this.appOrderingPolicyConfs = appOrderingPolicyConfs; } finally { writeLock.unlock(); } @@ -1317,4 +1341,14 @@ public String getMultiNodeSortingPolicyName() { public void setMultiNodeSortingPolicyName(String policyName) { this.multiNodeSortingPolicyName = policyName; } + + @Override + public String getAppOrderingPolicyType() { + return this.appOrderingPolicyType; + } + + @Override + public Map getAppOrderingPolicyConfs() { + return this.appOrderingPolicyConfs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index d507e53543c..17017d5babb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -449,4 +449,16 @@ public void validateSubmitApplication(ApplicationId applicationId, * @return policy name */ String getMultiNodeSortingPolicyName(); + + /** + * Gets this queue's app ordering policy. + * @return Queue's app ordering policy + */ + String getAppOrderingPolicyType(); + + /** + * Gets this queue's app ordering policy configurations. + * @return App ordering policy configurations + */ + Map getAppOrderingPolicyConfs(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 2343872d95b..9e2e5599616 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -549,41 +549,19 @@ public int getUserLimit(String queue) { return userLimit; } - @SuppressWarnings("unchecked") - public OrderingPolicy getAppOrderingPolicy( - String queue) { - - String policyType = get(getQueuePrefix(queue) + APP_ORDERING_POLICY, - DEFAULT_APP_ORDERING_POLICY); - - OrderingPolicy orderingPolicy; - - if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) { - policyType = FifoOrderingPolicy.class.getName(); - } - if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) { - policyType = FairOrderingPolicy.class.getName(); - } - if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) { - policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName(); - } - try { - orderingPolicy = (OrderingPolicy) - Class.forName(policyType).newInstance(); - } catch (Exception e) { - String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage(); - throw new RuntimeException(message, e); - } + public String getConfiguredAppOrderingPolicy(String queuePath) { + return get(getQueuePrefix(queuePath) + APP_ORDERING_POLICY); + } - Map config = new HashMap(); - String confPrefix = getQueuePrefix(queue) + APP_ORDERING_POLICY + "."; + public Map getConfiguredAppOrderingPolicyConfs(String queuePath) { + Map config = new HashMap<>(); + String confPrefix = getQueuePrefix(queuePath) + APP_ORDERING_POLICY + "."; for (Map.Entry kv : this) { if (kv.getKey().startsWith(confPrefix)) { - config.put(kv.getKey().substring(confPrefix.length()), kv.getValue()); + config.put(kv.getKey().substring(confPrefix.length()), kv.getValue()); } } - orderingPolicy.configure(config); - return orderingPolicy; + return config; } public void setUserLimit(String queue, int userLimit) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index cc63b2682a3..2dbeb660814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -25,6 +25,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -184,8 +188,7 @@ protected void setupQueueConfigs(Resource clusterResource, // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) setQueueResourceLimitsInfo(clusterResource); - setAppOrderingPolicy( - conf.getAppOrderingPolicy(getQueuePath())); + setAppOrderingPolicy(createAppOrderingPolicy()); usersManager.setUserLimit(conf.getUserLimit(getQueuePath())); usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath())); @@ -2099,10 +2102,41 @@ public void setMaxAMResourcePerQueuePercent( this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; } + @VisibleForTesting + @SuppressWarnings("unchecked") + protected OrderingPolicy + createAppOrderingPolicy() { + String policyType = getAppOrderingPolicyType(); + OrderingPolicy orderingPolicy; + + if (policyType.trim().equals( + CapacitySchedulerConfiguration.FIFO_APP_ORDERING_POLICY)) { + policyType = FifoOrderingPolicy.class.getName(); + } + if (policyType.trim().equals( + CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY)) { + policyType = FairOrderingPolicy.class.getName(); + } + if (policyType.trim().equals( + CapacitySchedulerConfiguration.FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) { + policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName(); + } + try { + orderingPolicy = (OrderingPolicy) + Class.forName(policyType).newInstance(); + } catch (Exception e) { + String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage(); + throw new RuntimeException(message, e); + } + + orderingPolicy.configure(getAppOrderingPolicyConfs()); + return orderingPolicy; + } + public OrderingPolicy getAppOrderingPolicy() { return appOrderingPolicy; } - + void setAppOrderingPolicy( OrderingPolicy appOrderingPolicy) { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 14468bf6c7c..ba7d7aa78c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -441,17 +441,50 @@ public void testUserQueueAcl() throws Exception { @Test public void testPolicyConfiguration() throws Exception { - - CapacitySchedulerConfiguration testConf = - new CapacitySchedulerConfiguration(); - - String tproot = ROOT + "." + - "testPolicyRoot" + System.currentTimeMillis(); + Map queues = new HashMap<>(); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.set(CapacitySchedulerConfiguration.PREFIX + ROOT + + "." + CapacitySchedulerConfiguration.APP_ORDERING_POLICY, + CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY); + csConf.set(CapacitySchedulerConfiguration.PREFIX + ROOT + + "." + CapacitySchedulerConfiguration.APP_ORDERING_POLICY + ".key1", + "parentVal1"); + final String newRootName = "root" + System.currentTimeMillis(); + setupQueueConfiguration(csConf, newRootName, false); - OrderingPolicy comPol = - testConf.getAppOrderingPolicy(tproot); - - + Resource clusterResource = Resources.createResource(100 * 16 * GB, + 100 * 32); + CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource); + when(csContext.getRMContext()).thenReturn(rmContext); + + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + // Leaf queue should inherit policy and confs from parent if unset + LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(B)); + assertEquals(FairOrderingPolicy.class.getCanonicalName(), + leafQueue.getAppOrderingPolicy().getClass().getCanonicalName()); + assertEquals("parentVal1", + leafQueue.getAppOrderingPolicyConfs().get("key1")); + + // Leaf queue policy should override parent's policy and/or confs if set + csConf.set(CapacitySchedulerConfiguration.PREFIX + leafQueue.getQueuePath() + + "." + CapacitySchedulerConfiguration.APP_ORDERING_POLICY, + CapacitySchedulerConfiguration.FIFO_APP_ORDERING_POLICY); + csConf.set(CapacitySchedulerConfiguration.PREFIX + leafQueue.getQueuePath() + + "." + CapacitySchedulerConfiguration.APP_ORDERING_POLICY + ".key1", + "leafVal1"); + setupQueueConfiguration(csConf, newRootName, false); + csContext = mockCSContext(csConf, clusterResource); + when(csContext.getRMContext()).thenReturn(rmContext); + + queues.clear(); + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + leafQueue = stubLeafQueue((LeafQueue) queues.get(B)); + assertEquals(FifoOrderingPolicy.class.getCanonicalName(), + leafQueue.getAppOrderingPolicy().getClass().getCanonicalName()); + assertEquals("leafVal1", + leafQueue.getAppOrderingPolicyConfs().get("key1")); } @Test @@ -519,12 +552,8 @@ public void testFairConfiguration() throws Exception { CapacitySchedulerConfiguration testConf = new CapacitySchedulerConfiguration(); - - String tproot = ROOT + "." + - "testPolicyRoot" + System.currentTimeMillis(); - - OrderingPolicy schedOrder = - testConf.getAppOrderingPolicy(tproot); + String leafQueueName = "testPolicyRoot" + System.currentTimeMillis(); + String tproot = ROOT + "." + leafQueueName; //override default to fair String policyType = CapacitySchedulerConfiguration.PREFIX + tproot + @@ -532,8 +561,10 @@ public void testFairConfiguration() throws Exception { testConf.set(policyType, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY); - schedOrder = - testConf.getAppOrderingPolicy(tproot); + LeafQueue leafQueue = new LeafQueue(csContext, testConf, + leafQueueName, cs.getRootQueue(), null); + OrderingPolicy schedOrder = + leafQueue.createAppOrderingPolicy(); FairOrderingPolicy fop = (FairOrderingPolicy) schedOrder; assertFalse(fop.getSizeBasedWeight()); @@ -542,8 +573,9 @@ public void testFairConfiguration() throws Exception { "." + CapacitySchedulerConfiguration.APP_ORDERING_POLICY + "." + FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT; testConf.set(sbwConfig, "true"); - schedOrder = - testConf.getAppOrderingPolicy(tproot); + leafQueue = new LeafQueue(csContext, testConf, + leafQueueName, cs.getRootQueue(), null); + schedOrder = leafQueue.createAppOrderingPolicy(); fop = (FairOrderingPolicy) schedOrder; assertTrue(fop.getSizeBasedWeight());