diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..b347e7d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -300,7 +300,22 @@ //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + + /** + * How long to wait before deleting application state from RMStateStore after + * application is completed + */ + public static final String RM_STATE_STORE_APP_RETAIN_SECS = RM_PREFIX + + "state-store.application.retain-seconds"; + public static final long DEFAULT_RM_STATE_STORE_APP_RETAIN_SECS = 7200; + /** + * How long to wait between application state retention checks. 
+ */ + public static final String RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS = + RM_PREFIX + "state-store.application.retain-check-interval-seconds"; + public static final long DEFAULT_RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS = + -1; + /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 86501ad..f42354f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -270,6 +270,21 @@ + How long to wait before deleting application state from + RMStateStore after application is completed + yarn.resourcemanager.state-store.application.retain-seconds + 7200 + + + + How long to wait between application state retention + checks. If set to 0 or a negative value then the value is computed as + one-tenth of the application retention time. + yarn.resourcemanager.state-store.application.retain-check-interval-seconds + -1 + + + The class to use as the persistent store. 
yarn.resourcemanager.store.class org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 46a58fc..2a82993 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -63,7 +63,7 @@ public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); - private static final String ROOT_DIR_NAME = "FSRMStateRoot"; + protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected FileSystem fs; @@ -321,7 +321,7 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 495c292..c7dc4e2 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -167,8 +167,8 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + public synchronized void removeApplicationStateInternal( + ApplicationState appState) throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); if (removed == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c8ad1c4..5fed191 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -62,7 +62,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId, } @Override - protected void removeApplicationState(ApplicationState appState) + protected void removeApplicationStateInternal(ApplicationState appState) throws Exception { // Do nothing } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5a7c7dc..e2d4c43 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -36,13 +36,14 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -51,9 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; 
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -68,7 +68,7 @@ * Real store implementations need to derive from it and implement blocking * store and load methods to actually store and load the state. */ -public abstract class RMStateStore extends AbstractService { +public abstract class RMStateStore extends CompositeService { // constants for RM App state and RMDTSecretManagerState. protected static final String RM_APP_ROOT = "RMAppRoot"; @@ -80,6 +80,10 @@ public static final Log LOG = LogFactory.getLog(RMStateStore.class); + private long retentionMillis; + private long checkIntevalMillis; + private volatile boolean stopped = false; + public RMStateStore() { super(RMStateStore.class.getName()); } @@ -259,21 +263,43 @@ public RMDTSecretManagerState getRMDTSecretManagerState() { public void setRMDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } - - AsyncDispatcher dispatcher; - - public synchronized void serviceInit(Configuration conf) throws Exception{ - // create async handler - dispatcher = new AsyncDispatcher(); - dispatcher.init(conf); - dispatcher.register(RMStateStoreEventType.class, - new ForwardingEventHandler()); + + // Dispatcher for handling store_app events. + AsyncDispatcher storeAppDispatcher; + // Dispatcher for handling remove_app events. 
+ AsyncDispatcher removeAppDispatcher; + + public synchronized void serviceInit(Configuration conf) throws Exception { + retentionMillis = + conf.getLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS, + YarnConfiguration.DEFAULT_RM_STATE_STORE_APP_RETAIN_SECS) * 1000; + checkIntevalMillis = + conf.getLong( + YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS) + * 1000; + if (checkIntevalMillis <= 0) { + // when unspecified compute check interval as 1/10th of retention + checkIntevalMillis = retentionMillis/ 10; + } + + storeAppDispatcher = new AsyncDispatcher(); + storeAppDispatcher.register(RMStateStoreEventType.class, + new StoreAppEventHandler()); + addIfService(storeAppDispatcher); + + removeAppDispatcher = new AsyncDispatcher(); + removeAppDispatcher.register(RMStateStoreEventType.class, + new RemoveAppEventHandler()); + addIfService(removeAppDispatcher); + initInternal(conf); + super.serviceInit(conf); } protected synchronized void serviceStart() throws Exception { - dispatcher.start(); startInternal(); + super.serviceStart(); } /** @@ -289,8 +315,9 @@ protected synchronized void serviceStart() throws Exception { protected abstract void startInternal() throws Exception; public synchronized void serviceStop() throws Exception { + stopped = true; closeInternal(); - dispatcher.stop(); + super.serviceStop(); } /** @@ -322,12 +349,12 @@ public synchronized void storeNewApplication(RMApp app) { ApplicationState appState = new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, app.getUser()); - dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + storeAppDispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") public synchronized void updateApplicationState(ApplicationState appState) { - dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); + 
storeAppDispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } /** @@ -356,14 +383,14 @@ public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { appAttempt.getMasterContainer(), credentials, appAttempt.getStartTime()); - dispatcher.getEventHandler().handle( + storeAppDispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); } @SuppressWarnings("unchecked") public synchronized void updateApplicationAttemptState( ApplicationAttemptState attemptState) { - dispatcher.getEventHandler().handle( + storeAppDispatcher.getEventHandler().handle( new RMStateUpdateAppAttemptEvent(attemptState)); } @@ -453,6 +480,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) * This does not block the dispatcher threads * There is no notification of completion for this operation. */ + @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getStartTime(), @@ -465,16 +493,9 @@ public synchronized void removeApplication(RMApp app) { appAttempt.getStartTime()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } - - removeApplication(appState); - } - - @SuppressWarnings("unchecked") - /** - * Non-Blocking API - */ - public synchronized void removeApplication(ApplicationState appState) { - dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + + removeAppDispatcher.getEventHandler().handle( + new RMStateStoreRemoveAppEvent(appState)); } /** @@ -482,8 +503,8 @@ public synchronized void removeApplication(ApplicationState appState) { * Derived classes must implement this method to remove the state of an * application and its attempts */ - protected abstract void removeApplicationState(ApplicationState appState) - throws Exception; + protected abstract void removeApplicationStateInternal( + ApplicationState appState) throws Exception; // TODO: This 
should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-986 @@ -605,20 +626,6 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { storedException); } } - } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { - ApplicationState appState = - ((RMStateStoreRemoveAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - Exception removedException = null; - LOG.info("Removing info for app: " + appId); - try { - removeApplicationState(appState); - } catch (Exception e) { - LOG.error("Error removing app: " + appId, e); - removedException = e; - } finally { - notifyDoneRemovingApplcation(appId, removedException); - } } else { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } @@ -664,27 +671,53 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); } - @SuppressWarnings("unchecked") /** - * This is to notify RMApp that this application has been removed from - * RMStateStore + * EventHandler which forwards store_app events to the underlying state store + * implementation. */ - private void notifyDoneRemovingApplcation(ApplicationId appId, - Exception removedException) { - rmDispatcher.getEventHandler().handle( - new RMAppRemovedEvent(appId, removedException)); + private final class StoreAppEventHandler implements + EventHandler { + + @Override + public void handle(RMStateStoreEvent event) { + handleStoreEvent(event); + } } /** - * EventHandler implementation which forward events to the FSRMStateStore - * This hides the EventHandle methods of the store from its public interface + * EventHandler which forwards remove_app events to the underlying state store + * implementation. 
*/ - private final class ForwardingEventHandler - implements EventHandler { - + private final class RemoveAppEventHandler implements + EventHandler { + @Override public void handle(RMStateStoreEvent event) { - handleStoreEvent(event); + assert event.getType().equals(RMStateStoreEventType.REMOVE_APP); + RMStateStoreRemoveAppEvent removeEvent = + (RMStateStoreRemoveAppEvent) event; + ApplicationId appId = removeEvent.getAppState().getAppId(); + + // Removal events must be coming in the order of increasing timestamp. + while (!stopped && !Thread.currentThread().isInterrupted()) { + if (System.currentTimeMillis() > removeEvent.getTimestamp() + + retentionMillis) { + try { + LOG.info("Removing info for application: " + appId); + removeApplicationStateInternal(removeEvent.getAppState()); + } catch (Exception e) { + LOG.error("Error removing application: " + appId, e); + } + break; + } + try { + Thread.sleep(checkIntevalMillis); + } catch (InterruptedException e) { + LOG.warn("Interrupted exception while waiting to remove application " + + appId, e); + return; + } + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java index 8e49a82..e45c772 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java @@ -24,4 +24,8 @@ public RMStateStoreEvent(RMStateStoreEventType type) { super(type); } + + public RMStateStoreEvent(RMStateStoreEventType type, long 
timeStamp) { + super(type, timeStamp); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index 402feb9..4e40d4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -22,12 +22,12 @@ public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { ApplicationState appState; - + RMStateStoreRemoveAppEvent(ApplicationState appState) { - super(RMStateStoreEventType.REMOVE_APP); + super(RMStateStoreEventType.REMOVE_APP, System.currentTimeMillis()); this.appState = appState; } - + public ApplicationState getAppState() { return appState; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 628d260..6992981 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java 
@@ -65,7 +65,7 @@ public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); - private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; private int numRetries; private String zkHostPort = null; @@ -342,7 +342,7 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); String nodeRemovePath = getNodePath(rmAppRoot, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..9112a58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -944,6 +944,9 @@ public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); + + // app completed, add to the removal queue of RMStateStore. 
+ app.rmContext.getStateStore().removeApplication(app); }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97f51a2..e5303b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -395,6 +395,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception { rm2.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + rm1.stop(); + rm2.stop(); } @Test @@ -442,6 +444,8 @@ public void testRMRestartFailedApp() throws Exception { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. 
+ rm1.stop(); + rm2.stop(); } @Test @@ -488,6 +492,8 @@ public void testRMRestartKilledApp() throws Exception{ ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), appReport.getDiagnostics()); + rm1.stop(); + rm2.stop(); } @Test @@ -537,6 +543,9 @@ public void testRMRestartSucceededApp() throws Exception { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); + + rm1.stop(); + rm2.stop(); } @Test @@ -619,6 +628,9 @@ public void testRMRestartGetApplicationList() throws Exception { rm2.getClientRMService().getApplications(request2); List appList2 = response2.getApplicationList(); Assert.assertTrue(3 == appList2.size()); + + rm1.stop(); + rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1062,6 +1074,41 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testStateStoreAppDelelayedRemoval() throws Exception { + // wait for 2 secs after application finishes, the app info will be deleted. + long retentionSecs = 2; + conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS, retentionSecs); + conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS, 0); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create an app and finish the app. + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + finishApplicationMaster(app0, rm1, nm1, am0); + + // After half of the retention period, app state still exists. 
+ Thread.sleep(retentionSecs/2 * 1000); + Map rmAppState = + rmState.getApplicationState(); + Assert.assertNotNull(rmAppState.get(app0.getApplicationId())); + + // after app retention period, app is removed + Thread.sleep(retentionSecs * 1000); + Assert.assertNull(rmAppState.get(app0.getApplicationId())); + + rm1.stop(); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 95c14bf..3e18e08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -104,8 +106,9 @@ public EventHandler getEventHandler() { } interface RMStateStoreHelper { - RMStateStore getRMStateStore() throws Exception; + RMStateStore getRMStateStore(Configuration conf) throws Exception; boolean isFinalStateValid() throws Exception; + boolean appExists(RMApp app) throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -125,7 +128,7 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - void storeApp(RMStateStore store, ApplicationId appId, long submitTime, + RMApp 
storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -138,6 +141,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime, when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); store.storeNewApplication(mockApp); + return mockApp; } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, @@ -165,7 +169,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) long submitTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis() + 1234; Configuration conf = new YarnConfiguration(); - RMStateStore store = stateStoreHelper.getRMStateStore(); + RMStateStore store = stateStoreHelper.getRMStateStore(conf); TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); @@ -233,7 +237,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) store.close(); // load state - store = stateStoreHelper.getRMStateStore(); + store = stateStoreHelper.getRMStateStore(conf); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); Map rmAppState = @@ -300,7 +304,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) store.close(); // check updated application state. 
- store = stateStoreHelper.getRMStateStore(); + store = stateStoreHelper.getRMStateStore(conf); store.setRMDispatcher(dispatcher); RMState newRMState = store.loadState(); Map newRMAppState = @@ -339,7 +343,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) public void testRMDTSecretManagerStateStore( RMStateStoreHelper stateStoreHelper) throws Exception { - RMStateStore store = stateStoreHelper.getRMStateStore(); + YarnConfiguration conf = new YarnConfiguration(); + RMStateStore store = stateStoreHelper.getRMStateStore(conf); TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); @@ -367,6 +372,7 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + store.close(); } private Token generateAMRMToken( @@ -379,4 +385,37 @@ public void testRMDTSecretManagerStateStore( appToken.setService(new Text("appToken service")); return appToken; } + + public void testAppDeletion(RMStateStoreHelper stateStoreHelper) + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS, 0); + conf.setLong( + YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS, 0); + RMStateStore store = stateStoreHelper.getRMStateStore(conf); + store.setRMDispatcher(new TestDispatcher()); + + // create and store apps + ArrayList appList = new ArrayList(); + int NUM_APPS = 5; + for (int i = 0; i < NUM_APPS; i++) { + ApplicationId appId = + ConverterUtils.toApplicationId("application_1383183338324_000" + i); + RMApp app = storeApp(store, appId, 123456789, 987654321); + appList.add(app); + } + + Assert.assertEquals(NUM_APPS, appList.size()); + for (RMApp app : appList) { + // wait for app to be stored. 
+ while (!stateStoreHelper.appExists(app)); + } + + for (RMApp app : appList) { + // remove the app + store.removeApplication(app); + // wait for app to be removed. + while (stateStoreHelper.appExists(app)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index a1a6eab..6e55aed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertTrue; + +import java.io.IOException; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -42,7 +46,7 @@ class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; - FileSystemRMStateStore store; + TestFileSystemRMStore store; MiniDFSCluster cluster; class TestFileSystemRMStore extends FileSystemRMStateStore { @@ -54,6 +58,13 @@ start(); Assert.assertNotNull(fs); } + + public Path getAppDir(String appId) { + Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME); + Path appRootDir = new Path(rootDir, RM_APP_ROOT); + 
Path appDir = new Path(appRootDir, appId); + return appDir; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -67,8 +78,7 @@ public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { } @Override - public RMStateStore getRMStateStore() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + public RMStateStore getRMStateStore(Configuration conf) throws Exception { conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString()); this.store = new TestFileSystemRMStore(conf); @@ -81,9 +91,17 @@ public boolean isFinalStateValid() throws Exception { FileStatus[] files = fs.listStatus(workingDirPathURI); return files.length == 1; } + + @Override + public boolean appExists(RMApp app) throws IOException { + FileSystem fs = cluster.getFileSystem(); + Path nodePath = + store.getAppDir(app.getApplicationId().toString()); + return fs.exists(nodePath); + } } - @Test + @Test(timeout = 60000) public void testFSRMStateStore() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); MiniDFSCluster cluster = @@ -94,15 +112,12 @@ public void testFSRMStateStore() throws Exception { // It should discard the entry and remove it from file system. 
FSDataOutputStream fsOut = null; FileSystemRMStateStore fileSystemRMStateStore = - (FileSystemRMStateStore) fsTester.getRMStateStore(); + (FileSystemRMStateStore) fsTester.getRMStateStore(conf); String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003"; ApplicationAttemptId attemptId3 = ConverterUtils.toApplicationAttemptId(appAttemptIdStr3); - Path rootDir = - new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot"); - Path appRootDir = new Path(rootDir, "RMAppRoot"); Path appDir = - new Path(appRootDir, attemptId3.getApplicationId().toString()); + fsTester.store.getAppDir(attemptId3.getApplicationId().toString()); Path tempAppAttemptFile = new Path(appDir, attemptId3.toString() + ".tmp"); fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false); @@ -110,9 +125,10 @@ public void testFSRMStateStore() throws Exception { fsOut.close(); testRMAppStateStore(fsTester); - Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath + Assert.assertFalse(fsTester.workingDirPathURI .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); + testAppDeletion(fsTester); } finally { cluster.shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index a6929a8..35508cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -27,7 +27,9 @@ import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -37,7 +39,8 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper { ZooKeeper client; - ZKRMStateStore store; + TestZKRMStateStoreInternal store; + String workingZnode; class TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -52,11 +55,15 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) public ZooKeeper getNewZooKeeper() throws IOException { return client; } + + public String getAppNode(String appId) { + return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" + + appId; + } } - public RMStateStore getRMStateStore() throws Exception { - String workingZnode = "/Test"; - YarnConfiguration conf = new YarnConfiguration(); + public RMStateStore getRMStateStore(Configuration conf) throws Exception { + workingZnode = "/Test"; conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -69,12 +76,21 @@ public boolean isFinalStateValid() throws Exception { List nodes = client.getChildren(store.znodeWorkingPath, false); return nodes.size() == 1; } + + @Override + public boolean appExists(RMApp app) throws Exception { + Stat node = + client.exists(store.getAppNode(app.getApplicationId().toString()), + false); + return node !=null; + } } - @Test + @Test (timeout = 60000) public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); + testAppDeletion(zkTester); } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index b5f4992..8b4e54d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -310,6 +310,7 @@ private void assertKilled(RMApp application) { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "Application killed by user.", diag.toString()); + verify(store, times(1)).removeApplication(application); } private void assertAppAndAttemptKilled(RMApp application) @@ -333,6 +334,7 @@ private void assertFailed(RMApp application, String regex) { Assert.assertTrue("application diagnostics is not correct", diag.toString().matches(regex)); assertAppFinalStateSaved(application); + verify(store, times(1)).removeApplication(application); } private void sendAppUpdateSavedEvent(RMApp application) { @@ -451,6 +453,7 @@ protected RMApp testCreateAppFinished( assertFinalAppStatus(FinalApplicationStatus.FAILED, application); Assert.assertTrue("Finished app missing diagnostics", application.getDiagnostics().indexOf(diagnostics) != -1); + verify(store, times(1)).removeApplication(application); return application; }