diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index fce220b..4056b02 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -843,6 +843,11 @@ public void setRMContext(RMContext rmContext) { } @Override + public RMContext getRMContext() { + return rmContext; + } + + @Override public void reinitialize(Configuration conf, RMContext rmContext) throws IOException { scheduler.reinitialize(conf, rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index abd72bf..e38cc0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -637,7 +637,7 @@ public synchronized void moveAllApps(String sourceQueue, String destQueue) } @Override - public synchronized void killAllAppsInQueue(String queueName) + public void killAllAppsInQueue(String queueName) throws YarnException { // check if queue is a valid List apps = getAppsInQueue(queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0aff669..2de5caf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -279,6 +280,12 @@ void setEntitlement(String queue, QueueEntitlement entitlement) throws YarnException; /** + * + * @return RMContext + */ + public RMContext getRMContext(); + + /** * Gets the list of names for queues managed by the Reservation System * @return the list of queues which support reservations */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index b40ac27..6bff09d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -89,7 +89,8 @@ public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { - this.labelManager = cs.getRMContext().getNodeLabelManager(); + //this.labelManager = cs.getRMContext().getNodeLabelManager(); + this.labelManager = cs.getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.resourceCalculator = cs.getResourceCalculator(); @@ -120,12 +121,12 @@ protected void setupConfigurableCapacities() { } @Override - public synchronized float getCapacity() { + public float getCapacity() { return queueCapacities.getCapacity(); } @Override - public synchronized float getAbsoluteCapacity() { + public float getAbsoluteCapacity() { return queueCapacities.getAbsoluteCapacity(); } @@ -135,7 +136,7 @@ public float getAbsoluteMaximumCapacity() { } @Override - public synchronized float getAbsoluteUsedCapacity() { + public float getAbsoluteUsedCapacity() { return queueCapacities.getAbsoluteUsedCapacity(); } @@ -606,7 +607,6 @@ public boolean accessibleToPartition(String nodePartition) { @Override public Priority getDefaultApplicationPriority() { - // TODO add dummy implementation - return null; + return Priority.newInstance(0); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 782ed03..9f91d6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -148,6 +148,9 @@ new Comparator() { @Override public int compare(CSQueue q1, CSQueue q2) { + if (q1.equals(q2)) { + return 0; + } if (q1.getUsedCapacity() < q2.getUsedCapacity()) { return -1; } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { @@ -212,7 +215,7 @@ public Configuration getConf() { private CapacitySchedulerConfiguration conf; private Configuration yarnConf; - private Map queues = new ConcurrentHashMap(); + private ConcurrentHashMap queues = new ConcurrentHashMap(); private AtomicInteger numNodeManagers = new AtomicInteger(0); @@ -223,8 +226,8 @@ public Configuration getConf() { private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; private SchedulerHealth schedulerHealth = new SchedulerHealth(); - long lastNodeUpdateTime; - + AtomicLong lastNodeUpdateTime = new AtomicLong(0); + private Priority maxClusterLevelAppPriority; /** * EXPERT */ @@ -288,6 +291,13 @@ public synchronized void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + @Override + public RMNodeLabelsManager getNodeLabelManager() { + // this is not under lock, but it is only invoked strictly + // after initScheduler (only method setting labelManager) + return this.labelManager; + } + private synchronized void initScheduler(Configuration configuration) throws IOException { this.conf = loadCapacitySchedulerConfiguration(configuration); @@ -1059,7 +1069,7 @@ private synchronized void nodeUpdate(RMNode nm) { } } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, + schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime.get(), releaseResources); schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); @@ -1169,7 +1179,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { return; } // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, + updateSchedulerHealth(lastNodeUpdateTime.get(), node, new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); CSAssignment assignment; @@ -1207,7 +1217,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().addAllocationDetails( reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + updateSchedulerHealth(lastNodeUpdateTime.get(), node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } } @@ -1231,7 +1241,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, clusterResource, assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(lastNodeUpdateTime.get(), node, assignment); return; } @@ -1262,7 +1272,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, clusterResource)), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(lastNodeUpdateTime.get(), node, assignment); } } else { LOG.info("Skipping scheduling since node " @@ -1519,7 +1529,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, container.getId(), queue.getQueuePath()); schedulerHealth.updateSchedulerPreemptionCounts(1); } else { - schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(), + schedulerHealth.updateRelease(lastNodeUpdateTime.get(), container.getNodeId(), container.getId(), queue.getQueuePath()); } } @@ -1719,7 +1729,7 @@ private synchronized String resolveReservationQueueName(String queueName, } @Override - public synchronized void removeQueue(String queueName) + public void removeQueue(String queueName) throws SchedulerDynamicEditException { LOG.info("Removing queue: " + queueName); CSQueue q = this.getQueue(queueName); @@ -1742,7 +1752,7 @@ public synchronized void removeQueue(String queueName) } @Override - public synchronized void addQueue(Queue queue) + public void addQueue(Queue queue) throws SchedulerDynamicEditException { if (!(queue instanceof ReservationQueue)) { @@ -1761,9 +1771,13 @@ public synchronized void addQueue(Queue queue) PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); - parentPlan.addChildQueue(newQueue); - this.queues.put(queuename, newQueue); - LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + boolean added = parentPlan.addChildQueue(newQueue); + if (added) { + this.queues.putIfAbsent(queuename, newQueue); + LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + } else { + LOG.warn("Trying to add an existing queue"); + } } @Override @@ -1910,8 +1924,9 @@ public SchedulerHealth getSchedulerHealth() { return this.schedulerHealth; } - private synchronized void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; + private void setLastNodeUpdateTime(long time) { + // lastNodeUpdateTime is an AtomicLong + this.lastNodeUpdateTime.set(time); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 2a0dd0d..d4b7b16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -61,4 +62,6 @@ PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); + + RMNodeLabelsManager getNodeLabelManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e5ac072..1fff259 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; @@ -785,28 +788,28 @@ private synchronized FiCaSchedulerApp getApplication( return applicationAttemptMap.get(applicationAttemptId); } + @SuppressWarnings("unchecked") private void handleExcessReservedContainer(Resource clusterResource, CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) { if (assignment.getExcessReservation() != null) { RMContainer excessReservedContainer = assignment.getExcessReservation(); - if (excessReservedContainer.hasIncreaseReservation()) { - unreserveIncreasedContainer(clusterResource, - app, node, excessReservedContainer); + unreserveIncreasedContainer(clusterResource, app, node, + excessReservedContainer); } else { - completedContainer(clusterResource, assignment.getApplication(), - scheduler.getNode(excessReservedContainer.getAllocatedNode()), - excessReservedContainer, - SchedulerUtils.createAbnormalContainerStatus( - excessReservedContainer.getContainerId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null, false); + // completed container is supposed to do locking hand-over-hand + // on the parent, but we are holding LeafQueue locks here, so + // we can't invoke it directly! using events instead + scheduler.getRMContext().getDispatcher().getEventHandler() + .handle(new ContainerPreemptEvent( + assignment.getApplication().getApplicationAttemptId(), + excessReservedContainer, SchedulerEventType.DROP_RESERVATION)); } assignment.setExcessReservation(null); } } - + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index badab72..3d5656d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -68,7 +69,7 @@ private static final Log LOG = LogFactory.getLog(ParentQueue.class); - protected final Set childQueues; + protected final ConcurrentSkipListSet childQueues; private final boolean rootQueue; final Comparator nonPartitionedQueueComparator; final PartitionedQueueComparator partitionQueueComparator; @@ -97,7 +98,7 @@ public ParentQueue(CapacitySchedulerContext cs, ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - this.childQueues = new TreeSet(nonPartitionedQueueComparator); + this.childQueues = new ConcurrentSkipListSet(nonPartitionedQueueComparator); setupQueueConfigs(cs.getClusterResource()); @@ -178,7 +179,7 @@ public String getQueuePath() { } @Override - public synchronized QueueInfo getQueueInfo( + public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = getQueueInfo(); @@ -722,7 +723,7 @@ public synchronized void updateClusterResource(Resource clusterResource, } @Override - public synchronized List getChildQueues() { + public List getChildQueues() { return new ArrayList(childQueues); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 7b53ad5..6fa5a06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -112,7 +112,7 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue, showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; } - synchronized void addChildQueue(CSQueue newQueue) + boolean addChildQueue(CSQueue newQueue) throws SchedulerDynamicEditException { if (newQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException("Queue " + newQueue @@ -123,24 +123,26 @@ synchronized void addChildQueue(CSQueue newQueue) LOG.debug("updateChildQueues (action: add queue): " + added + " " + getChildQueuesToPrint()); } + return added; } - synchronized void removeChildQueue(CSQueue remQueue) + void removeChildQueue(CSQueue remQueue) throws SchedulerDynamicEditException { if (remQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException("Queue " + remQueue + " being removed has non zero capacity."); } - Iterator qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if (cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); - } - } - } + childQueues.remove(remQueue); +// Iterator qiter = childQueues.iterator(); +// while (qiter.hasNext()) { +// CSQueue cs = qiter.next(); +// if (cs.equals(remQueue)) { +// qiter.remove(); +// if (LOG.isDebugEnabled()) { +// LOG.debug("Removed child queue: {}", cs.getQueueName()); +// } +// } +// } } protected synchronized float sumOfChildCapacities() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 976cf8c..be05eb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -108,4 +108,22 @@ protected void setupConfigurableCapacities() { CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), queueCapacities, parent == null ? null : parent.getQueueCapacities()); } + + @Override + public float getAbsoluteCapacity() { + // ATTENTION: this is NOT synchronized. This is ok as queueCapacities does + // its own locks, and + // we only modify absCap from the same thread we read from + return queueCapacities.getAbsoluteCapacity(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof ReservationQueue) { + if (this.queueName.equals(((ReservationQueue) other).queueName)) { + return true; + } + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 04977a6..65a1fe9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1384,6 +1384,11 @@ public synchronized void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + @Override + public RMContext getRMContext() { + return rmContext; + } + private void initScheduler(Configuration conf) throws IOException { synchronized (this) { this.conf = new FairSchedulerConfiguration(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index cfae3a2..dec5a87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -304,6 +304,11 @@ public synchronized void setRMContext(RMContext rmContext) { } @Override + public RMContext getRMContext() { + return rmContext; + } + + @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 9f4b9f5..cc813c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -99,6 +99,8 @@ public void setUp() throws IOException { when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -262,6 +264,8 @@ public void testLimitsComputation() throws Exception { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); // Say cluster has 100 nodes of 16G each Resource clusterResource = @@ -567,6 +571,8 @@ public void testHeadroom() throws Exception { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 88c7c13..c2dae58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -99,6 +99,8 @@ public void setUp() throws Exception { when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); } private FiCaSchedulerApp getMockApplication(int appId, String user) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 479e25a..66bf46c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -115,7 +116,8 @@ public void setUp() throws Exception { CapacityScheduler spyCs = new CapacityScheduler(); cs = spy(spyCs); - rmContext = TestUtils.getMockRMContext(); + DrainDispatcher disp = new DrainDispatcher(); + rmContext = TestUtils.getMockRMContext(disp); spyRMContext = spy(rmContext); ConcurrentMap spyApps = @@ -156,6 +158,8 @@ public void setUp() throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); root = CapacityScheduler.parseQueue(csContext, csConf, null, @@ -2316,6 +2320,8 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() .createResource(100 * 16 * GB, 100 * 32); CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); csConf.setFloat(CapacitySchedulerConfiguration. MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f); ParentQueue root = new ParentQueue(csContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 4a815f5..7439c07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -95,6 +95,8 @@ public void setUp() throws Exception { when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); } private static final String A = "a"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java index e23e93c..bd097de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -64,6 +64,8 @@ public void setup() throws IOException { RMContext mockRMContext = TestUtils.getMockRMContext(); when(csContext.getRMContext()).thenReturn(mockRMContext); + when(csContext.getNodeLabelManager()) + .thenReturn(mockRMContext.getNodeLabelManager()); // create a queue PlanQueue pq = new PlanQueue(csContext, "root", null, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 884de2a..e069a2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -124,6 +124,8 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + when(csContext.getNodeLabelManager()) + .thenReturn(rmContext.getNodeLabelManager()); root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 489ef77..b439eab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -67,12 +67,17 @@ public class TestUtils { private static final Log LOG = LogFactory.getLog(TestUtils.class); + public static RMContext getMockRMContext() { + return getMockRMContext(null); + } + /** * Get a mock {@link RMContext} for use in test cases. * @return a mock {@link RMContext} for use in test cases */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static RMContext getMockRMContext() { + public static RMContext getMockRMContext(Dispatcher dispatcher) { + if (dispatcher == null) { // Null dispatcher Dispatcher nullDispatcher = new Dispatcher() { private final EventHandler handler = @@ -90,15 +95,17 @@ public EventHandler getEventHandler() { return handler; } }; + dispatcher = nullDispatcher; + } // No op ContainerAllocationExpirer cae = - new ContainerAllocationExpirer(nullDispatcher); + new ContainerAllocationExpirer(dispatcher); Configuration conf = new Configuration(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContextImpl rmContext = - new RMContextImpl(nullDispatcher, cae, null, null, null, + new RMContextImpl(dispatcher, cae, null, null, null, new AMRMTokenSecretManager(conf, null), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf),