diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ffeec63..4cd76ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1887,7 +1887,7 @@ public Resource getTotalResourcePending() { } @Override - public void collectSchedulerApplications( + public synchronized void collectSchedulerApplications( Collection apps) { for (FiCaSchedulerApp pendingApp : pendingApplications) { apps.add(pendingApp.getApplicationAttemptId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 642363e..8fb2c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -37,11 +37,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -2353,6 +2356,88 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() } } + @Test + public void testConcurrentAccess() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(); + rm.init(conf); + rm.start(); + + final String queue = "default"; + final String user = "user"; + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue); + + final List listOfApps = + createListOfApps(10000, user, defaultQueue); + + final CyclicBarrier cb = new CyclicBarrier(2); + final List conException = + new ArrayList(); + + Thread submitAndRemove = new Thread(new Runnable() { + + @Override + public void run() { + + for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) { + defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user); + } + try { + cb.await(); + } catch (Exception e) { + // Ignore + } + for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) { + defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue); + } + } + }, "SubmitAndRemoveApplicationAttempt Thread"); + + Thread getAppsInQueue = new Thread(new Runnable() { + List apps = new ArrayList(); + + @Override + public void run() { + try { + try { + cb.await(); + } catch (Exception e) { + // Ignore + } + defaultQueue.collectSchedulerApplications(apps); + } catch (ConcurrentModificationException e) { + conException.add(e); + } + } + + }, "GetAppsInQueue Thread"); + + submitAndRemove.start(); + getAppsInQueue.start(); + + submitAndRemove.join(); + getAppsInQueue.join(); + assertTrue("ConcurrentModificationException is thrown", + conException.isEmpty()); + rm.stop(); + + } + + private List createListOfApps(int noOfApps, String user, + LeafQueue defaultQueue) { + List appsLists = new ArrayList(); + for (int i = 0; i < noOfApps; i++) { + ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(i, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user, defaultQueue, + mock(ActiveUsersManager.class), spyRMContext); + appsLists.add(app_0); + } + return appsLists; + } + private CapacitySchedulerContext mockCSContext( CapacitySchedulerConfiguration csConf, Resource clusterResource) { CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); -- 1.9.2.msysgit.0