diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 2bed464..30f4eb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -108,4 +108,27 @@ + + yarn.scheduler.capacity.queue-mappings + + + A list of mappings that will be used to assign jobs to queues + The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]* + Typically this list will be used to map users to queues, + for example, u:%user:%user maps all users to queues with the same name + as the user. + + + + + yarn.scheduler.capacity.queue-mappings-override.enable + false + + If a queue mapping is present, will it override the value specified + by the user? This can be used by administrators to place jobs in queues + that are different than the one specified by the user. + The default is false. + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 65ff81c..bfa8915 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -41,6 +39,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -76,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -198,6 +200,16 @@ public Configuration getConf() { + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + private boolean overrideWithQueueMappings = false; + private List mappings = new ArrayList(); + private Groups groups; + + @VisibleForTesting + public synchronized String getMappedQueueForTest(String user) + throws IOException { + return getMappedQueue(user); + } + public CapacityScheduler() { super(CapacityScheduler.class.getName()); } @@ -262,7 +274,6 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); - initializeQueues(this.conf); scheduleAsynchronously = this.conf.getScheduleAynschronously(); @@ -401,7 +412,19 @@ public CSQueue hook(CSQueue queue) { } } private static final QueueHook noop = new QueueHook(); - + + private void initializeQueueMappings() { + overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info("Initialized queue mappings, override: " + + overrideWithQueueMappings); + // Get new user/group mappings + mappings = conf.getQueueMappings(); + // initialize groups if mappings are present + if (mappings.size() > 0) { + groups = new Groups(conf); + } + } + @Lock(CapacityScheduler.class) private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { @@ -409,7 +432,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); + LOG.info("Initialized root queue " + root); + initializeQueueMappings(); } @Lock(CapacityScheduler.class) @@ -429,6 +454,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); + initializeQueueMappings(); } /** @@ -516,12 +542,72 @@ static CSQueue parseQueue( } synchronized CSQueue getQueue(String queueName) { + if (queueName == null) { + return null; + } return queues.get(queueName); } + private static final String CURRENT_USER_MAPPING = "%user"; + + private static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + private String getMappedQueue(String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.type == MappingType.USER) { + if (mapping.source.equals(CURRENT_USER_MAPPING)) { + if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + return user; + } + else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + return groups.getGroups(user).get(0); + } + else { + return mapping.queue; + } + } + if (user.equals(mapping.source)) { + return mapping.queue; + } + } + if (mapping.type == MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.source)) { + return mapping.queue; + } + } + } + } + return null; + } + + private synchronized String getMappedQueueIfRequired( + ApplicationId applicationId, String user, String queueName) { + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info( + "Application " + applicationId + + " user " + user + " mapping [" + queueName + "] to [" + + mappedQueue + + "] override " + overrideWithQueueMappings); + queueName = mappedQueue; + } + } + } catch (IOException ioex) { + LOG.warn("map queue failed ", ioex); + } + } + return queueName; + } + private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { - // santiy checks. + String queueName, String user, boolean isAppRecovering) { + // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { String message = "Application " + applicationId + @@ -899,8 +985,9 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + getMappedQueueIfRequired(appAddedEvent.getApplicationId(), + appAddedEvent.getUser(), appAddedEvent.getQueue()), + appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fe695e..878f582 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,8 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -146,6 +145,44 @@ @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + @Private + public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; + + @Private + public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + + @Private + public static class QueueMapping { + + public enum MappingType { + + USER("u"), + GROUP("g"); + private final String type; + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } + } + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -378,4 +415,82 @@ public void setScheduleAynschronously(boolean async) { setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); } + public boolean getOverrideWithQueueMappings() { + return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); + } + + /** + * Returns a collection of strings, trimming leading and trailing whitespeace + * on each value + * + * @param str + * String to parse + * @param delim + * delimiter to separate the values + * @return Collection of parsed elements. + */ + private static Collection getTrimmedStringCollection(String str, + String delim) { + List values = new ArrayList(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public List getQueueMappings() { + List mappings = + new ArrayList(); + Collection mappingsString = + getTrimmedStringCollection(QUEUE_MAPPING); + for (String mappingValue : mappingsString) { + String[] mapping = + getTrimmedStringCollection(mappingValue, ":") + .toArray(new String[] {}); + if (mapping.length != 3 || mapping[1].length() == 0 + || mapping[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + QueueMapping m; + try { + QueueMapping.MappingType mappingType; + if (mapping[0].equals("u")) { + mappingType = QueueMapping.MappingType.USER; + } else if (mapping[0].equals("g")) { + mappingType = QueueMapping.MappingType.GROUP; + } else { + throw new IllegalArgumentException( + "unknown mapping prefix " + mapping[0]); + } + m = new QueueMapping( + mappingType, + mapping[1], + mapping[2]); + } catch (Throwable t) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + if (m != null) { + mappings.add(m); + } + } + + return mappings; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java new file mode 100644 index 0000000..e225a4c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.*; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.mockito.Mockito; + +import org.junit.Assert; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.HashMap; + +public class TestQueueMappings { + + private static final Log LOG = LogFactory.getLog(TestQueueMappings.class); + + private static final String Q1 = "q1"; + private static final String Q2 = "q2"; + + private final static String Q1_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q1; + private final static String Q2_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q2; + + private MockRM resourceManager; + + @After + public void tearDown() throws Exception { + if (resourceManager != null) { + LOG.info("Stopping the resource manager"); + resourceManager.stop(); + } + } + + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 }); + + conf.setCapacity(Q1_PATH, 10); + conf.setCapacity(Q2_PATH, 90); + + LOG.info("Setup top-level queues q1 and q2"); + } + + @Test + public void testQueueMapping() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + YarnConfiguration conf = new YarnConfiguration(csConf); + CapacityScheduler cs = new CapacityScheduler(); + + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + cs.start(); + + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + "true"); + + // configuration parsing tests - negative test cases + checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier"); + checkInvalidQMapping(conf, cs, "u:a", "no queue specified"); + checkInvalidQMapping(conf, cs, "g:a", "no queue specified"); + checkInvalidQMapping(conf, cs, "u:a:b,g:a", + "multiple mappings with invalid mapping"); + checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments"); + checkInvalidQMapping(conf, cs, "u::", "empty source and queue"); + checkInvalidQMapping(conf, cs, "u:", "missing source missing queue"); + checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q"); + + // simple base case for mapping user to queue + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:b"); + cs.reinitialize(conf, null); + checkQMapping("a", "b", cs); + + // group mapping test + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:q1"); + cs.reinitialize(conf, null); + checkQMapping("a", "q1", cs); + + // %user tests + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:q2"); + cs.reinitialize(conf, null); + checkQMapping("a", "q2", cs); + + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user"); + cs.reinitialize(conf, null); + checkQMapping("a", "a", cs); + + // %primary_group tests + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:%user:%primary_group"); + cs.reinitialize(conf, null); + checkQMapping("a", "agroup", cs); + + // non-primary group mapping + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:asubgroup1:q3"); + cs.reinitialize(conf, null); + checkQMapping("a", "q3", cs); + + // space trimming + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : b"); + cs.reinitialize(conf, null); + checkQMapping("a", "b", cs); + + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + conf = new YarnConfiguration(csConf); + + resourceManager = new MockRM(csConf); + resourceManager.start(); + + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + "true"); + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + // ensure that if the user specifies a Q that is still overriden + checkAppQueue(resourceManager, "user", Q2, Q1); + + // toggle admin override and retry + conf.setBoolean( + CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + false); + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); + setupQueueConfiguration(csConf); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + checkAppQueue(resourceManager, "user", Q2, Q2); + + // ensure that if a user does not specify a Q, the user mapping is used + checkAppQueue(resourceManager, "user", null, Q1); + + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2); + setupQueueConfiguration(csConf); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + // ensure that if a user does not specify a Q, the group mapping is used + checkAppQueue(resourceManager, "user", null, Q2); + + // if the mapping specifies a queue that does not exist, the job is rejected + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:user:non_existent_queue"); + setupQueueConfiguration(csConf); + resourceManager.getResourceScheduler().reinitialize(conf, null); + checkAppQueue(resourceManager, "user", null, ""); + + resourceManager.stop(); + } + + private void checkAppQueue(MockRM resourceManager, String user, + String submissionQueue, String expected) + throws Exception { + RMApp app = resourceManager.submitApp(200, "name", user, + new HashMap(), false, submissionQueue, -1, + null, "MAPREDUCE", false); + RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED + : RMAppState.ACCEPTED; + resourceManager.waitForState(app.getApplicationId(), expectedState); + // get scheduler app + CapacityScheduler cs = (CapacityScheduler) + resourceManager.getResourceScheduler(); + SchedulerApplication schedulerApp = + cs.getSchedulerApplications().get(app.getApplicationId()); + String queue = ""; + if (schedulerApp != null) { + queue = schedulerApp.getQueue().getQueueName(); + } + Assert.assertTrue("expected " + expected + " actual " + queue, + expected.equals(queue)); + } + + private void checkInvalidQMapping(YarnConfiguration conf, + CapacityScheduler cs, + String mapping, String reason) + throws IOException { + boolean fail = false; + try { + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping); + cs.reinitialize(conf, null); + } catch (IOException ex) { + fail = true; + } + Assert.assertTrue("invalid mapping did not throw exception for " + reason, + fail); + } + + private void checkQMapping(String user, String expected, CapacityScheduler cs) + throws IOException { + String actual = cs.getMappedQueueForTest(user); + Assert.assertTrue("expected " + expected + " actual " + actual, + expected.equals(actual)); + } +}