diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -427,6 +427,24 @@ private static void addDeprecatedKeys() {
   public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
       10000;
 
+  /**
+   * Interval, in milliseconds, at which the RM records its liveness
+   * timestamp in the state store.
+   */
+  public static final String RMLIVENESS_STATESTORE_UPDATE_INTERVAL =
+      RM_PREFIX + "statestore.update.interval";
+  public static final long DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL
+      = 10000;
+
+  /**
+   * Maximum age, in milliseconds, of the last liveness timestamp for which
+   * the RM still recovers from the state store. A negative value disables
+   * the expiration check.
+   */
+  public static final String RM_STATESTORE_RECOVER_EXPIRATION_TIME =
+      RM_PREFIX + "recover.statestore.expiration.time";
+  public static final long DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME = -1;
+
   /** Zookeeper interaction configs */
   public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -384,6 +384,21 @@
   </property>
 
   <property>
+    <description>Interval, in milliseconds, at which the ResourceManager
+    records its liveness timestamp in the state store.</description>
+    <name>yarn.resourcemanager.statestore.update.interval</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>Maximum age, in milliseconds, of the last recorded liveness
+    timestamp for which the ResourceManager still recovers from the state
+    store. A negative value disables the expiration check.</description>
+    <name>yarn.resourcemanager.recover.statestore.expiration.time</name>
+    <value>-1</value>
+  </property>
+
+  <property>
     <description>The class to use as the persistent store.
 
       If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -406,6 +406,9 @@ protected static void validateConfigs(Configuration conf) {
     private ResourceManager rm;
     private boolean recoveryEnabled;
     private RMActiveServiceContext activeServiceContext;
+    // Max age (ms) of the last RM liveness timestamp still eligible for
+    // recovery; negative disables the check.
+    private long rmStateStoreExpiration;
 
     RMActiveServices(ResourceManager rm) {
       super("RMActiveServices");
@@ -445,6 +448,9 @@ protected void serviceInit(Configuration configuration) throws Exception {
         rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
       }
 
+      rmStateStoreExpiration = conf.getLong(
+          YarnConfiguration.RM_STATESTORE_RECOVER_EXPIRATION_TIME,
+          YarnConfiguration.DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME);
       boolean isRecoveryEnabled = conf.getBoolean(
           YarnConfiguration.RECOVERY_ENABLED,
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -578,14 +584,28 @@ protected void serviceStart() throws Exception {
 
       if(recoveryEnabled) {
         try {
-          LOG.info("Recovery started");
-          rmStore.checkVersion();
-          if (rmContext.isWorkPreservingRecoveryEnabled()) {
-            rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+          // Only consult the store when the expiration check is enabled
+          // (non-negative); a negative timestamp means no liveness record is
+          // available, so the state cannot be judged stale.
+          boolean stateStoreExpired = false;
+          if (rmStateStoreExpiration >= 0) {
+            long lastRMLivenessTS = rmStore.getLastRMLivenessTS();
+            stateStoreExpired = lastRMLivenessTS >= 0
+                && System.currentTimeMillis() - lastRMLivenessTS
+                    > rmStateStoreExpiration;
+          }
+          if (stateStoreExpired) {
+            LOG.warn("RM StateStore has expired");
+          } else {
+            LOG.info("Recovery started");
+            rmStore.checkVersion();
+            if (rmContext.isWorkPreservingRecoveryEnabled()) {
+              rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+            }
+            RMState state = rmStore.loadState();
+            recover(state);
+            LOG.info("Recovery ended");
           }
-          RMState state = rmStore.loadState();
-          recover(state);
-          LOG.info("Recovery ended");
         } catch (Exception e) {
           // the Exception from loadState() needs to be handled for
           // HA and we need to give up master status if we got fenced
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -553,6 +553,52 @@ protected void updateApplicationStateInternal(ApplicationId appId,
   }
 
+  /** Key under which the RM liveness timestamp is stored. */
+  private static final String RM_LIVENESS_KEY = "RMLivenessKey";
+
+  /**
+   * Persists the RM liveness timestamp.
+   *
+   * @param timeStamp time of the liveness update, in milliseconds
+   * @throws IOException if the write to leveldb fails
+   */
+  @Override
+  protected void updateRMLiveness(long timeStamp) throws IOException {
+    try {
+      db.put(bytes(RM_LIVENESS_KEY), bytes(String.valueOf(timeStamp)));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Reads the last persisted RM liveness timestamp.
+   *
+   * @return the stored timestamp in milliseconds, or -1 if none has been
+   *         stored or the stored value is not a valid number
+   * @throws IOException if the read from leveldb fails
+   */
+  @Override
+  protected long getLastRMLivenessTSInternal() throws IOException {
+    byte[] data;
+    try {
+      data = db.get(bytes(RM_LIVENESS_KEY));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    if (data == null) {
+      // Nothing has been recorded yet.
+      return -1;
+    }
+    String stored = asString(data);
+    try {
+      return Long.parseLong(stored);
+    } catch (NumberFormatException e) {
+      LOG.error("invalid timestamp stored " + stored, e);
+      return -1;
+    }
+  }
+
   @Override
   protected void storeApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
       ApplicationAttemptStateData attemptStateData) throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -659,6 +659,12 @@ public void setRMDispatcher(Dispatcher dispatcher) {
   }
 
   AsyncDispatcher dispatcher;
+  /** How long serviceStop() waits for the liveness updater to exit. */
+  private static final long LIVENESS_UPDATER_JOIN_TIMEOUT_MS = 1000;
+  /** Background thread that periodically records RM liveness. */
+  private Thread livenessUpdater;
+  /** Sleep time between liveness updates, in milliseconds. */
+  private long rmLivenessUpdateInterval;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception{
@@ -669,12 +675,27 @@ protected void serviceInit(Configuration conf) throws Exception{
                         new ForwardingEventHandler());
     dispatcher.setDrainEventsOnStop();
     initInternal(conf);
+    rmLivenessUpdateInterval = conf.getLong(
+        YarnConfiguration.RMLIVENESS_STATESTORE_UPDATE_INTERVAL,
+        YarnConfiguration.DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL);
   }
 
   @Override
   protected void serviceStart() throws Exception {
     dispatcher.start();
     startInternal();
+    if (rmLivenessUpdateInterval > 0) {
+      livenessUpdater = new Thread(new RMLivenessUpdater());
+      livenessUpdater.setName("RM Liveness Updater");
+      // Never keep the JVM alive just for liveness updates.
+      livenessUpdater.setDaemon(true);
+      livenessUpdater.start();
+    } else {
+      // Thread.sleep() rejects negative values and 0 would busy-loop.
+      LOG.warn("RM liveness updates disabled: non-positive "
+          + YarnConfiguration.RMLIVENESS_STATESTORE_UPDATE_INTERVAL
+          + " = " + rmLivenessUpdateInterval);
+    }
   }
 
   /**
@@ -691,6 +712,24 @@ protected void serviceStart() throws Exception {
 
   @Override
   protected void serviceStop() throws Exception {
-    dispatcher.stop();
-    closeInternal();
+    boolean interrupted = false;
+    if (livenessUpdater != null) {
+      livenessUpdater.interrupt();
+      // Give an in-flight update a chance to finish before the store is
+      // closed underneath it.
+      try {
+        livenessUpdater.join(LIVENESS_UPDATER_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+    try {
+      dispatcher.stop();
+      closeInternal();
+    } finally {
+      if (interrupted) {
+        // Restore the interrupt only after the store has been shut down.
+        Thread.currentThread().interrupt();
+      }
+    }
   }
@@ -805,3 +844,35 @@ protected abstract void storeApplicationStateInternal(ApplicationId appId,
 
+  /**
+   * Derived classes that can persist the RM liveness timestamp override
+   * this method; the base implementation does nothing.
+   *
+   * @param timeStamp time of the liveness update, in milliseconds
+   * @throws Exception if the timestamp cannot be stored
+   */
+  protected void updateRMLiveness(long timeStamp) throws Exception {
+    // do nothing
+  }
+
+  /**
+   * Returns the last RM liveness timestamp recorded in the store.
+   *
+   * @return the timestamp in milliseconds, or -1 if the store has none
+   * @throws Exception if the timestamp cannot be read
+   */
+  public long getLastRMLivenessTS() throws Exception {
+    return getLastRMLivenessTSInternal();
+  }
+
+  /**
+   * Derived classes that persist the RM liveness timestamp override this
+   * method; the base implementation reports that none is available.
+   *
+   * @return the timestamp in milliseconds, or -1 if none is available
+   * @throws Exception if the timestamp cannot be read
+   */
+  protected long getLastRMLivenessTSInternal() throws Exception {
+    return -1;
+  }
+
   protected abstract void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateData appStateData) throws Exception;
@@ -1189,4 +1260,30 @@ public RMStateStoreState getRMStateStoreState() {
       this.readLock.unlock();
     }
   }
+
+  /**
+   * Periodically records the current time in the state store through
+   * {@link #updateRMLiveness(long)} until the thread is interrupted.
+   */
+  private class RMLivenessUpdater implements Runnable {
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(rmLivenessUpdateInterval);
+        } catch (InterruptedException e) {
+          LOG.info(Thread.currentThread().getName() + " thread interrupted");
+          // Preserve the interrupt status for the loop condition and caller.
+          Thread.currentThread().interrupt();
+          break;
+        }
+        try {
+          updateRMLiveness(System.currentTimeMillis());
+        } catch (Exception e) {
+          // Liveness is best-effort: keep trying on the next tick.
+          LOG.error("fail to update rmliveness. ", e);
+        }
+      }
+    }
+  }
 }