diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 7c9b11e..18cbde7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -26,7 +26,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.TreeSet; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -38,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -70,8 +68,7 @@ private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - public static final List EMPTY_LIST = Collections.emptyList(); - + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); @@ -126,9 +123,8 @@ public boolean removeApp(FSAppAttempt app) { writeLock.unlock(); } - // Update AM resource usage if needed. If isAMRunning is true, we're not - // running an unmanaged AM. - if (runnable && app.isAmRunning()) { + // Update AM resource usage if needed + if (runnable && app.isAmRunning() && app.getAMResource() != null) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } @@ -270,29 +266,22 @@ public void updateDemand() { readLock.lock(); try { for (FSAppAttempt sched : runnableApps) { - if (Resources.equals(demand, maxRes)) { - break; - } - updateDemandForApp(sched, maxRes); + updateDemandForApp(sched); } for (FSAppAttempt sched : nonRunnableApps) { - if (Resources.equals(demand, maxRes)) { - break; - } - updateDemandForApp(sched, maxRes); + updateDemandForApp(sched); } } finally { readLock.unlock(); } + demand = Resources.componentwiseMin(demand, maxRes); if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxRes); - LOG.debug("The updated fairshare for " + getName() + " is " - + getFairShare()); } } - private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { + private void updateDemandForApp(FSAppAttempt sched) { sched.updateDemand(); Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { @@ -301,7 +290,6 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { + demand); } demand = Resources.add(demand, toAdd); - demand = Resources.componentwiseMin(demand, maxRes); } @Override @@ -309,40 +297,35 @@ public Resource assignContainer(FSSchedulerNode node) { Resource assigned = Resources.none(); if (LOG.isDebugEnabled()) { LOG.debug("Node " + node.getNodeName() + " offered to queue: " + - getName() + " fairShare: " + getFairShare()); + getName()); } if (!assignContainerPreCheck(node)) { return assigned; } - // Apps that have resource demands. - TreeSet pendingForResourceApps = - new TreeSet(policy.getComparator()); + Comparator comparator = policy.getComparator(); + writeLock.lock(); + try { + Collections.sort(runnableApps, comparator); + } finally { + writeLock.unlock(); + } readLock.lock(); try { - for (FSAppAttempt app : runnableApps) { - Resource pending = app.getAppAttemptResourceUsage().getPending(); - if (!pending.equals(Resources.none())) { - pendingForResourceApps.add(app); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { + continue; + } + + assigned = sched.assignContainer(node); + if (!assigned.equals(Resources.none())) { + break; } } } finally { readLock.unlock(); } - for (FSAppAttempt sched : pendingForResourceApps) { - if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { - continue; - } - assigned = sched.assignContainer(node); - if (!assigned.equals(Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned container in queue:" + getName() + " " + - "container:" + assigned); - } - break; - } - } return assigned; } @@ -384,7 +367,7 @@ public RMContainer preemptContainer() { @Override public List getChildQueues() { - return EMPTY_LIST; + return new ArrayList(1); } @Override @@ -481,7 +464,8 @@ public ActiveUsersManager getActiveUsersManager() { /** * Check whether this queue can run this application master under the - * maxAMShare limit. + * maxAMShare limit + * * @param amResource * @return true if this queue can run */ @@ -491,25 +475,10 @@ public boolean canRunAppAM(Resource amResource) { if (Math.abs(maxAMShare - -1.0f) < 0.0001) { return true; } - - // If FairShare is zero, use min(maxShare, available resource) to compute - // maxAMResource - Resource maxResource = Resources.clone(getFairShare()); - if (maxResource.getMemorySize() == 0) { - maxResource.setMemorySize( - Math.min(scheduler.getRootQueueMetrics().getAvailableMB(), - getMaxShare().getMemorySize())); - } - - if (maxResource.getVirtualCores() == 0) { - maxResource.setVirtualCores(Math.min( - scheduler.getRootQueueMetrics().getAvailableVirtualCores(), - getMaxShare().getVirtualCores())); - } - - Resource maxAMResource = Resources.multiply(maxResource, maxAMShare); + Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); - return Resources.fitsIn(ifRunAMResource, maxAMResource); + return !policy + .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource); } public void addAMResourceUsage(Resource amResource) { @@ -538,15 +507,6 @@ public void updateStarvationStats() { } } - /** Allows setting weight for a dynamically created queue - * Currently only used for reservation based queues - * @param weight queue weight - */ - public void setWeights(float weight) { - scheduler.getAllocationConfiguration().setQueueWeight(getName(), - new ResourceWeights(weight)); - } - /** * Helper method to check if the queue should preempt containers * @@ -575,10 +535,9 @@ boolean isStarvedForFairShare() { } private boolean isStarved(Resource share) { - Resource desiredShare = Resources.min(policy.getResourceCalculator(), - scheduler.getClusterResource(), share, getDemand()); - Resource resourceUsage = getResourceUsage(); - return Resources.lessThan(policy.getResourceCalculator(), - scheduler.getClusterResource(), resourceUsage, desiredShare); + Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + return Resources.lessThan(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), getResourceUsage(), desiredShare); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 035c60c..3a32040 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -23,11 +23,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -48,64 +44,36 @@ private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List childQueues = new ArrayList<>(); + private final List childQueues = + new ArrayList(); private Resource demand = Resources.createResource(0); private int runnableApps; - - private ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private Lock readLock = rwLock.readLock(); - private Lock writeLock = rwLock.writeLock(); - + public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); } public void addChildQueue(FSQueue child) { - writeLock.lock(); - try { - childQueues.add(child); - } finally { - writeLock.unlock(); - } - } - - public void removeChildQueue(FSQueue child) { - writeLock.lock(); - try { - childQueues.remove(child); - } finally { - writeLock.unlock(); - } + childQueues.add(child); } @Override public void recomputeShares() { - readLock.lock(); - try { - policy.computeShares(childQueues, getFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setFairShare(childQueue.getFairShare()); - childQueue.recomputeShares(); - } - } finally { - readLock.unlock(); + policy.computeShares(childQueues, getFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setFairShare(childQueue.getFairShare()); + childQueue.recomputeShares(); } } public void recomputeSteadyShares() { - readLock.lock(); - try { - policy.computeSteadyShares(childQueues, getSteadyFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics() - .setSteadyFairShare(childQueue.getSteadyFairShare()); - if (childQueue instanceof FSParentQueue) { - ((FSParentQueue) childQueue).recomputeSteadyShares(); - } + policy.computeSteadyShares(childQueues, getSteadyFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeSteadyShares(); } - } finally { - readLock.unlock(); } } @@ -113,37 +81,21 @@ public void recomputeSteadyShares() { public void updatePreemptionVariables() { super.updatePreemptionVariables(); // For child queues - - readLock.lock(); - try { - for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionVariables(); - } - } finally { - readLock.unlock(); + for (FSQueue childQueue : childQueues) { + childQueue.updatePreemptionVariables(); } } @Override public Resource getDemand() { - readLock.lock(); - try { - return Resource.newInstance(demand.getMemorySize(), demand.getVirtualCores()); - } finally { - readLock.unlock(); - } + return demand; } @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - readLock.lock(); - try { - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); - } - } finally { - readLock.unlock(); + for (FSQueue child : childQueues) { + Resources.addTo(usage, child.getResourceUsage()); } return usage; } @@ -154,57 +106,51 @@ public void updateDemand() { // Limit demand to maxResources Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); - writeLock.lock(); - try { - demand = Resources.createResource(0); - for (FSQueue childQueue : childQueues) { - childQueue.updateDemand(); - Resource toAdd = childQueue.getDemand(); - if (LOG.isDebugEnabled()) { - LOG.debug("Counting resource from " + childQueue.getName() + " " + - toAdd + "; Total resource consumption for " + getName() + - " now " + demand); - } - demand = Resources.add(demand, toAdd); - demand = Resources.componentwiseMin(demand, maxRes); - if (Resources.equals(demand, maxRes)) { - break; - } + demand = Resources.createResource(0); + for (FSQueue childQueue : childQueues) { + childQueue.updateDemand(); + Resource toAdd = childQueue.getDemand(); + if (LOG.isDebugEnabled()) { + LOG.debug("Counting resource from " + childQueue.getName() + " " + + toAdd + "; Total resource consumption for " + getName() + + " now " + demand); } - } finally { - writeLock.unlock(); + demand = Resources.add(demand, toAdd); } + demand = Resources.componentwiseMin(demand, maxRes); if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxRes); } } - private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) { - List operations = new ArrayList<>(); + private synchronized QueueUserACLInfo getUserAclInfo( + UserGroupInformation user) { + QueueUserACLInfo userAclInfo = + recordFactory.newRecordInstance(QueueUserACLInfo.class); + List operations = new ArrayList(); for (QueueACL operation : QueueACL.values()) { if (hasAccess(operation, user)) { operations.add(operation); } } - return QueueUserACLInfo.newInstance(getQueueName(), operations); + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return userAclInfo; } @Override - public List getQueueUserAclInfo(UserGroupInformation user) { + public synchronized List getQueueUserAclInfo( + UserGroupInformation user) { List userAcls = new ArrayList(); // Add queue acls userAcls.add(getUserAclInfo(user)); // Add children queue acls - readLock.lock(); - try { - for (FSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); - } - } finally { - readLock.unlock(); + for (FSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); } return userAcls; @@ -219,32 +165,12 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } - // Hold the write lock when sorting childQueues - writeLock.lock(); - try { - Collections.sort(childQueues, policy.getComparator()); - } finally { - writeLock.unlock(); - } - - /* - * We are releasing the lock between the sort and iteration of the - * "sorted" list. There could be changes to the list here: - * 1. Add a child queue to the end of the list, this doesn't affect - * container assignment. - * 2. Remove a child queue, this is probably good to take care of so we - * don't assign to a queue that is going to be removed shortly. - */ - readLock.lock(); - try { - for (FSQueue child : childQueues) { - assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { - break; - } + Collections.sort(childQueues, policy.getComparator()); + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (!Resources.equals(assigned, Resources.none())) { + break; } - } finally { - readLock.unlock(); } return assigned; } @@ -256,23 +182,11 @@ public RMContainer preemptContainer() { // Find the childQueue which is most over fair share FSQueue candidateQueue = null; Comparator comparator = policy.getComparator(); - - readLock.lock(); - try { - for (FSQueue queue : childQueues) { - // Skip selection for non-preemptable queue - if (!queue.isPreemptable()) { - if (LOG.isDebugEnabled()) { - LOG.debug("skipping from queue=" + getName() - + " because it's a non-preemptable queue"); - } - } else if (candidateQueue == null || - comparator.compare(queue, candidateQueue) > 0) { - candidateQueue = queue; - } + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; } - } finally { - readLock.unlock(); } // Let the selected queue choose which of its container to preempt @@ -284,12 +198,7 @@ public RMContainer preemptContainer() { @Override public List getChildQueues() { - readLock.lock(); - try { - return ImmutableList.copyOf(childQueues); - } finally { - readLock.unlock(); - } + return childQueues; } @Override @@ -306,43 +215,23 @@ public void setPolicy(SchedulingPolicy policy) } public void incrementRunnableApps() { - writeLock.lock(); - try { - runnableApps++; - } finally { - writeLock.unlock(); - } + runnableApps++; } public void decrementRunnableApps() { - writeLock.lock(); - try { - runnableApps--; - } finally { - writeLock.unlock(); - } + runnableApps--; } @Override public int getNumRunnableApps() { - readLock.lock(); - try { - return runnableApps; - } finally { - readLock.unlock(); - } + return runnableApps; } @Override public void collectSchedulerApplications( Collection apps) { - readLock.lock(); - try { - for (FSQueue childQueue : childQueues) { - childQueue.collectSchedulerApplications(apps); - } - } finally { - readLock.unlock(); + for (FSQueue childQueue : childQueues) { + childQueue.collectSchedulerApplications(apps); } }