diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 3286982..68083ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -607,6 +607,7 @@ private void updatePlacementRules() throws IOException { private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { + conf.refreshQueueConfigsCache(); this.queueManager.initializeQueues(conf); updatePlacementRules(); @@ -618,6 +619,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) @Lock(CapacityScheduler.class) private void reinitializeQueues(CapacitySchedulerConfiguration newConf) throws IOException { + conf.refreshQueueConfigsCache(); this.queueManager.reinitializeQueues(newConf); updatePlacementRules(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 13b9ff6..4ff474d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -62,6 +62,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -318,6 +320,12 @@ AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); + private AtomicBoolean queueConfigsCacheLoaded = new AtomicBoolean(false); + + private Map> queueOrderingPolicyParams = null; + + private Map> queueConfiguredNodeLabels = null; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -334,6 +342,84 @@ public CapacitySchedulerConfiguration(Configuration configuration, } } + public void refreshQueueConfigsCache() { + synchronized (queueConfigsCacheLoaded) { + queueConfigsCacheLoaded.set(false); + } + loadQueueConfigsCache(); + } + + private void loadQueueConfigsCache() { + synchronized (queueConfigsCacheLoaded){ + if (queueConfigsCacheLoaded.get()){ + return; + } + queueOrderingPolicyParams = new ConcurrentHashMap<>(); + queueConfiguredNodeLabels = new ConcurrentHashMap<>(); + Iterator> iter = iterator(); + Entry e; + while (iter.hasNext()) { + e = iter.next(); + updateQueueConfiguredNodeLables(e.getKey()); + updateQueueOrderingPolicyConfigs(e.getKey(), e.getValue()); + } + queueConfigsCacheLoaded.set(true); + LOG.info("Loaded queue properties cache: configurableNodeLabels(" + + queueConfiguredNodeLabels + "), orderingPolicyParams(" + + queueOrderingPolicyParams + ")"); + } + } + + private void updateQueueOrderingPolicyConfigs(String key, String value){ + if (key.contains(ORDERING_POLICY + DOT)) { + // Find and in + // yarn.scheduler.capacity..ordering-policy. + // + int queuePathStartIdx = PREFIX.length(); + int queuePathEndIdx = key.indexOf(ORDERING_POLICY) - 1; + if(queuePathEndIdx <= queuePathStartIdx){ + return; + } + String queuePath = key.substring(queuePathStartIdx, queuePathEndIdx); + int orderingPolocyKeyStartIdx = + key.indexOf(ORDERING_POLICY) + ORDERING_POLICY.length() + 1; + String orderingPolocyKey = key.substring(orderingPolocyKeyStartIdx); + Map orderingPolocyConfigs = + queueOrderingPolicyParams.get(queuePath); + if (orderingPolocyConfigs == null) { + orderingPolocyConfigs = new HashMap<>(); + queueOrderingPolicyParams.put(queuePath, orderingPolocyConfigs); + } + orderingPolocyConfigs.put(orderingPolocyKey, value); + } + } + + private void updateQueueConfiguredNodeLables(String key){ + if (key.contains(ACCESSIBLE_NODE_LABELS + DOT)) { + // Find and in + // yarn.scheduler.capacity..accessible-node-labels. + // .property + int queuePathStartIdx = PREFIX.length(); + int queuePathEndIdx = key.indexOf(ACCESSIBLE_NODE_LABELS) - 1; + if(queuePathEndIdx <= queuePathStartIdx){ + return; + } + int labelStartIdx = + key.indexOf(ACCESSIBLE_NODE_LABELS) + ACCESSIBLE_NODE_LABELS + .length() + 1; + int labelEndIndx = key.indexOf('.', labelStartIdx); + String queuePath = key.substring(queuePathStartIdx, queuePathEndIdx); + String labelName = key.substring(labelStartIdx, labelEndIndx); + Set labels = queueConfiguredNodeLabels.get(queuePath); + if (labels == null) { + labels = new HashSet<>(); + queueConfiguredNodeLabels.put(queuePath, labels); + } + labels.add(labelName); + } + } + + static String getQueuePrefix(String queue) { String queueName = PREFIX + queue + DOT; return queueName; @@ -472,15 +558,26 @@ public int getUserLimit(String queue) { throw new RuntimeException(message, e); } + Map config = getOrderingPolicyConfigs(queue); + orderingPolicy.configure(config); + return orderingPolicy; + } + + public Map getOrderingPolicyConfigs(String queue){ Map config = new HashMap(); - String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + "."; - for (Map.Entry kv : this) { - if (kv.getKey().startsWith(confPrefix)) { - config.put(kv.getKey().substring(confPrefix.length()), kv.getValue()); + if (queueConfigsCacheLoaded.get()) { + if (queueOrderingPolicyParams.containsKey(queue)) { + config = queueOrderingPolicyParams.get(queue); + } + } else { + String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + "."; + for (Map.Entry kv : this) { + if (kv.getKey().startsWith(confPrefix)) { + config.put(kv.getKey().substring(confPrefix.length()), kv.getValue()); + } } } - orderingPolicy.configure(config); - return orderingPolicy; + return config; } public void setUserLimit(String queue, int userLimit) { @@ -526,6 +623,11 @@ public void setAccessibleNodeLabels(String queue, Set labels) { } String str = StringUtils.join(",", labels); set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); + if (queueConfigsCacheLoaded.get()) { + synchronized (queueConfigsCacheLoaded) { + queueConfiguredNodeLabels.put(queue, labels); + } + } } public Set getAccessibleNodeLabels(String queue) { @@ -1120,23 +1222,29 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { */ public Set getConfiguredNodeLabels(String queuePath) { Set configuredNodeLabels = new HashSet(); - Entry e = null; - - Iterator> iter = iterator(); - while (iter.hasNext()) { - e = iter.next(); - String key = e.getKey(); - - if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS - + DOT)) { - // Find in - // .accessible-node-labels..property - int labelStartIdx = - key.indexOf(ACCESSIBLE_NODE_LABELS) - + ACCESSIBLE_NODE_LABELS.length() + 1; - int labelEndIndx = key.indexOf('.', labelStartIdx); - String labelName = key.substring(labelStartIdx, labelEndIndx); - configuredNodeLabels.add(labelName); + if (queueConfigsCacheLoaded.get()) { + if (queueConfiguredNodeLabels.containsKey(queuePath)) { + configuredNodeLabels = queueConfiguredNodeLabels.get(queuePath); + } + } else { + Entry e = null; + + Iterator> iter = iterator(); + while (iter.hasNext()) { + e = iter.next(); + String key = e.getKey(); + + if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS + + DOT)) { + // Find in + // .accessible-node-labels..property + int labelStartIdx = + key.indexOf(ACCESSIBLE_NODE_LABELS) + + ACCESSIBLE_NODE_LABELS.length() + 1; + int labelEndIndx = key.indexOf('.', labelStartIdx); + String labelName = key.substring(labelStartIdx, labelEndIndx); + configuredNodeLabels.add(labelName); + } } } @@ -1163,6 +1271,17 @@ public void setOrderingPolicyParameter(String queue, String parameterKey, String parameterValue) { set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey, parameterValue); + if (queueConfigsCacheLoaded.get()) { + synchronized (queueConfigsCacheLoaded){ + Map orderingPolocyConfigs = + queueOrderingPolicyParams.get(queue); + if (orderingPolocyConfigs == null) { + orderingPolocyConfigs = new HashMap<>(); + queueOrderingPolicyParams.put(queue, orderingPolocyConfigs); + } + orderingPolocyConfigs.put(parameterKey, parameterValue); + } + } } public boolean getLazyPreemptionEnabled() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 64e0df4..78f4abb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -4808,4 +4809,99 @@ private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, } } } + + @Test + public void testRefreshQueueConfigsCache() throws IOException { + Configuration conf = new YarnConfiguration(); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS + + CapacitySchedulerConfiguration.DOT + "label1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.CAPACITY, "0.2"); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS + + CapacitySchedulerConfiguration.DOT + "label2" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.CAPACITY, "0.5"); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue2" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS + + CapacitySchedulerConfiguration.DOT + "label1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.CAPACITY, "0.5"); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue2" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS + + CapacitySchedulerConfiguration.DOT + "label2" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.CAPACITY, "0.2"); + CapacityScheduler cs = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); + Set queue1Labels = + cs.getConfiguration().getConfiguredNodeLabels("queue1"); + Set queue2Labels = + cs.getConfiguration().getConfiguredNodeLabels("queue2"); + Set queue3Labels = + cs.getConfiguration().getConfiguredNodeLabels("queue3"); + assertTrue(queue1Labels.contains("")); + assertTrue(queue1Labels.contains("label1")); + assertTrue(queue1Labels.contains("label2")); + assertEquals(queue1Labels.size(), 3); + assertTrue(queue2Labels.contains("")); + assertTrue(queue2Labels.contains("label1")); + assertTrue(queue2Labels.contains("label2")); + assertEquals(queue2Labels.size(), 3); + assertTrue(queue3Labels.contains("")); + assertEquals(queue3Labels.size(), 1); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ORDERING_POLICY + + CapacitySchedulerConfiguration.DOT + "key1", "value1"); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue1" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ORDERING_POLICY + + CapacitySchedulerConfiguration.DOT + "key2", "value2"); + conf.set(CapacitySchedulerConfiguration.PREFIX + "queue2" + + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.ORDERING_POLICY + + CapacitySchedulerConfiguration.DOT + "key1", "value1"); + cs.reinitialize(conf, rmContext); + Map queue1OrderingPolicyConfig = + cs.getConfiguration().getOrderingPolicyConfigs("queue1"); + Map queue2OrderingPolicyConfig = + cs.getConfiguration().getOrderingPolicyConfigs("queue2"); + Map queue3OrderingPolicyConfig = + cs.getConfiguration().getOrderingPolicyConfigs("queue3"); + assertEquals(queue1OrderingPolicyConfig.get("key1"), "value1"); + assertEquals(queue1OrderingPolicyConfig.get("key2"), "value2"); + assertEquals(queue1OrderingPolicyConfig.size(), 2); + assertEquals(queue2OrderingPolicyConfig.get("key1"), "value1"); + assertEquals(queue2OrderingPolicyConfig.size(), 1); + assertEquals(queue3OrderingPolicyConfig.size(), 0); + + // Test hot update for cache item + Set labels = new HashSet<>(); + labels.add("label3"); + cs.getConfiguration().setAccessibleNodeLabels("queue3", labels); + queue3Labels = cs.getConfiguration().getConfiguredNodeLabels("queue3"); + assertTrue(queue3Labels.contains("")); + assertTrue(queue3Labels.contains("label3")); + assertEquals(queue3Labels.size(), 2); + cs.getConfiguration().setOrderingPolicyParameter("queue3", "key3", "value3"); + queue3OrderingPolicyConfig = + cs.getConfiguration().getOrderingPolicyConfigs("queue3"); + assertEquals(queue3OrderingPolicyConfig.size(), 1); + assertEquals(queue3OrderingPolicyConfig.get("key3"), "value3"); + } }