diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 04dbd2f..af6d127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -18,11 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -33,10 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -50,26 +43,30 @@ private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); 
- private final List runnableApps = // apps that are runnable + // apps that are runnable + private final List runnableApps = new ArrayList(); private final List nonRunnableApps = new ArrayList(); + // apps that have resource demands. + private final Set schedulableApps = + new TreeSet(policy.getComparator()); // get a lock with fair distribution for app list updates private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); private final Lock readLock = rwl.readLock(); private final Lock writeLock = rwl.writeLock(); - + private Resource demand = Resources.createResource(0); - + // Variables used for preemption private long lastTimeAtMinShare; private long lastTimeAtFairShareThreshold; - + // Track the AM resource usage for this queue private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); @@ -78,7 +75,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } - + public void addApp(FSAppAttempt app, boolean runnable) { writeLock.lock(); try { @@ -91,7 +88,7 @@ public void addApp(FSAppAttempt app, boolean runnable) { writeLock.unlock(); } } - + // for testing void addAppSchedulable(FSAppAttempt appSched) { writeLock.lock(); @@ -101,7 +98,7 @@ void addAppSchedulable(FSAppAttempt appSched) { writeLock.unlock(); } } - + /** * Removes the given app from this queue. 
* @return whether or not the app was runnable @@ -120,6 +117,8 @@ public boolean removeApp(FSAppAttempt app) { " does not exist in queue " + this); } } + // app may or may not be in schedulableApps, try to remove it + schedulableApps.remove(app); } finally { writeLock.unlock(); } @@ -221,7 +220,7 @@ public void setPolicy(SchedulingPolicy policy) } super.policy = policy; } - + @Override public void recomputeShares() { readLock.lock(); @@ -265,7 +264,8 @@ public void updateDemand() { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - readLock.lock(); + // schedulableApps may change here so we need writeLock + writeLock.lock(); try { for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { @@ -280,7 +280,7 @@ public void updateDemand() { updateDemandForApp(sched, maxRes); } } finally { - readLock.unlock(); + writeLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand @@ -289,16 +289,21 @@ public void updateDemand() { + getFairShare()); } } - + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); + if (sched.getAppSchedulingInfo().getAllResourceRequests().size() != 0) { + schedulableApps.add(sched); + } else { + schedulableApps.remove(sched); + } if (LOG.isDebugEnabled()) { LOG.debug("Counting resource from " + sched.getName() + " " + toAdd + "; Total resource consumption for " + getName() + " now " + demand); } - demand = Resources.add(demand, toAdd); + demand = Resources.addTo(demand, toAdd); demand = Resources.componentwiseMin(demand, maxRes); } @@ -314,19 +319,9 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } - Comparator comparator = policy.getComparator(); - writeLock.lock(); - try { - Collections.sort(runnableApps, comparator); - } finally { - writeLock.unlock(); - } - // Release write lock here for better 
performance and avoiding deadlocks. - // runnableApps can be in unsorted state because of this section, - // but we can accept it in practice since the probability is low. readLock.lock(); try { - for (FSAppAttempt sched : runnableApps) { + for (FSAppAttempt sched : schedulableApps) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; }