diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 04dbd2f..abd0f33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -50,26 +50,30 @@ private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); - private final List runnableApps = // apps that are runnable + // apps that are runnable + private final List runnableApps = new ArrayList(); private final List nonRunnableApps = new ArrayList(); + // apps that have resource demands. + private final List pendingApps = + new ArrayList(); // get a lock with fair distribution for app list updates private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); private final Lock readLock = rwl.readLock(); private final Lock writeLock = rwl.writeLock(); - + private Resource demand = Resources.createResource(0); - + // Variables used for preemption private long lastTimeAtMinShare; private long lastTimeAtFairShareThreshold; - + // Track the AM resource usage for this queue private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); @@ -78,7 +82,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } - + public void addApp(FSAppAttempt app, boolean runnable) { writeLock.lock(); try { @@ -91,7 +95,7 @@ public void addApp(FSAppAttempt app, boolean runnable) { writeLock.unlock(); } } - + // for testing void addAppSchedulable(FSAppAttempt appSched) { writeLock.lock(); @@ -101,7 +105,7 @@ void addAppSchedulable(FSAppAttempt appSched) { writeLock.unlock(); } } - + /** * Removes the given app from this queue. * @return whether or not the app was runnable @@ -120,6 +124,8 @@ public boolean removeApp(FSAppAttempt app) { " does not exist in queue " + this); } } + // app may or may not in pendingApps, try to remove it + pendingApps.remove(app); } finally { writeLock.unlock(); } @@ -221,7 +227,7 @@ public void setPolicy(SchedulingPolicy policy) } super.policy = policy; } - + @Override public void recomputeShares() { readLock.lock(); @@ -265,7 +271,8 @@ public void updateDemand() { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - readLock.lock(); + // pendingApps may change here so we need writeLock + writeLock.lock(); try { for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { @@ -280,7 +287,7 @@ public void updateDemand() { updateDemandForApp(sched, maxRes); } } finally { - readLock.unlock(); + writeLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand @@ -289,16 +296,26 @@ public void updateDemand() { + getFairShare()); } } - + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); + if (Resources.greaterThan(scheduler.getResourceCalculator(), + scheduler.getClusterResource(), + sched.getAppAttemptResourceUsage().getPending(), + Resources.none())) { + if (!pendingApps.contains(sched)) { + pendingApps.add(sched); + } + } else { + pendingApps.remove(sched); + } if (LOG.isDebugEnabled()) { LOG.debug("Counting resource from " + sched.getName() + " " + toAdd + "; Total resource consumption for " + getName() + " now " + demand); } - demand = Resources.add(demand, toAdd); + demand = Resources.addTo(demand, toAdd); demand = Resources.componentwiseMin(demand, maxRes); } @@ -317,7 +334,7 @@ public Resource assignContainer(FSSchedulerNode node) { Comparator comparator = policy.getComparator(); writeLock.lock(); try { - Collections.sort(runnableApps, comparator); + Collections.sort(pendingApps, comparator); } finally { writeLock.unlock(); } @@ -326,7 +343,7 @@ public Resource assignContainer(FSSchedulerNode node) { // but we can accept it in practice since the probability is low. readLock.lock(); try { - for (FSAppAttempt sched : runnableApps) { + for (FSAppAttempt sched : pendingApps) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; }