diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 345ea8b7c365ac55e29d192ca853662017f2c52f..5eb2d8eb11d9befdd2bd57cd4d9c29a270e666a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -47,9 +47,9 @@ FSLeafQueue.class.getName()); private final List runnableApps = // apps that are runnable - new ArrayList(); + Collections.synchronizedList(new ArrayList()); private final List nonRunnableApps = - new ArrayList(); + Collections.synchronizedList(new ArrayList()); private Resource demand = Resources.createResource(0); @@ -114,11 +114,15 @@ public boolean removeApp(FSAppAttempt app) { @Override public void collectSchedulerApplications( Collection apps) { - for (FSAppAttempt appSched : runnableApps) { - apps.add(appSched.getApplicationAttemptId()); + synchronized (runnableApps) { + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } } - for (FSAppAttempt appSched : nonRunnableApps) { - apps.add(appSched.getApplicationAttemptId()); + synchronized (nonRunnableApps) { + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } } } @@ -144,11 +148,15 @@ public Resource getDemand() { @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSAppAttempt app : runnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + synchronized (runnableApps) { + for (FSAppAttempt app : runnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } } - for (FSAppAttempt app : nonRunnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + synchronized (nonRunnableApps) { + for (FSAppAttempt app : nonRunnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } } return usage; } @@ -164,17 +172,21 @@ public void updateDemand() { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (FSAppAttempt sched : runnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + synchronized (runnableApps) { + for (FSAppAttempt sched : runnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); } - for (FSAppAttempt sched : nonRunnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + synchronized (nonRunnableApps) { + for (FSAppAttempt sched : nonRunnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand @@ -206,15 +218,17 @@ public Resource assignContainer(FSSchedulerNode node) { } Comparator comparator = policy.getComparator(); - Collections.sort(runnableApps, comparator); - for (FSAppAttempt sched : runnableApps) { - if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { - continue; - } - - assigned = sched.assignContainer(node); - if (!assigned.equals(Resources.none())) { - break; + synchronized (runnableApps) { + Collections.sort(runnableApps, comparator); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { + continue; + } + + assigned = sched.assignContainer(node); + if (!assigned.equals(Resources.none())) { + break; + } } } return assigned; @@ -237,10 +251,12 @@ public RMContainer preemptContainer() { // Choose the app that is most over fair share Comparator comparator = policy.getComparator(); FSAppAttempt candidateSched = null; - for (FSAppAttempt sched : runnableApps) { - if (candidateSched == null || - comparator.compare(sched, candidateSched) > 0) { - candidateSched = sched; + synchronized (runnableApps) { + for (FSAppAttempt sched : runnableApps) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 97736bedd04279105835d85be7ae5991acb655e6..3feb85027dfb625da17a08823e9d55ac00aef436 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -28,12 +28,21 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Collection; - +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -222,4 +231,86 @@ public void testIsStarvedForFairShare() throws Exception { assertFalse(queueB1.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare()); } + + @Test + public void testConcurrentAccess() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + String queueName = "root.queue1"; + final FSLeafQueue schedulable = scheduler.getQueueManager(). + getLeafQueue(queueName, true); + + // this needs to be in sync with the number of runnables declared below + int testThreads = 2; + + List runnables = new ArrayList(); + // add applications to modify the list + runnables.add(new Runnable() { + @Override + public void run() { + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + FSAppAttempt app = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", + schedulable, null, rmContext); + for (int i=0; i < 10000; i++) { + schedulable.addAppSchedulable(app); + schedulable.removeApp(app); + } + } + }); + + // iterate over the list a couple of times in a different thread + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 200; i++) { + schedulable.getResourceUsage(); + } + } + }); + + final List exceptions = Collections.synchronizedList( + new ArrayList()); + final ExecutorService threadPool = Executors.newFixedThreadPool( + testThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(testThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(testThreads); + for (final Runnable submittedTestRunnable : runnables) { + threadPool.submit(new Runnable() { + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + submittedTestRunnable.run(); + } catch (final Throwable e) { + exceptions.add(e); + } finally { + allDone.countDown(); + } + } + }); + } + // wait until all threads are ready + allExecutorThreadsReady.await(); + // start all test runners + startBlocker.countDown(); + int testTimeout = 2; + assertTrue("Timeout waiting for more than " + testTimeout + " seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } }