diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java new file mode 100644 index 0000000..483ea55 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Context of the Queues in Scheduler. + */ +@Unstable +@Private +public interface SchedulerQueueContext { + + public T getRootQueue(); + + public Map getQueues(); + + public void removeQueue(String queueName); + + public void addQueue(String queueName, T queue); + + public T getQueue(String queueName); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 10df751..cf52924 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; -import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -150,9 +149,9 @@ PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); - private YarnAuthorizationProvider authorizer; - private CSQueue root; + private CapacitySchedulerQueueContext queueContext; + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -160,23 +159,6 @@ private volatile boolean isLazyPreemptionEnabled = false; - static final Comparator nonPartitionedQueueComparator = - new Comparator() { - @Override - public int compare(CSQueue q1, CSQueue q2) { - if (q1.getUsedCapacity() < q2.getUsedCapacity()) { - return -1; - } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { - return 1; - } - - return q1.getQueuePath().compareTo(q2.getQueuePath()); - } - }; - - static final PartitionedQueueComparator partitionedQueueComparator = - new PartitionedQueueComparator(); - @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -228,8 +210,6 @@ public Configuration getConf() { private CapacitySchedulerConfiguration conf; private Configuration yarnConf; - private Map queues = new ConcurrentHashMap(); - private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -254,11 +234,11 @@ public CapacityScheduler() { @Override public QueueMetrics getRootQueueMetrics() { - return root.getMetrics(); + return getRootQueue().getMetrics(); } public CSQueue getRootQueue() { - return root; + return queueContext.getRootQueue(); } @Override @@ -278,12 +258,12 @@ public ResourceCalculator getResourceCalculator() { @Override public Comparator getNonPartitionedQueueComparator() { - return nonPartitionedQueueComparator; + return CapacitySchedulerQueueContext.nonPartitionedQueueComparator; } @Override public PartitionedQueueComparator getPartitionedQueueComparator() { - return partitionedQueueComparator; + return CapacitySchedulerQueueContext.partitionedQueueComparator; } @Override @@ -313,7 +293,10 @@ private void initScheduler(Configuration configuration) throws this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); - authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.queueContext = new CapacitySchedulerQueueContext(); + this.queueContext.setCapacitySchedulerContext(this); + this.queueContext.setYarnAuthorizationProvider( + YarnAuthorizationProvider.getInstance(yarnConf)); this.activitiesManager = new ActivitiesManager(rmContext); activitiesManager.init(conf); initializeQueues(this.conf); @@ -472,7 +455,6 @@ public CSQueue hook(CSQueue queue) { return queue; } } - private static final QueueHook noop = new QueueHook(); @VisibleForTesting public UserGroupMappingPlacementRule @@ -491,7 +473,7 @@ public CSQueue hook(CSQueue queue) { if (!mappingQueue.equals( UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = queues.get(mappingQueue); + CSQueue queue = getQueue(mappingQueue); if (queue == null || !(queue instanceof LeafQueue)) { throw new IOException( "mapping contains invalid or non-leaf queue " + mappingQueue); @@ -529,64 +511,30 @@ private void updatePlacementRules() throws IOException { private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { - root = - parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - queues, queues, noop); + this.queueContext.initializeQueues(conf); labelManager.reinitializeQueueLabels(getQueueToLabels()); - LOG.info("Initialized root queue " + root); updatePlacementRules(); - setQueueAcls(authorizer, queues); // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); + preemptionManager.refreshQueues(null, this.getRootQueue()); } @Lock(CapacityScheduler.class) private void reinitializeQueues(CapacitySchedulerConfiguration newConf) throws IOException { // Parse new queues - Map newQueues = new HashMap(); - CSQueue newRoot = - parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); - - // Ensure all existing queues are still present - validateExistingQueues(queues, newQueues); - - // Add new queues - addNewQueues(queues, newQueues); - - // Re-configure queues - root.reinitialize(newRoot, getClusterResource()); + this.queueContext.reinitializeQueues(newConf); updatePlacementRules(); - // Re-calculate headroom for active applications - Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); - labelManager.reinitializeQueueLabels(getQueueToLabels()); - setQueueAcls(authorizer, queues); // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); - } - - @VisibleForTesting - public static void setQueueAcls(YarnAuthorizationProvider authorizer, - Map queues) throws IOException { - List permissions = new ArrayList<>(); - for (CSQueue queue : queues.values()) { - AbstractCSQueue csQueue = (AbstractCSQueue) queue; - permissions.add( - new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); - } - authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); + preemptionManager.refreshQueues(null, this.getRootQueue()); } private Map> getQueueToLabels() { Map> queueToLabels = new HashMap>(); - for (CSQueue queue : queues.values()) { + for (CSQueue queue : this.queueContext.getQueues().values()) { queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels()); } return queueToLabels; @@ -618,95 +566,11 @@ private void validateExistingQueues( } } - /** - * Add the new queues (only) to our list of queues... - * ... be careful, do not overwrite existing queues. - * @param queues - * @param newQueues - */ - @Lock(CapacityScheduler.class) - private void addNewQueues( - Map queues, Map newQueues) - { - for (Map.Entry e : newQueues.entrySet()) { - String queueName = e.getKey(); - CSQueue queue = e.getValue(); - if (!queues.containsKey(queueName)) { - queues.put(queueName, queue); - } - } - } - - @Lock(CapacityScheduler.class) - static CSQueue parseQueue( - CapacitySchedulerContext csContext, - CapacitySchedulerConfiguration conf, - CSQueue parent, String queueName, Map queues, - Map oldQueues, - QueueHook hook) throws IOException { - CSQueue queue; - String fullQueueName = - (parent == null) ? queueName - : (parent.getQueuePath() + "." + queueName); - String[] childQueueNames = - conf.getQueues(fullQueueName); - boolean isReservableQueue = conf.isReservable(fullQueueName); - if (childQueueNames == null || childQueueNames.length == 0) { - if (null == parent) { - throw new IllegalStateException( - "Queue configuration missing child queue names for " + queueName); - } - // Check if the queue will be dynamically managed by the Reservation - // system - if (isReservableQueue) { - queue = - new PlanQueue(csContext, queueName, parent, - oldQueues.get(queueName)); - } else { - queue = - new LeafQueue(csContext, queueName, parent, - oldQueues.get(queueName)); - - // Used only for unit tests - queue = hook.hook(queue); - } - } else { - if (isReservableQueue) { - throw new IllegalStateException( - "Only Leaf Queues can be reservable for " + queueName); - } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); - - // Used only for unit tests - queue = hook.hook(parentQueue); - - List childQueues = new ArrayList(); - for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); - childQueues.add(childQueue); - } - parentQueue.setChildQueues(childQueues); - } - - if (queue instanceof LeafQueue && queues.containsKey(queueName) - && queues.get(queueName) instanceof LeafQueue) { - throw new IOException("Two leaf queues were named " + queueName - + ". Leaf queue names must be distinct"); - } - queues.put(queueName, queue); - - LOG.info("Initialized queue: " + queue); - return queue; - } - public CSQueue getQueue(String queueName) { if (queueName == null) { return null; } - return queues.get(queueName); + return this.queueContext.getQueue(queueName); } private void addApplicationOnRecovery( @@ -953,7 +817,7 @@ private void doneApplicationAttempt( // Inform the queue String queueName = attempt.getQueue().getQueueName(); - CSQueue queue = queues.get(queueName); + CSQueue queue = getQueue(queueName); if (!(queue instanceof LeafQueue)) { LOG.error( "Cannot finish application " + "from non-leaf queue: " + queueName); @@ -1079,7 +943,7 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, boolean recursive) throws IOException { CSQueue queue = null; - queue = this.queues.get(queueName); + queue = getQueue(queueName); if (queue == null) { throw new IOException("Unknown queue: " + queueName); } @@ -1097,7 +961,7 @@ public QueueInfo getQueueInfo(String queueName, return new ArrayList(); } - return root.getQueueUserAclInfo(user); + return getRootQueue().getQueueUserAclInfo(user); } private void nodeUpdate(RMNode nm) { @@ -1196,7 +1060,7 @@ private void updateNodeAndQueueResource(RMNode nm, writeLock.lock(); updateNodeResource(nm, resourceOption); Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); } finally { writeLock.unlock(); @@ -1368,7 +1232,8 @@ public void allocateContainersToNode(FiCaSchedulerNode node) { + ", available: " + node.getUnallocatedResource()); } - assignment = root.assignContainers(getClusterResource(), node, + assignment = getRootQueue().assignContainers( + getClusterResource(), node, new ResourceLimits(labelManager .getResourceByLabel(node.getPartition(), getClusterResource())), @@ -1399,7 +1264,8 @@ public void allocateContainersToNode(FiCaSchedulerNode node) { } // Try to use NON_EXCLUSIVE - assignment = root.assignContainers(getClusterResource(), node, + assignment = getRootQueue().assignContainers( + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager @@ -1581,7 +1447,7 @@ private void addNode(RMNode nodeManager) { } Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); LOG.info( @@ -1630,7 +1496,7 @@ private void removeNode(RMNode nodeInfo) { nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); int numNodes = nodeTracker.nodeCount(); @@ -1866,7 +1732,7 @@ public boolean checkAccess(UserGroupInformation callerUGI, @Override public List getAppsInQueue(String queueName) { - CSQueue queue = queues.get(queueName); + CSQueue queue = getQueue(queueName); if (queue == null) { return null; } @@ -1970,7 +1836,7 @@ public void removeQueue(String queueName) } ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); - this.queues.remove(queueName); + this.queueContext.removeQueue(queueName); LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); } finally { writeLock.unlock(); @@ -1999,7 +1865,7 @@ public void addQueue(Queue queue) PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); - this.queues.put(queuename, newQueue); + this.queueContext.addQueue(queuename, newQueue); LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); @@ -2186,7 +2052,7 @@ private String handleMoveToPlanQueue(String targetQueueName) { @Override public Set getPlanQueues() { Set ret = new HashSet(); - for (Map.Entry l : queues.entrySet()) { + for (Map.Entry l : queueContext.getQueues().entrySet()) { if (l.getValue() instanceof PlanQueue) { ret.add(l.getKey()); } @@ -2309,6 +2175,6 @@ public PreemptionManager getPreemptionManager() { @Override public ResourceUsage getClusterResourceUsage() { - return root.getQueueResourceUsage(); + return getRootQueue().getQueueResourceUsage(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java new file mode 100644 index 0000000..2deb764 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.security.Permission; +import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.QueueHook; + +/** + * Context of the Queues in Capacity Scheduler. + */ +@Unstable +@Private +public class CapacitySchedulerQueueContext implements + SchedulerQueueContext { + + private static final Log LOG = LogFactory.getLog( + CapacitySchedulerQueueContext.class); + + static final Comparator nonPartitionedQueueComparator = + new Comparator() { + @Override + public int compare(CSQueue q1, CSQueue q2) { + if (q1.getUsedCapacity() < q2.getUsedCapacity()) { + return -1; + } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { + return 1; + } + + return q1.getQueuePath().compareTo(q2.getQueuePath()); + } + }; + + static final PartitionedQueueComparator partitionedQueueComparator = + new PartitionedQueueComparator(); + + private static final QueueHook noop = new QueueHook(); + + private CapacitySchedulerContext csContext; + private YarnAuthorizationProvider authorizer; + + private Map queues = new ConcurrentHashMap(); + + private CSQueue root; + + public CapacitySchedulerQueueContext() {} + + @Override + public CSQueue getRootQueue() { + return this.root; + } + + @Override + public Map getQueues() { + return queues; + } + + @Override + public void removeQueue(String queueName) { + this.queues.remove(queueName); + } + + @Override + public void addQueue(String queueName, CSQueue queue) { + this.queues.put(queueName, queue); + } + + @Override + public CSQueue getQueue(String queueName) { + return queues.get(queueName); + } + + public void setCapacitySchedulerContext(CapacitySchedulerContext csContext) { + this.csContext = csContext; + } + + public void setYarnAuthorizationProvider(YarnAuthorizationProvider authorizer) { + this.authorizer = authorizer; + } + + public void initializeQueues(CapacitySchedulerConfiguration conf) + throws IOException { + root = parseQueue(this.csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, noop); + setQueueAcls(authorizer, queues); + LOG.info("Initialized root queue " + root); + } + + public void reinitializeQueues(CapacitySchedulerConfiguration newConf) + throws IOException { + // Parse new queues + Map newQueues = new HashMap(); + CSQueue newRoot = parseQueue(this.csContext, newConf, null, + CapacitySchedulerConfiguration.ROOT, newQueues, queues, noop); + + // Ensure all existing queues are still present + validateExistingQueues(queues, newQueues); + + // Add new queues + addNewQueues(queues, newQueues); + + // Re-configure queues + root.reinitialize(newRoot, this.csContext.getClusterResource()); + + setQueueAcls(authorizer, queues); + + // Re-calculate headroom for active applications + Resource clusterResource = this.csContext.getClusterResource(); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + } + + static CSQueue parseQueue( + CapacitySchedulerContext csContext, + CapacitySchedulerConfiguration conf, + CSQueue parent, String queueName, Map queues, + Map oldQueues, + QueueHook hook) throws IOException { + CSQueue queue; + String fullQueueName = + (parent == null) ? queueName + : (parent.getQueuePath() + "." + queueName); + String[] childQueueNames = + conf.getQueues(fullQueueName); + boolean isReservableQueue = conf.isReservable(fullQueueName); + if (childQueueNames == null || childQueueNames.length == 0) { + if (null == parent) { + throw new IllegalStateException( + "Queue configuration missing child queue names for " + queueName); + } + // Check if the queue will be dynamically managed by the Reservation + // system + if (isReservableQueue) { + queue = + new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else { + queue = + new LeafQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + + // Used only for unit tests + queue = hook.hook(queue); + } + } else { + if (isReservableQueue) { + throw new IllegalStateException( + "Only Leaf Queues can be reservable for " + queueName); + } + ParentQueue parentQueue = + new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); + + // Used only for unit tests + queue = hook.hook(parentQueue); + + List childQueues = new ArrayList(); + for (String childQueueName : childQueueNames) { + CSQueue childQueue = + parseQueue(csContext, conf, queue, childQueueName, + queues, oldQueues, hook); + childQueues.add(childQueue); + } + parentQueue.setChildQueues(childQueues); + } + + if (queue instanceof LeafQueue && queues.containsKey(queueName) + && queues.get(queueName) instanceof LeafQueue) { + throw new IOException("Two leaf queues were named " + queueName + + ". Leaf queue names must be distinct"); + } + queues.put(queueName, queue); + LOG.info("Initialized queue: " + queue); + return queue; + } + + /** + * Ensure all existing queues are present. Queues cannot be deleted + * @param queues existing queues + * @param newQueues new queues + */ + private void validateExistingQueues( + Map queues, Map newQueues) + throws IOException { + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(e.getValue() instanceof ReservationQueue)) { + String queueName = e.getKey(); + CSQueue oldQueue = e.getValue(); + CSQueue newQueue = newQueues.get(queueName); + if (null == newQueue) { + throw new IOException(queueName + " cannot be found during refresh!"); + } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { + throw new IOException(queueName + " is moved from:" + + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } + } + } + } + + /** + * Add the new queues (only) to our list of queues... + * ... be careful, do not overwrite existing queues. + * @param queues + * @param newQueues + */ + private void addNewQueues( + Map queues, Map newQueues) { + for (Map.Entry e : newQueues.entrySet()) { + String queueName = e.getKey(); + CSQueue queue = e.getValue(); + if (!queues.containsKey(queueName)) { + queues.put(queueName, queue); + } + } + } + + @VisibleForTesting + public static void setQueueAcls(YarnAuthorizationProvider authorizer, + Map queues) throws IOException { + List permissions = new ArrayList<>(); + for (CSQueue queue : queues.values()) { + AbstractCSQueue csQueue = (AbstractCSQueue) queue; + permissions.add( + new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); + } + authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 175f5bb..7886eba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -98,7 +98,8 @@ public void setUp() throws IOException { when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -111,7 +112,7 @@ public void setUp() throws IOException { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, "root", queues, queues, TestUtils.spyHook); @@ -263,7 +264,8 @@ public void testLimitsComputation() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 16)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -275,7 +277,7 @@ public void testLimitsComputation() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, "root", queues, queues, TestUtils.spyHook); LeafQueue queue = (LeafQueue)queues.get(A); @@ -347,7 +349,7 @@ public void testLimitsComputation() throws Exception { // Re-create queues to get new configs. queues = new HashMap(); root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, "root", queues, queues, TestUtils.spyHook); clusterResource = Resources.createResource(100 * 16 * GB); @@ -371,7 +373,7 @@ public void testLimitsComputation() throws Exception { // Re-create queues to get new configs. queues = new HashMap(); root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, "root", queues, queues, TestUtils.spyHook); queue = (LeafQueue)queues.get(A); @@ -572,7 +574,8 @@ public void testHeadroom() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -581,8 +584,8 @@ public void testHeadroom() throws Exception { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, - "root", queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacitySchedulerQueueContext.parseQueue(csContext, + csConf, null, "root", queues, queues, TestUtils.spyHook); ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage(); when(csContext.getClusterResourceUsage()) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index d335552..e33667a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -595,7 +595,8 @@ public void testHeadroom() throws Exception { when(csContext.getMaximumResourceCapability()) .thenReturn(Resources.createResource(16 * GB)); when(csContext.getNonPartitionedQueueComparator()) - .thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + .thenReturn( + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); RMContext rmContext = TestUtils.getMockRMContext(); RMContext spyRMContext = spy(rmContext); @@ -614,8 +615,8 @@ public void testHeadroom() throws Exception { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, - "root", queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacitySchedulerQueueContext.parseQueue( + csContext, csConf, null, "root", queues, queues, TestUtils.spyHook); ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage(); when(csContext.getClusterResourceUsage()) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 7f4fc2c..0c06c83 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -96,7 +96,7 @@ public void setUp() throws Exception { when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn(CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -219,9 +219,9 @@ public void testSortedQueues() throws Exception { setupSortedQueues(csConf); Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, - TestUtils.spyHook); + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, + TestUtils.spyHook); // Setup some nodes final int memoryPerNode = 10; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 25029f2..3909192 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -172,7 +172,8 @@ private void setUpInternal(ResourceCalculator rC) throws Exception { when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -185,9 +186,9 @@ private void setUpInternal(ResourceCalculator rC) throws Exception { containerTokenSecretManager); root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, - queues, queues, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, + queues, queues, TestUtils.spyHook); ResourceUsage queueResUsage = root.getQueueResourceUsage(); @@ -2176,7 +2177,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2); Map newQueues = new HashMap(); CSQueue newRoot = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); @@ -2201,7 +2202,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception { .NODE_LOCALITY_DELAY, 60); Map newQueues = new HashMap(); CSQueue newRoot = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 890e998..cf1ad12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -92,7 +92,7 @@ public void setUp() throws Exception { when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn(CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); @@ -205,7 +205,7 @@ public void testSingleLevelQueues() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -309,7 +309,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { Map queues = new HashMap(); boolean exceptionOccured = false; try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -323,7 +323,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { exceptionOccured = false; queues.clear(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -337,7 +337,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { exceptionOccured = false; queues.clear(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -430,7 +430,7 @@ public void testMultiLevelQueues() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -561,7 +561,7 @@ public void testQueueCapacitySettingChildZero() throws Exception { csConf.setCapacity(Q_B + "." + B3, 0); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } @@ -578,7 +578,7 @@ public void testQueueCapacitySettingParentZero() throws Exception { csConf.setCapacity(Q_A, 60); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } @@ -600,7 +600,7 @@ public void testQueueCapacityZero() throws Exception { Map queues = new HashMap(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException e) { @@ -616,7 +616,7 @@ public void testOffSwitchScheduling() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -689,7 +689,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { //B3 Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -783,12 +783,12 @@ public void testQueueAcl() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); YarnAuthorizationProvider authorizer = YarnAuthorizationProvider.getInstance(conf); - CapacityScheduler.setQueueAcls(authorizer, queues); + CapacitySchedulerQueueContext.setQueueAcls(authorizer, queues); UserGroupInformation user = UserGroupInformation.getCurrentUser(); // Setup queue configs diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 5e2007c..3508296 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -130,7 +130,7 @@ private void setup(CapacitySchedulerConfiguration csConf, when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 12)); when(csContext.getNonPartitionedQueueComparator()).thenReturn( - CapacityScheduler.nonPartitionedQueueComparator); + CapacitySchedulerQueueContext.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); @@ -140,8 +140,9 @@ private void setup(CapacitySchedulerConfiguration csConf, when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); - root = CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + root = CapacitySchedulerQueueContext.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, + TestUtils.spyHook); ResourceUsage queueResUsage = root.getQueueResourceUsage(); when(csContext.getClusterResourceUsage()) @@ -1087,8 +1088,8 @@ public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, csConf.setBoolean( CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false); Map newQueues = new HashMap(); - CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, newQueues, queues, + CSQueue newRoot = CapacitySchedulerQueueContext.parseQueue(csContext, + csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); queues = newQueues; root.reinitialize(newRoot, cs.getClusterResource());