diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 345ea8b..f4eee29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -47,21 +47,21 @@ FSLeafQueue.class.getName()); private final List runnableApps = // apps that are runnable - new ArrayList(); + Collections.synchronizedList(new ArrayList()); private final List nonRunnableApps = - new ArrayList(); - + Collections.synchronizedList(new ArrayList()); + private Resource demand = Resources.createResource(0); - + // Variables used for preemption private long lastTimeAtMinShare; private long lastTimeAtFairShareThreshold; - + // Track the AM resource usage for this queue private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); @@ -70,7 +70,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } - + public void addApp(FSAppAttempt app, boolean runnable) { if (runnable) { runnableApps.add(app); @@ -78,12 +78,12 @@ public void addApp(FSAppAttempt app, boolean runnable) { nonRunnableApps.add(app); } } - + // for testing void addAppSchedulable(FSAppAttempt appSched) { runnableApps.add(appSched); } - + /** * Removes the given app from this queue. * @return whether or not the app was runnable @@ -102,23 +102,27 @@ public boolean removeApp(FSAppAttempt app) { " does not exist in queue " + this); } } - + public Collection getRunnableAppSchedulables() { return runnableApps; } - + public List getNonRunnableAppSchedulables() { return nonRunnableApps; } - + @Override public void collectSchedulerApplications( Collection apps) { - for (FSAppAttempt appSched : runnableApps) { - apps.add(appSched.getApplicationAttemptId()); + synchronized (runnableApps) { + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } } - for (FSAppAttempt appSched : nonRunnableApps) { - apps.add(appSched.getApplicationAttemptId()); + synchronized (nonRunnableApps) { + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } } } @@ -130,7 +134,7 @@ public void setPolicy(SchedulingPolicy policy) } super.policy = policy; } - + @Override public void recomputeShares() { policy.computeShares(getRunnableAppSchedulables(), getFairShare()); @@ -144,11 +148,15 @@ public Resource getDemand() { @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSAppAttempt app : runnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + synchronized (runnableApps) { + for (FSAppAttempt app : runnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } } - for (FSAppAttempt app : nonRunnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + synchronized (nonRunnableApps) { + for (FSAppAttempt app : nonRunnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } } return usage; } @@ -164,24 +172,28 @@ public void updateDemand() { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (FSAppAttempt sched : runnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + synchronized (runnableApps) { + for (FSAppAttempt sched : runnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); } - for (FSAppAttempt sched : nonRunnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + synchronized (nonRunnableApps) { + for (FSAppAttempt sched : nonRunnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxRes); } } - + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); @@ -206,15 +218,17 @@ public Resource assignContainer(FSSchedulerNode node) { } Comparator comparator = policy.getComparator(); - Collections.sort(runnableApps, comparator); - for (FSAppAttempt sched : runnableApps) { - if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { - continue; - } - - assigned = sched.assignContainer(node); - if (!assigned.equals(Resources.none())) { - break; + synchronized (runnableApps) { + Collections.sort(runnableApps, comparator); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { + continue; + } + + assigned = sched.assignContainer(node); + if (!assigned.equals(Resources.none())) { + break; + } } } return assigned; @@ -237,10 +251,12 @@ public RMContainer preemptContainer() { // Choose the app that is most over fair share Comparator comparator = policy.getComparator(); FSAppAttempt candidateSched = null; - for (FSAppAttempt sched : runnableApps) { - if (candidateSched == null || - comparator.compare(sched, candidateSched) > 0) { - candidateSched = sched; + synchronized (runnableApps) { + for (FSAppAttempt sched : runnableApps) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 97736be..2e434f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -28,12 +28,22 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Collection; - +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -222,4 +232,85 @@ public void testIsStarvedForFairShare() throws Exception { assertFalse(queueB1.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare()); } + + @Test + public void testConcurrentAccess() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + String queueName = "root.queue1"; + final FSLeafQueue schedulable = scheduler.getQueueManager(). + getLeafQueue(queueName, true); + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + final FSAppAttempt app = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", + schedulable, null, rmContext); + + // this needs to be in sync with the number of runnables declared below + int testThreads = 2; + + List runnables = new ArrayList(); + // add applications to modify the list + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 200; i++) { + schedulable.addAppSchedulable(app); + } + } + }); + + // iterate over the list a couple of times in a different thread + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 200; i++) { + schedulable.getResourceUsage(); + } + } + }); + + final List exceptions = Collections.synchronizedList( + new ArrayList()); + final ExecutorService threadPool = Executors.newFixedThreadPool( + testThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(testThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(testThreads); + for (final Runnable submittedTestRunnable : runnables) { + threadPool.submit(new Runnable() { + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + submittedTestRunnable.run(); + } catch (final Throwable e) { + exceptions.add(e); + } finally { + allDone.countDown(); + } + } + }); + } + // wait until all threads are ready + allExecutorThreadsReady.await(); + // start all test runners + startBlocker.countDown(); + int testTimeout = 2; + assertTrue("Timeout waiting for more than " + testTimeout + " seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } }