diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 9301bba..4d229b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -48,6 +48,7 @@ private final BlockingQueue eventQueue; private volatile boolean stopped = false; + private volatile boolean drained = false; private Thread eventHandlingThread; protected final Map, EventHandler> eventDispatchers; @@ -68,6 +69,7 @@ Runnable createThread() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); Event event; try { event = eventQueue.take(); @@ -102,6 +104,15 @@ protected void serviceStart() throws Exception { eventHandlingThread.start(); } + /** + * Busy loop waiting for all queued events to drain. + */ + public void drain() { + while (!drained) { + Thread.yield(); + } + } + @Override protected void serviceStop() throws Exception { stopped = true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 2f4b896..cd41dfc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -195,8 +195,9 @@ public void setRMDispatcher(Dispatcher dispatcher) { } AsyncDispatcher dispatcher; - - public synchronized void serviceInit(Configuration conf) throws Exception{ + + @Override + protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); @@ -204,8 +205,9 @@ public synchronized void serviceInit(Configuration conf) throws Exception{ new ForwardingEventHandler()); initInternal(conf); } - - protected synchronized void serviceStart() throws Exception { + + @Override + protected void serviceStart() throws Exception { dispatcher.start(); startInternal(); } @@ -222,11 +224,15 @@ protected synchronized void serviceStart() throws Exception { */ protected abstract void startInternal() throws Exception; - public synchronized void serviceStop() throws Exception { + @Override + protected void serviceStop() throws Exception { + // ClientRMService, ApplicationMasterSeverice stop before RMStateStore, so + // no new events can be added when RMStateStore is stopping. + dispatcher.drain(); closeInternal(); dispatcher.stop(); } - + /** * Derived classes close themselves using this method. * The base class will be closed and the event dispatcher will be shutdown @@ -426,7 +432,7 @@ private Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { // Dispatcher related code - private synchronized void handleStoreEvent(RMStateStoreEvent event) { + protected void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { case STORE_APP: { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d9ff1b0..19aacab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -163,6 +163,14 @@ public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, true); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccecpted) throws Exception { ApplicationClientProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -222,7 +230,9 @@ public SubmitApplicationResponse run() { }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit - waitForState(appId, RMAppState.ACCEPTED); + if (waitForAccecpted) { + waitForState(appId, RMAppState.ACCEPTED); + } return getRMContext().getRMApps().get(appId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 81f4bce..4df65e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -740,6 +742,75 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + AtomicBoolean wait = new AtomicBoolean(true); + + @Override + public void serviceStop() throws Exception { + // Unblock app saving request. + wait.set(false); + super.serviceStop(); + } + + @Override + protected void handleStoreEvent(RMStateStoreEvent event) { + // Block app saving request. + while (wait.get()); + super.handleStoreEvent(event); + } + }; + memStore.init(conf); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + // create apps. + final ArrayList appList = new ArrayList(); + final int NUM_APPS = 5; + Thread submitAppThread = new Thread() { + @Override + public void run() { + try { + for (int i = 0; i < NUM_APPS; i++) { + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, + "default", -1, null, "MAPREDUCE", false); + appList.add(app); + rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING); + } + } catch (Exception e) { + } + } + }; + submitAppThread.start(); + submitAppThread.join(); + // all apps's saving request are now enqueued to RMStateStore's dispatcher + // queue, and will be processed once rm.stop() is called. + + // Nothing exist in state store before stop is called. + Map rmAppState = + memStore.getState().getApplicationState(); + Assert.assertTrue(rmAppState.size() == 0); + + // stop rm + rm1.stop(); + + // Assert app info is still saved even if stop is called with pending saving + // request on dispatcher. + for (RMApp app : appList) { + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + } + Assert.assertTrue(rmAppState.size() == NUM_APPS); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) {