diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 91728a3f5b..1a7c9f9086 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -608,6 +608,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; + public static final String RM_SCHEDULER_MUTATION_POLICY_CLASS = + YARN_PREFIX + "scheduler.configuration.mutation.policy.class"; + public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX + "authorization-provider"; private static final List RM_SERVICES_ADDRESS_CONF_KEYS_HTTP = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicy.java new file mode 100644 index 0000000000..ef31cb2c09 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicy.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +import java.util.Map; + +/** + * Interface for determining whether configuration mutations are allowed. + */ +public interface ConfigurationMutationPolicy { + + /** + * Initialize policy with configuration and RMContext. + * @param conf Configuration to initialize with. + * @param rmContext rmContext + */ + void init(Configuration conf, RMContext rmContext); + + /** + * Check if mutation is allowed. + * @param user User issuing the request + * @param confUpdate Key-value pairs for configurations to be updated. + */ + boolean isMutationAllowed(UserGroupInformation user, Map + confUpdate); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicyFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicyFactory.java new file mode 100644 index 0000000000..5e43d6ec55 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationPolicyFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Factory class for creating instances of {@link ConfigurationMutationPolicy}. + */ +public final class ConfigurationMutationPolicyFactory { + + private static final Log LOG = LogFactory.getLog( + ConfigurationMutationPolicyFactory.class); + + private ConfigurationMutationPolicyFactory() { + // Unused. + } + + public static ConfigurationMutationPolicy getPolicy(Configuration conf) { + Class policyClass = + conf.getClass(YarnConfiguration.RM_SCHEDULER_MUTATION_POLICY_CLASS, + DefaultConfigurationMutationPolicy.class, + ConfigurationMutationPolicy.class); + LOG.info("Using ConfigurationMutationPolicy implementation - " + + policyClass); + return ReflectionUtils.newInstance(policyClass, conf); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationPolicy.java new file mode 100644 index 0000000000..0893c5a2ee --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationPolicy.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationPolicy; + +import java.util.Map; + +/** + * Default configuration mutation policy. Checks if user is YARN admin. + */ +public class DefaultConfigurationMutationPolicy implements + ConfigurationMutationPolicy { + + private YarnAuthorizationProvider authorizer; + + @Override + public void init(Configuration conf, RMContext rmContext) { + authorizer = YarnAuthorizationProvider.getInstance(conf); + } + + @Override + public boolean isMutationAllowed(UserGroupInformation user, + Map confUpdate) { + return authorizer.isAdmin(user); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java index 35e36e1c23..373a812bdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; @@ -37,4 +38,7 @@ void updateConfiguration(UserGroupInformation user, Map confUpdate) throws IOException; + Configuration getConfiguration(); + + Queue getQueue(String queueName); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 889c3bc1f0..29541fc0f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.security.UserGroupInformation; + import java.io.IOException; import java.util.Map; @@ -32,7 +34,7 @@ * @param confUpdate Key-value pairs for configurations to be updated. * @throws IOException if scheduler could not be reinitialized */ - void mutateConfiguration(String user, Map confUpdate) - throws IOException; + void mutateConfiguration(UserGroupInformation user, Map + confUpdate) throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e027f5e79d..0942f04593 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -619,6 +619,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration newConf) preemptionManager.refreshQueues(null, this.getRootQueue()); } + @Override public CSQueue getQueue(String queueName) { if (queueName == null) { return null; @@ -2494,7 +2495,7 @@ public void updateConfiguration(UserGroupInformation user, Map confUpdate) throws IOException { if (csConfProvider instanceof MutableConfigurationProvider) { ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( - user.getShortUserName(), confUpdate); + user, confUpdate); } else { throw new UnsupportedOperationException("Configured CS configuration " + "provider does not support updating configuration."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index ea1b3c070f..19e90eee67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; @@ -38,6 +42,7 @@ private Configuration schedConf; private YarnConfigurationStore confStore; + private ConfigurationMutationPolicy mutationPolicy; private RMContext rmContext; private Configuration conf; @@ -68,6 +73,8 @@ public void init(Configuration config) throws IOException { schedConf.set(kv.getKey(), kv.getValue()); } confStore.initialize(config, schedConf); + this.mutationPolicy = ConfigurationMutationPolicyFactory.getPolicy(config); + mutationPolicy.init(config, rmContext); this.conf = config; } @@ -80,10 +87,14 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration } @Override - public void mutateConfiguration(String user, + public void mutateConfiguration(UserGroupInformation user, Map confUpdate) throws IOException { + if (!mutationPolicy.isMutationAllowed(user, confUpdate)) { + throw new AccessControlException("User is not admin of all modified" + + " queues."); + } Configuration oldConf = new Configuration(schedConf); - LogMutation log = new LogMutation(confUpdate, user); + LogMutation log = new LogMutation(confUpdate, user.getShortUserName()); long id = confStore.logMutation(log); for (Map.Entry kv : confUpdate.entrySet()) { if (kv.getValue() == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationPolicy.java new file mode 100644 index 0000000000..0b3cabd853 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationPolicy.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A configuration mutation policy which checks that user has admin privileges + * on all queues they are changing. + */ +public class QueueAdminConfigurationMutationPolicy implements + ConfigurationMutationPolicy { + + private RMContext rmContext; + private static final List multipleSuffixConfigs; + + static { + multipleSuffixConfigs = Collections.unmodifiableList(Arrays.asList( + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, + CapacitySchedulerConfiguration.ORDERING_POLICY + )); + } + + @Override + public void init(Configuration conf, RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public boolean isMutationAllowed(UserGroupInformation user, + Map confUpdate) { + Set queues = new HashSet<>(); + for (Map.Entry kv : confUpdate.entrySet()) { + String key = kv.getKey(); + int startIdx = key.indexOf("root"); + int endIdx = -1; + for (String suffix : multipleSuffixConfigs) { + if (key.contains(suffix)) { + endIdx = key.indexOf(suffix) - 1; + break; + } + } + if (endIdx == -1) { + endIdx = key.lastIndexOf('.'); + } + queues.add(key.substring(startIdx, endIdx)); + } + for (String queuePath : queues) { + String queueName = queuePath.lastIndexOf('.') != -1 ? + queuePath.substring(queuePath.lastIndexOf('.') + 1) : queuePath; + QueueInfo queueInfo = null; + try { + queueInfo = rmContext.getScheduler() + .getQueueInfo(queueName, false, false); + } catch (IOException e) { + // Queue is not found, do nothing. + } + String parentPath = queuePath; + // TODO: handle global config change. + while (queueInfo == null) { + // We are adding a queue (whose parent we are possibly also adding). + // Check ACL of lowest parent queue which already exists. + parentPath = parentPath.substring(0, parentPath.lastIndexOf('.')); + String parentName = parentPath.lastIndexOf('.') != -1 ? + parentPath.substring(parentPath.lastIndexOf('.') + 1) : parentPath; + try { + queueInfo = rmContext.getScheduler() + .getQueueInfo(parentName, false, false); + } catch (IOException e) { + // Queue is not found, do nothing. + } + } + Queue queue = ((MutableConfScheduler) rmContext.getScheduler()) + .getQueue(queueName); + if (!queue.hasAccess(QueueACL.ADMINISTER_QUEUE, user)) { + return false; + } + } + return true; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 78eb4deec7..a89dff1bcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -32,6 +32,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -114,6 +115,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; @@ -196,6 +198,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.AdHocLogDumper; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -2645,7 +2648,7 @@ public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo public Void run() throws IOException, YarnException { Map confUpdate = constructKeyValueConfUpdate(mutationInfo); - ((CapacityScheduler) scheduler).updateConfiguration(callerUGI, + ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI, confUpdate); return null; } @@ -2658,95 +2661,124 @@ public Void run() throws IOException, YarnException { "successfully applied.").build(); } else { return Response.status(Status.BAD_REQUEST) - .entity("Configuration change only supported by CapacityScheduler.") + .entity("Configuration change only supported by MutableConfScheduler.") .build(); } } private Map constructKeyValueConfUpdate( QueueConfigsUpdateInfo mutationInfo) throws IOException { - CapacitySchedulerConfiguration currentConf = - ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(currentConf, false); + // Stores the queue infos for modified queues as the modifications are + // applied, in case the modifications depend on each other. + Map queueInfos = new HashMap<>(); Map confUpdate = new HashMap<>(); for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { - removeQueue(queueToRemove, proposedConf, confUpdate); + removeQueue(queueToRemove, queueInfos, confUpdate); } for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { - addQueue(addQueueInfo, proposedConf, confUpdate); + addQueue(addQueueInfo, queueInfos, confUpdate); } for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { - updateQueue(updateQueueInfo, proposedConf, confUpdate); + updateQueue(updateQueueInfo, confUpdate); } return confUpdate; } private void removeQueue( - String queueToRemove, CapacitySchedulerConfiguration proposedConf, + String queueToRemove, Map queueInfos, Map confUpdate) throws IOException { if (queueToRemove == null) { return; } else { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - String queueName = queueToRemove.substring( - queueToRemove.lastIndexOf('.') + 1); - CSQueue queue = cs.getQueue(queueName); - if (queue == null || - !queue.getQueuePath().equals(queueToRemove)) { - throw new IOException("Queue " + queueToRemove + " not found"); - } else if (queueToRemove.lastIndexOf('.') == -1) { + MutableConfScheduler mcs = (MutableConfScheduler) + rm.getResourceScheduler(); + // Can't remove root queue. + if (queueToRemove.lastIndexOf('.') == -1) { throw new IOException("Can't remove queue " + queueToRemove); } + String queueName = queueToRemove.substring( + queueToRemove.lastIndexOf('.') + 1); + // TODO: specifying correct queue name but wrong queue path will succeed. String parentQueuePath = queueToRemove.substring(0, queueToRemove .lastIndexOf('.')); - String[] siblingQueues = proposedConf.getQueues(parentQueuePath); - List newSiblingQueues = new ArrayList<>(); - for (String siblingQueue : siblingQueues) { - if (!siblingQueue.equals(queueName)) { - newSiblingQueues.add(siblingQueue); + String parentQueueName = parentQueuePath.lastIndexOf('.') != -1 ? + parentQueuePath.substring(parentQueuePath.lastIndexOf('.') + 1) : + parentQueuePath; + List siblingQueues = new ArrayList<>(); + if (!queueInfos.containsKey(parentQueuePath)) { + queueInfos.put(parentQueuePath, mcs.getQueueInfo(parentQueueName, true, + false)); + } + for (Iterator itr = queueInfos.get(parentQueuePath) + .getChildQueues().iterator(); itr.hasNext();) { + QueueInfo queueInfo = itr.next(); + if (queueInfo.getQueueName().equals(queueName)) { + itr.remove(); + } else { + siblingQueues.add(queueInfo.getQueueName()); } } - proposedConf.setQueues(parentQueuePath, newSiblingQueues - .toArray(new String[0])); String queuesConfig = CapacitySchedulerConfiguration.PREFIX + parentQueuePath + CapacitySchedulerConfiguration.DOT + CapacitySchedulerConfiguration.QUEUES; - if (newSiblingQueues.size() == 0) { + if (queueInfos.get(parentQueuePath).getChildQueues().size() == 0) { confUpdate.put(queuesConfig, null); } else { - confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); + confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues)); } - for (Map.Entry confRemove : proposedConf.getValByRegex( + for (Map.Entry confRemove : mcs.getConfiguration().getValByRegex( ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") .entrySet()) { - proposedConf.unset(confRemove.getKey()); confUpdate.put(confRemove.getKey(), null); } } } private void addQueue( - QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, + QueueConfigInfo addInfo, Map queueInfos, Map confUpdate) throws IOException { if (addInfo == null) { return; } else { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + MutableConfScheduler mcs = (MutableConfScheduler) + rm.getResourceScheduler(); String queuePath = addInfo.getQueue(); + if (queuePath.lastIndexOf('.') == -1) { + throw new IOException("Can't add invalid queue " + queuePath); + } String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); - if (cs.getQueue(queueName) != null) { + QueueInfo queueInfo = null; + try { + queueInfo = mcs.getQueueInfo(queueName, false, false); throw new IOException("Can't add existing queue " + queuePath); - } else if (queuePath.lastIndexOf('.') == -1) { - throw new IOException("Can't add invalid queue " + queuePath); + } catch (IOException e) { + // Do nothing. Exception is expected. } String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); - String[] siblings = proposedConf.getQueues(parentQueue); - List siblingQueues = siblings == null ? new ArrayList<>() : - new ArrayList<>(Arrays.asList(siblings)); - siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); - proposedConf.setQueues(parentQueue, - siblingQueues.toArray(new String[0])); + String parentQueueName = parentQueue.lastIndexOf('.') != -1 ? + parentQueue.substring(parentQueue.lastIndexOf('.') + 1) : + parentQueue; + List siblingQueues = new ArrayList<>(); + if (!queueInfos.containsKey(parentQueue)) { + try { + queueInfos.put(parentQueue, mcs.getQueueInfo(parentQueueName, true, + false)); + } catch (IOException e) { + // Parent might be added later in the same mutation request. + QueueInfo newQueueInfo = Records.newRecord(QueueInfo.class); + newQueueInfo.setQueueName(parentQueueName); + QueueInfo newChildQueueInfo = Records.newRecord(QueueInfo.class); + newChildQueueInfo.setQueueName(queueName); + List childQueues = new ArrayList<>(); + childQueues.add(newChildQueueInfo); + newQueueInfo.setChildQueues(childQueues); + queueInfos.put(parentQueue, newQueueInfo); + } + } + for (QueueInfo siblingQueueInfo : queueInfos.get(parentQueue).getChildQueues()) { + siblingQueues.add(siblingQueueInfo.getQueueName()); + } + siblingQueues.add(queueName); confUpdate.put(CapacitySchedulerConfiguration.PREFIX + parentQueue + CapacitySchedulerConfiguration.DOT + CapacitySchedulerConfiguration.QUEUES, @@ -2754,18 +2786,12 @@ private void addQueue( String keyPrefix = CapacitySchedulerConfiguration.PREFIX + queuePath + CapacitySchedulerConfiguration.DOT; for (Map.Entry kv : addInfo.getParams().entrySet()) { - if (kv.getValue() == null) { - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); - } confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); } } } private void updateQueue(QueueConfigInfo updateInfo, - CapacitySchedulerConfiguration proposedConf, Map confUpdate) { if (updateInfo == null) { return; @@ -2774,11 +2800,6 @@ private void updateQueue(QueueConfigInfo updateInfo, String keyPrefix = CapacitySchedulerConfiguration.PREFIX + queuePath + CapacitySchedulerConfiguration.DOT; for (Map.Entry kv : updateInfo.getParams().entrySet()) { - if (kv.getValue() == null) { - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); - } confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 254da31893..02559b6a7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.junit.Before; @@ -47,7 +48,8 @@ private Map badUpdate; private CapacityScheduler cs; - private static final String TEST_USER = "testUser"; + private static final UserGroupInformation TEST_USER = UserGroupInformation + .createUserForTesting("testUser", new String[] {}); @Before public void setUp() {