diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index b9966e7..b23ec3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -172,7 +172,7 @@ private synchronized void unreserveInternal( } @Override - public synchronized Resource getHeadroom() { + public Resource getHeadroom() { final FSQueue queue = (FSQueue) this.queue; SchedulingPolicy policy = queue.getPolicy(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 345ea8b..a919e74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -23,6 +23,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -50,6 +53,10 @@ new ArrayList(); private final List nonRunnableApps = new ArrayList(); + // get a lock with fair distribution for app list updates + private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); + private final Lock readLock = rwl.readLock(); + private final Lock writeLock = rwl.writeLock(); private Resource demand = Resources.createResource(0); @@ -72,16 +79,20 @@ public FSLeafQueue(String name, FairScheduler scheduler, } public void addApp(FSAppAttempt app, boolean runnable) { + writeLock.lock(); if (runnable) { runnableApps.add(app); } else { nonRunnableApps.add(app); } + writeLock.unlock(); } // for testing void addAppSchedulable(FSAppAttempt appSched) { + writeLock.lock(); runnableApps.add(appSched); + writeLock.unlock(); } /** @@ -89,15 +100,19 @@ void addAppSchedulable(FSAppAttempt appSched) { * @return whether or not the app was runnable */ public boolean removeApp(FSAppAttempt app) { + writeLock.lock(); if (runnableApps.remove(app)) { + writeLock.unlock(); // Update AM resource usage if (app.isAmRunning() && app.getAMResource() != null) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } return true; } else if (nonRunnableApps.remove(app)) { + writeLock.unlock(); return false; } else { + writeLock.unlock(); throw new IllegalStateException("Given app to remove " + app + " does not exist in queue " + this); } @@ -114,12 +129,14 @@ public boolean removeApp(FSAppAttempt app) { @Override public void collectSchedulerApplications( Collection apps) { + readLock.lock(); for (FSAppAttempt appSched : runnableApps) { apps.add(appSched.getApplicationAttemptId()); } for (FSAppAttempt appSched : nonRunnableApps) { apps.add(appSched.getApplicationAttemptId()); } + readLock.unlock(); } @Override @@ -144,12 +161,14 @@ public Resource getDemand() { @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); + readLock.lock(); for (FSAppAttempt app : runnableApps) { Resources.addTo(usage, app.getResourceUsage()); } for (FSAppAttempt app : nonRunnableApps) { Resources.addTo(usage, app.getResourceUsage()); } + readLock.unlock(); return usage; } @@ -164,6 +183,7 @@ public void updateDemand() { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); + readLock.lock(); for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { break; @@ -176,6 +196,7 @@ public void updateDemand() { } updateDemandForApp(sched, maxRes); } + readLock.unlock(); if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxRes); @@ -198,7 +219,8 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { public Resource assignContainer(FSSchedulerNode node) { Resource assigned = Resources.none(); if (LOG.isDebugEnabled()) { - LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName()); + LOG.debug("Node " + node.getNodeName() + " offered to queue: " + + getName()); } if (!assignContainerPreCheck(node)) { @@ -206,7 +228,10 @@ public Resource assignContainer(FSSchedulerNode node) { } Comparator comparator = policy.getComparator(); + writeLock.lock(); Collections.sort(runnableApps, comparator); + writeLock.unlock(); + readLock.lock(); for (FSAppAttempt sched : runnableApps) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; @@ -217,6 +242,7 @@ public Resource assignContainer(FSSchedulerNode node) { break; } } + readLock.unlock(); return assigned; } @@ -237,12 +263,14 @@ public RMContainer preemptContainer() { // Choose the app that is most over fair share Comparator comparator = policy.getComparator(); FSAppAttempt candidateSched = null; + readLock.lock(); for (FSAppAttempt sched : runnableApps) { if (candidateSched == null || comparator.compare(sched, candidateSched) > 0) { candidateSched = sched; } } + readLock.unlock(); // Preempt from the selected app if (candidateSched != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 97736be..385ea0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -28,12 +28,22 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Collection; - +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -222,4 +232,85 @@ public void testIsStarvedForFairShare() throws Exception { assertFalse(queueB1.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare()); } + + @Test + public void testConcurrentAccess() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + String queueName = "root.queue1"; + final FSLeafQueue schedulable = scheduler.getQueueManager(). + getLeafQueue(queueName, true); + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + final FSAppAttempt app = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", + schedulable, null, rmContext); + + // this needs to be in sync with the number of runnables declared below + int testThreads = 2; + List runnables = new ArrayList(); + + // add applications to modify the list + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 500; i++) { + schedulable.addAppSchedulable(app); + } + } + }); + + // iterate over the list a couple of times in a different thread + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 500; i++) { + schedulable.getResourceUsage(); + } + } + }); + + final List exceptions = Collections.synchronizedList( + new ArrayList()); + final ExecutorService threadPool = Executors.newFixedThreadPool( + testThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(testThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(testThreads); + for (final Runnable submittedTestRunnable : runnables) { + threadPool.submit(new Runnable() { + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + submittedTestRunnable.run(); + } catch (final Throwable e) { + exceptions.add(e); + } finally { + allDone.countDown(); + } + } + }); + } + // wait until all threads are ready + allExecutorThreadsReady.await(); + // start all test runners + startBlocker.countDown(); + int testTimeout = 2; + assertTrue("Timeout waiting for more than " + testTimeout + " seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } }