diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 09f6b6e..b347e7d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -300,7 +300,22 @@
////////////////////////////////
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
-
+
+ /**
+ * How long, in seconds, to keep application state in the RMStateStore after
+ * the application has completed before deleting it.
+ */
+ public static final String RM_STATE_STORE_APP_RETAIN_SECS = RM_PREFIX
+ + "state-store.application.retain-seconds";
+ public static final long DEFAULT_RM_STATE_STORE_APP_RETAIN_SECS = 7200;
+ /**
+ * Seconds between app state retention checks; <= 0 means retention time / 10.
+ */
+ public static final String RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS =
+ RM_PREFIX + "state-store.application.retain-check-interval-seconds";
+ public static final long DEFAULT_RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS =
+ -1;
+
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 86501ad..f42354f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -270,6 +270,21 @@
+ How long, in seconds, to wait before deleting application state from
+ RMStateStore after the application has completed.
+ yarn.resourcemanager.state-store.application.retain-seconds
+ 7200
+
+
+
+ How long to wait between application state retention
+ checks. If set to 0 or a negative value then the value is computed as
+ one-tenth of the application retention time.
+ yarn.resourcemanager.state-store.application.retain-check-interval-seconds
+ -1
+
+
+
The class to use as the persistent store.
yarn.resourcemanager.store.class
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 46a58fc..2a82993 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -63,7 +63,7 @@
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
- private static final String ROOT_DIR_NAME = "FSRMStateRoot";
+ protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
protected FileSystem fs;
@@ -321,7 +321,7 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 495c292..c7dc4e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -167,8 +167,8 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
- throws Exception {
+ public synchronized void removeApplicationStateInternal(
+ ApplicationState appState) throws Exception {
ApplicationId appId = appState.getAppId();
ApplicationState removed = state.appState.remove(appId);
if (removed == null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index c8ad1c4..5fed191 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -62,7 +62,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId,
}
@Override
- protected void removeApplicationState(ApplicationState appState)
+ protected void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
// Do nothing
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 5a7c7dc..e2d4c43 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -36,13 +36,14 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -51,9 +52,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -68,7 +68,7 @@
* Real store implementations need to derive from it and implement blocking
* store and load methods to actually store and load the state.
*/
-public abstract class RMStateStore extends AbstractService {
+public abstract class RMStateStore extends CompositeService {
// constants for RM App state and RMDTSecretManagerState.
protected static final String RM_APP_ROOT = "RMAppRoot";
@@ -80,6 +80,10 @@
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+ private long retentionMillis;
+ private long checkIntevalMillis;
+ private volatile boolean stopped = false;
+
public RMStateStore() {
super(RMStateStore.class.getName());
}
@@ -259,21 +263,43 @@ public RMDTSecretManagerState getRMDTSecretManagerState() {
public void setRMDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
-
- AsyncDispatcher dispatcher;
-
- public synchronized void serviceInit(Configuration conf) throws Exception{
- // create async handler
- dispatcher = new AsyncDispatcher();
- dispatcher.init(conf);
- dispatcher.register(RMStateStoreEventType.class,
- new ForwardingEventHandler());
+
+ // Dispatcher for handling store_app events.
+ AsyncDispatcher storeAppDispatcher;
+ // Dispatcher for handling remove_app events.
+ AsyncDispatcher removeAppDispatcher;
+
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ retentionMillis =
+ conf.getLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS,
+ YarnConfiguration.DEFAULT_RM_STATE_STORE_APP_RETAIN_SECS) * 1000;
+ checkIntevalMillis =
+ conf.getLong(
+ YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS)
+ * 1000;
+ if (checkIntevalMillis <= 0) {
+ // when unset or non-positive, use one-tenth of the retention period
+ checkIntevalMillis = retentionMillis/ 10;
+ }
+
+ storeAppDispatcher = new AsyncDispatcher();
+ storeAppDispatcher.register(RMStateStoreEventType.class,
+ new StoreAppEventHandler());
+ addIfService(storeAppDispatcher);
+
+ removeAppDispatcher = new AsyncDispatcher();
+ removeAppDispatcher.register(RMStateStoreEventType.class,
+ new RemoveAppEventHandler());
+ addIfService(removeAppDispatcher);
+
initInternal(conf);
+ super.serviceInit(conf);
}
protected synchronized void serviceStart() throws Exception {
- dispatcher.start();
startInternal();
+ super.serviceStart();
}
/**
@@ -289,8 +315,9 @@ protected synchronized void serviceStart() throws Exception {
protected abstract void startInternal() throws Exception;
public synchronized void serviceStop() throws Exception {
+ stopped = true;
closeInternal();
- dispatcher.stop();
+ super.serviceStop();
}
/**
@@ -322,12 +349,12 @@ public synchronized void storeNewApplication(RMApp app) {
ApplicationState appState =
new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
app.getUser());
- dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
+ storeAppDispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@SuppressWarnings("unchecked")
public synchronized void updateApplicationState(ApplicationState appState) {
- dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
+ storeAppDispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
/**
@@ -356,14 +383,14 @@ public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime());
- dispatcher.getEventHandler().handle(
+ storeAppDispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
}
@SuppressWarnings("unchecked")
public synchronized void updateApplicationAttemptState(
ApplicationAttemptState attemptState) {
- dispatcher.getEventHandler().handle(
+ storeAppDispatcher.getEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState));
}
@@ -453,6 +480,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
+ @SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getStartTime(),
@@ -465,16 +493,9 @@ public synchronized void removeApplication(RMApp app) {
appAttempt.getStartTime());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
-
- removeApplication(appState);
- }
-
- @SuppressWarnings("unchecked")
- /**
- * Non-Blocking API
- */
- public synchronized void removeApplication(ApplicationState appState) {
- dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
+
+ removeAppDispatcher.getEventHandler().handle(
+ new RMStateStoreRemoveAppEvent(appState));
}
/**
@@ -482,8 +503,8 @@ public synchronized void removeApplication(ApplicationState appState) {
* Derived classes must implement this method to remove the state of an
* application and its attempts
*/
- protected abstract void removeApplicationState(ApplicationState appState)
- throws Exception;
+ protected abstract void removeApplicationStateInternal(
+ ApplicationState appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-986
@@ -605,20 +626,6 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
storedException);
}
}
- } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
- ApplicationState appState =
- ((RMStateStoreRemoveAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- Exception removedException = null;
- LOG.info("Removing info for app: " + appId);
- try {
- removeApplicationState(appState);
- } catch (Exception e) {
- LOG.error("Error removing app: " + appId, e);
- removedException = e;
- } finally {
- notifyDoneRemovingApplcation(appId, removedException);
- }
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
@@ -664,27 +671,53 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
}
- @SuppressWarnings("unchecked")
/**
- * This is to notify RMApp that this application has been removed from
- * RMStateStore
+ * EventHandler which forwards store_app events to the underlying state store
+ * implementation.
*/
- private void notifyDoneRemovingApplcation(ApplicationId appId,
- Exception removedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppRemovedEvent(appId, removedException));
+ private final class StoreAppEventHandler implements
+ EventHandler {
+
+ @Override
+ public void handle(RMStateStoreEvent event) {
+ handleStoreEvent(event);
+ }
}
/**
- * EventHandler implementation which forward events to the FSRMStateStore
- * This hides the EventHandle methods of the store from its public interface
+ * EventHandler which forwards remove_app events to the underlying state store
+ * implementation.
*/
- private final class ForwardingEventHandler
- implements EventHandler {
-
+ private final class RemoveAppEventHandler implements
+ EventHandler {
+
@Override
public void handle(RMStateStoreEvent event) {
- handleStoreEvent(event);
+ assert event.getType().equals(RMStateStoreEventType.REMOVE_APP);
+ RMStateStoreRemoveAppEvent removeEvent =
+ (RMStateStoreRemoveAppEvent) event;
+ ApplicationId appId = removeEvent.getAppState().getAppId();
+
+ // Removal events are expected to arrive in order of increasing timestamp.
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ if (System.currentTimeMillis() > removeEvent.getTimestamp()
+ + retentionMillis) {
+ try {
+ LOG.info("Removing info for application: " + appId);
+ removeApplicationStateInternal(removeEvent.getAppState());
+ } catch (Exception e) {
+ LOG.error("Error removing application: " + appId, e);
+ }
+ break;
+ }
+ try {
+ Thread.sleep(checkIntevalMillis);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted exception while waiting to remove application "
+ + appId, e);
+ Thread.currentThread().interrupt(); // restore flag; loop guard then exits
+ }
+ }
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
index 8e49a82..e45c772 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
@@ -24,4 +24,8 @@
public RMStateStoreEvent(RMStateStoreEventType type) {
super(type);
}
+
+ public RMStateStoreEvent(RMStateStoreEventType type, long timeStamp) {
+ super(type, timeStamp);
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
index 402feb9..4e40d4d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
@@ -22,12 +22,12 @@
public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
ApplicationState appState;
-
+
RMStateStoreRemoveAppEvent(ApplicationState appState) {
- super(RMStateStoreEventType.REMOVE_APP);
+ super(RMStateStoreEventType.REMOVE_APP, System.currentTimeMillis());
this.appState = appState;
}
-
+
public ApplicationState getAppState() {
return appState;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 628d260..6992981 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -65,7 +65,7 @@
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
- private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+ protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
private int numRetries;
private String zkHostPort = null;
@@ -342,7 +342,7 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index e3b083c..9112a58 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -944,6 +944,9 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
+
+ // app completed, add to the removal queue of RMStateStore.
+ app.rmContext.getStateStore().removeApplication(app);
};
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 97f51a2..e5303b1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -395,6 +395,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -442,6 +444,8 @@ public void testRMRestartFailedApp() throws Exception {
.contains("Failing the application."));
// failed diagnostics from attempt is lost because the diagnostics from
// attempt is not yet available by the time app is saving the app state.
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -488,6 +492,8 @@ public void testRMRestartKilledApp() throws Exception{
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
Assert.assertEquals(app0.getDiagnostics().toString(),
appReport.getDiagnostics());
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -537,6 +543,9 @@ public void testRMRestartSucceededApp() throws Exception {
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
+
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -619,6 +628,9 @@ public void testRMRestartGetApplicationList() throws Exception {
rm2.getClientRMService().getApplications(request2);
List appList2 = response2.getApplicationList();
Assert.assertTrue(3 == appList2.size());
+
+ rm1.stop();
+ rm2.stop();
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -1062,6 +1074,41 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
rm2.stop();
}
+ @Test
+ public void testStateStoreAppDelelayedRemoval() throws Exception {
+ // The app info is deleted 2 secs after the application finishes.
+ long retentionSecs = 2;
+ conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS, retentionSecs);
+ conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS, 0);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create an app and finish the app.
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ finishApplicationMaster(app0, rm1, nm1, am0);
+
+ // After half of the retention period, app state still exists.
+ Thread.sleep(retentionSecs/2 * 1000);
+ Map rmAppState =
+ rmState.getApplicationState();
+ Assert.assertNotNull(rmAppState.get(app0.getApplicationId()));
+
+ // after app retention period, app is removed
+ Thread.sleep(retentionSecs * 1000);
+ Assert.assertNull(rmAppState.get(app0.getApplicationId()));
+
+ rm1.stop();
+ }
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 95c14bf..3e18e08 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -26,6 +26,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -104,8 +106,9 @@ public EventHandler getEventHandler() {
}
interface RMStateStoreHelper {
- RMStateStore getRMStateStore() throws Exception;
+ RMStateStore getRMStateStore(Configuration conf) throws Exception;
boolean isFinalStateValid() throws Exception;
+ boolean appExists(RMApp app) throws Exception;
}
void waitNotify(TestDispatcher dispatcher) {
@@ -125,7 +128,7 @@ void waitNotify(TestDispatcher dispatcher) {
dispatcher.notified = false;
}
- void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@@ -138,6 +141,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test");
store.storeNewApplication(mockApp);
+ return mockApp;
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@@ -165,7 +169,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
long submitTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis() + 1234;
Configuration conf = new YarnConfiguration();
- RMStateStore store = stateStoreHelper.getRMStateStore();
+ RMStateStore store = stateStoreHelper.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
@@ -233,7 +237,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
store.close();
// load state
- store = stateStoreHelper.getRMStateStore();
+ store = stateStoreHelper.getRMStateStore(conf);
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
Map rmAppState =
@@ -300,7 +304,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
store.close();
// check updated application state.
- store = stateStoreHelper.getRMStateStore();
+ store = stateStoreHelper.getRMStateStore(conf);
store.setRMDispatcher(dispatcher);
RMState newRMState = store.loadState();
Map newRMAppState =
@@ -339,7 +343,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
public void testRMDTSecretManagerStateStore(
RMStateStoreHelper stateStoreHelper) throws Exception {
- RMStateStore store = stateStoreHelper.getRMStateStore();
+ YarnConfiguration conf = new YarnConfiguration();
+ RMStateStore store = stateStoreHelper.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
@@ -367,6 +372,7 @@ public void testRMDTSecretManagerStateStore(
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
+ store.close();
}
private Token generateAMRMToken(
@@ -379,4 +385,37 @@ public void testRMDTSecretManagerStateStore(
appToken.setService(new Text("appToken service"));
return appToken;
}
+
+ public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setLong(YarnConfiguration.RM_STATE_STORE_APP_RETAIN_SECS, 0);
+ conf.setLong(
+ YarnConfiguration.RM_STATE_STORE_APP_RETAIN_CHECK_INTERVAL_SECS, 0);
+ RMStateStore store = stateStoreHelper.getRMStateStore(conf);
+ store.setRMDispatcher(new TestDispatcher());
+
+ // create and store apps
+ ArrayList appList = new ArrayList();
+ int NUM_APPS = 5;
+ for (int i = 0; i < NUM_APPS; i++) {
+ ApplicationId appId =
+ ConverterUtils.toApplicationId("application_1383183338324_000" + i);
+ RMApp app = storeApp(store, appId, 123456789, 987654321);
+ appList.add(app);
+ }
+
+ Assert.assertEquals(NUM_APPS, appList.size());
+ for (RMApp app : appList) {
+ // wait for app to be stored.
+ while (!stateStoreHelper.appExists(app));
+ }
+
+ for (RMApp app : appList) {
+ // remove the app
+ store.removeApplication(app);
+ // wait for app to be removed.
+ while (stateStoreHelper.appExists(app));
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index a1a6eab..6e55aed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -42,7 +46,7 @@
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
- FileSystemRMStateStore store;
+ TestFileSystemRMStore store;
MiniDFSCluster cluster;
class TestFileSystemRMStore extends FileSystemRMStateStore {
@@ -54,6 +58,13 @@
start();
Assert.assertNotNull(fs);
}
+
+ public Path getAppDir(String appId) {
+ Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
+ Path appRootDir = new Path(rootDir, RM_APP_ROOT);
+ Path appDir = new Path(appRootDir, appId);
+ return appDir;
+ }
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@@ -67,8 +78,7 @@ public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
}
@Override
- public RMStateStore getRMStateStore() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
@@ -81,9 +91,17 @@ public boolean isFinalStateValid() throws Exception {
FileStatus[] files = fs.listStatus(workingDirPathURI);
return files.length == 1;
}
+
+ /**
+ * Reports whether the state directory of the given application is present
+ * on the mini cluster's file system.
+ *
+ * @param app application whose id names the directory to look up
+ * @return {@code true} if the path returned by {@code store.getAppDir}
+ * exists on the cluster file system
+ * @throws IOException if the file system cannot be obtained or queried
+ */
+ @Override
+ public boolean appExists(RMApp app) throws IOException {
+ FileSystem clusterFs = cluster.getFileSystem();
+ String appId = app.getApplicationId().toString();
+ return clusterFs.exists(store.getAppDir(appId));
+ }
}
- @Test
+ @Test(timeout = 60000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
@@ -94,15 +112,12 @@ public void testFSRMStateStore() throws Exception {
// It should discard the entry and remove it from file system.
FSDataOutputStream fsOut = null;
FileSystemRMStateStore fileSystemRMStateStore =
- (FileSystemRMStateStore) fsTester.getRMStateStore();
+ (FileSystemRMStateStore) fsTester.getRMStateStore(conf);
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
ApplicationAttemptId attemptId3 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
- Path rootDir =
- new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
- Path appRootDir = new Path(rootDir, "RMAppRoot");
Path appDir =
- new Path(appRootDir, attemptId3.getApplicationId().toString());
+ fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
Path tempAppAttemptFile =
new Path(appDir, attemptId3.toString() + ".tmp");
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
@@ -110,9 +125,10 @@ public void testFSRMStateStore() throws Exception {
fsOut.close();
testRMAppStateStore(fsTester);
- Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+ Assert.assertFalse(fsTester.workingDirPathURI
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
+ testAppDeletion(fsTester);
} finally {
cluster.shutdown();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index a6929a8..35508cd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -27,7 +27,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
@@ -37,7 +39,8 @@
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
- ZKRMStateStore store;
+ TestZKRMStateStoreInternal store;
+ String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
@@ -52,11 +55,15 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
+
+ /**
+ * Composes the znode path for one application:
+ * {@code workingZnode/ROOT_ZNODE_NAME/RM_APP_ROOT/appId}.
+ *
+ * @param appId application id in string form, used as the leaf node name
+ * @return the slash-separated znode path; nothing is created or looked up
+ */
+ public String getAppNode(String appId) {
+ String appsParentNode =
+ workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT;
+ return appsParentNode + "/" + appId;
+ }
}
- public RMStateStore getRMStateStore() throws Exception {
- String workingZnode = "/Test";
- YarnConfiguration conf = new YarnConfiguration();
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+ workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@@ -69,12 +76,21 @@ public boolean isFinalStateValid() throws Exception {
List nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
}
+
+ /**
+ * Reports whether a znode exists for the given application.
+ *
+ * @param app application whose id names the znode to look up
+ * @return {@code true} if {@code client.exists} returns a non-null
+ * {@link Stat} for the path produced by {@code store.getAppNode}
+ * @throws Exception if the ZooKeeper lookup fails
+ */
+ @Override
+ public boolean appExists(RMApp app) throws Exception {
+ String appNode = store.getAppNode(app.getApplicationId().toString());
+ // No watch is registered (second argument is false).
+ Stat appStat = client.exists(appNode, false);
+ return appStat != null;
+ }
}
+ /**
+ * Drives the ZooKeeper-backed tester through testRMAppStateStore,
+ * testRMDTSecretManagerStateStore and testAppDeletion, in that order.
+ * The JUnit timeout is 60000 ms so a hung run fails instead of blocking.
+ */
- @Test
+ @Test(timeout = 60000)
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
+ testAppDeletion(zkTester);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index b5f4992..8b4e54d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -310,6 +310,7 @@ private void assertKilled(RMApp application) {
StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct",
"Application killed by user.", diag.toString());
+ verify(store, times(1)).removeApplication(application);
}
private void assertAppAndAttemptKilled(RMApp application)
@@ -333,6 +334,7 @@ private void assertFailed(RMApp application, String regex) {
Assert.assertTrue("application diagnostics is not correct",
diag.toString().matches(regex));
assertAppFinalStateSaved(application);
+ verify(store, times(1)).removeApplication(application);
}
private void sendAppUpdateSavedEvent(RMApp application) {
@@ -451,6 +453,7 @@ protected RMApp testCreateAppFinished(
assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
Assert.assertTrue("Finished app missing diagnostics",
application.getDiagnostics().indexOf(diagnostics) != -1);
+ verify(store, times(1)).removeApplication(application);
return application;
}