diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 9301bba..68438f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -48,6 +48,9 @@ private final BlockingQueue eventQueue; private volatile boolean stopped = false; + private volatile boolean drained = true; + private volatile boolean drainingStopNeeded = false; + private boolean printLog = true; private Thread eventHandlingThread; protected final Map, EventHandler> eventDispatchers; @@ -68,6 +71,7 @@ Runnable createThread() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); Event event; try { event = eventQueue.take(); @@ -102,8 +106,17 @@ protected void serviceStart() throws Exception { eventHandlingThread.start(); } + public void setDraningStop() { + drainingStopNeeded = true; + } + @Override protected void serviceStop() throws Exception { + if (drainingStopNeeded) { + while(!drained) { + Thread.yield(); + } + } stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); @@ -178,6 +191,15 @@ public EventHandler getEventHandler() { class GenericEventHandler implements EventHandler { public void handle(Event event) { + if (drainingStopNeeded) { + if (printLog) { + LOG.info("Ignoring events as AsyncDispatcher is draning to stop."); + printLog = false; + } + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5a7c7dc..5a7b20a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -261,8 +261,9 @@ public void setRMDispatcher(Dispatcher dispatcher) { } AsyncDispatcher dispatcher; - - public synchronized void serviceInit(Configuration conf) throws Exception{ + + @Override + protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); @@ -270,8 +271,9 @@ public synchronized void serviceInit(Configuration conf) throws Exception{ new ForwardingEventHandler()); initInternal(conf); } - - protected synchronized void serviceStart() throws Exception { + + @Override + protected void serviceStart() throws Exception { dispatcher.start(); startInternal(); } @@ -288,11 +290,13 @@ protected synchronized void serviceStart() throws Exception { */ protected abstract void startInternal() throws Exception; - public synchronized void serviceStop() throws Exception { + @Override + protected void serviceStop() throws Exception { + dispatcher.setDraningStop(); closeInternal(); dispatcher.stop(); } - + /** * Derived classes close themselves using this method. * The base class will be closed and the event dispatcher will be shutdown @@ -509,8 +513,7 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { } // Dispatcher related code - - private synchronized void handleStoreEvent(RMStateStoreEvent event) { + protected void handleStoreEvent(RMStateStoreEvent event) { if (event.getType().equals(RMStateStoreEventType.STORE_APP) || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { ApplicationState appState = null; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index aba334a..cb41ca4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -163,6 +163,14 @@ public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, true); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted) throws Exception { ApplicationClientProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -222,7 +230,9 @@ public SubmitApplicationResponse run() { }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit - waitForState(appId, RMAppState.ACCEPTED); + if (waitForAccepted) { + waitForState(appId, RMAppState.ACCEPTED); + } return getRMContext().getRMApps().get(appId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97f51a2..f87f689 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -1062,6 +1063,65 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + volatile boolean wait = true; + @Override + public void serviceStop() throws Exception { + // Unblock app saving request. + wait = false; + super.serviceStop(); + } + + @Override + protected void handleStoreEvent(RMStateStoreEvent event) { + // Block app saving request. + while (wait); + super.handleStoreEvent(event); + } + }; + memStore.init(conf); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + // create apps. + final ArrayList appList = new ArrayList(); + final int NUM_APPS = 5; + + for (int i = 0; i < NUM_APPS; i++) { + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, + "default", -1, null, "MAPREDUCE", false); + appList.add(app); + rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING); + } + // all apps's saving request are now enqueued to RMStateStore's dispatcher + // queue, and will be processed once rm.stop() is called. + + // Nothing exist in state store before stop is called. + Map rmAppState = + memStore.getState().getApplicationState(); + Assert.assertTrue(rmAppState.size() == 0); + + // stop rm + rm1.stop(); + + // Assert app info is still saved even if stop is called with pending saving + // request on dispatcher. + for (RMApp app : appList) { + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + } + Assert.assertTrue(rmAppState.size() == NUM_APPS); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) {