diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 14c1ffc..9a2c239 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -427,6 +427,15 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
10000;
+ public static final String RMLIVENESS_STATESTORE_UPDATE_INTERVAL =
+ RM_PREFIX + "statestore.update.interval";
+ public static final long DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL
+ = 10000;
+
+ public static final String RM_STATESTORE_RECOVER_EXPIRATION_TIME =
+ RM_PREFIX + "recover.statestore.expiration.time";
+ public static final long DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME = -1;
+
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bf94195..396cb41 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -384,6 +384,20 @@
+ rm liveness update interval to rm statestore.
+
+ yarn.resourcemanager.statestore.update.interval
+
+
+
+
+ rm recover statestore expiration time.
+
+ yarn.resourcemanager.recover.statestore.expiration.time
+
+
+
+
The class to use as the persistent store.
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 01a1c8f..a10ba34 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -406,6 +406,7 @@ protected static void validateConfigs(Configuration conf) {
private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
+ private long rmStateStoreExpiration;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@@ -445,6 +446,9 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
+ rmStateStoreExpiration = conf.getLong(
+ YarnConfiguration.RM_STATESTORE_RECOVER_EXPIRATION_TIME,
+ YarnConfiguration.DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -578,14 +582,20 @@ protected void serviceStart() throws Exception {
if(recoveryEnabled) {
try {
- LOG.info("Recovery started");
- rmStore.checkVersion();
- if (rmContext.isWorkPreservingRecoveryEnabled()) {
- rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+ long lastRMLivenessTS = rmStore.getLastRMLivenessTS();
+ if(System.currentTimeMillis() - lastRMLivenessTS >
+ rmStateStoreExpiration && (rmStateStoreExpiration != -1)) {
+ LOG.warn("RM StateStore has expired");
+ } else {
+ LOG.info("Recovery started");
+ rmStore.checkVersion();
+ if (rmContext.isWorkPreservingRecoveryEnabled()) {
+ rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+ }
+ RMState state = rmStore.loadState();
+ recover(state);
+ LOG.info("Recovery ended");
}
- RMState state = rmStore.loadState();
- recover(state);
- LOG.info("Recovery ended");
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index faaadb8..bee7ade 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -553,6 +553,26 @@ protected void updateApplicationStateInternal(ApplicationId appId,
}
@Override
+ protected void updateRMLiveness(long timeStamp)
+ throws IOException {
+ db.put(bytes("RMLivenessKey"), bytes(String.valueOf(timeStamp)));
+ }
+
+ @Override
+ protected long getLastRMLivenessTSInternal() throws Exception {
+ byte[] data = db.get(bytes("RMLivenessKey"));
+ long time;
+ try {
+ time = Long.parseLong(asString(db.get(bytes("RMLivenessKey"))));
+ } catch (NumberFormatException e) {
+ LOG.error("invalid timestamp stored " +
+ asString(db.get(bytes("RMLivenessKey"))));
+ time = -1;
+ }
+ return time;
+ }
+
+ @Override
protected void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateData attemptStateData) throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index a2cf517..04be7ac 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -659,6 +659,8 @@ public void setRMDispatcher(Dispatcher dispatcher) {
}
AsyncDispatcher dispatcher;
+ private Thread livenessUpdater;
+ private long rmLivenessUpdateInterval;
@Override
protected void serviceInit(Configuration conf) throws Exception{
@@ -669,12 +671,18 @@ protected void serviceInit(Configuration conf) throws Exception{
new ForwardingEventHandler());
dispatcher.setDrainEventsOnStop();
initInternal(conf);
+ rmLivenessUpdateInterval = conf.getLong(
+ YarnConfiguration.RMLIVENESS_STATESTORE_UPDATE_INTERVAL,
+ YarnConfiguration.DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL);
}
@Override
protected void serviceStart() throws Exception {
dispatcher.start();
startInternal();
+ livenessUpdater = new Thread(new RMLivenessUpdater());
+ livenessUpdater.setName("RM Liveness Updater");
+ livenessUpdater.start();
}
/**
@@ -691,6 +699,9 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
+ if (livenessUpdater != null) {
+ livenessUpdater.interrupt();
+ }
dispatcher.stop();
closeInternal();
}
@@ -762,7 +773,7 @@ public void checkVersion() throws Exception {
* This must not be called on the dispatcher thread
*/
public abstract RMState loadState() throws Exception;
-
+
/**
* Non-Blocking API
* ResourceManager services use this to store the application's state
@@ -805,7 +816,19 @@ protected abstract void storeApplicationStateInternal(ApplicationId appId,
protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception;
-
+
+ protected void updateRMLiveness(long timeStamp) throws Exception {
+ //do nothing
+ }
+
+ public long getLastRMLivenessTS() throws Exception {
+ return getLastRMLivenessTSInternal();
+ }
+
+ protected long getLastRMLivenessTSInternal() throws Exception {
+ return -1;
+ }
+
@SuppressWarnings("unchecked")
/**
* Non-blocking API
@@ -1189,4 +1212,25 @@ public RMStateStoreState getRMStateStoreState() {
this.readLock.unlock();
}
}
+
+ private class RMLivenessUpdater implements Runnable {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(rmLivenessUpdateInterval);
+ } catch (InterruptedException e) {
+ LOG.info(getName() + " thread interrupted");
+ break;
+ }
+ long timeStamp = System.currentTimeMillis();
+ try {
+ updateRMLiveness(timeStamp);
+ } catch (Exception e) {
+ LOG.error("fail to update rmliveness. ", e);
+ }
+ }
+ }
+ }
}