diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 14c1ffc..b6c60ca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -427,6 +427,25 @@ private static void addDeprecatedKeys() {
   public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
       10000;
 
+  /**
+   * How often, in milliseconds, the RM writes its liveness timestamp to the
+   * RM state store.
+   */
+  public static final String RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS =
+      RM_PREFIX + "statestore.update.interval-ms";
+  public static final long DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS =
+      10000;
+
+  /**
+   * Maximum age, in milliseconds, of the liveness timestamp found in the RM
+   * state store at recovery time. Older state is treated as expired and
+   * unfinished applications are recovered as killed. Never expires by default.
+   */
+  public static final String RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS =
+      RM_PREFIX + "recover.statestore.expiration.time-ms";
+  public static final long DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS =
+      Long.MAX_VALUE;
+
   /** Zookeeper interaction configs */
   public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 997eb8e..4068f73 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -384,6 +384,21 @@
   </property>
 
   <property>
+    <description>Interval in milliseconds at which the RM writes its liveness
+    timestamp to the RM state store. Defaults to 10 seconds.
+    </description>
+    <name>yarn.resourcemanager.statestore.update.interval-ms</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>Maximum age in milliseconds of the stored RM liveness timestamp;
+    when it is older, unfinished applications are recovered as killed.</description>
+    <name>yarn.resourcemanager.recover.statestore.expiration.time-ms</name>
+    <value>9223372036854775807</value>
+  </property>
+
+  <property>
     <description>The class to use as the persistent store.
 
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 9802a37..68a56a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -135,4 +135,8 @@ void setRMDelegatedNodeLabelsUpdater( PlacementManager getQueuePlacementManager(); void setQueuePlacementManager(PlacementManager placementMgr); + + boolean isRMStateStoreExpired(); + + void setRMStateStoreExpireState(boolean expired); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ed9942b..9da0150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -72,7 +72,7 @@ private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; - + private boolean isStateStoreExpired = false; /** * Default constructor. 
To be used in conjunction with setter methods for * individual fields. @@ -461,4 +461,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public boolean isRMStateStoreExpired() { + return this.isStateStoreExpired; + } + + @Override + public void setRMStateStoreExpireState(boolean expired) { + this.isStateStoreExpired = expired; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 01a1c8f..3f936b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -406,6 +406,7 @@ protected static void validateConfigs(Configuration conf) { private ResourceManager rm; private boolean recoveryEnabled; private RMActiveServiceContext activeServiceContext; + private long rmStateStoreExpiration; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -445,6 +446,9 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater); } + rmStateStoreExpiration = conf.getLong( + YarnConfiguration.RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS, + YarnConfiguration.DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); @@ -580,12 
+584,19 @@ protected void serviceStart() throws Exception { try { LOG.info("Recovery started"); rmStore.checkVersion(); + long lastRMLivenessTS = rmStore.getLastRMLivenessTS(); + boolean expire = (System.currentTimeMillis() - lastRMLivenessTS > + rmStateStoreExpiration); + rmContext.setRMStateStoreExpireState(expire); if (rmContext.isWorkPreservingRecoveryEnabled()) { rmContext.setEpoch(rmStore.getAndIncrementEpoch()); } RMState state = rmStore.loadState(); recover(state); LOG.info("Recovery ended"); + rmContext.setRMStateStoreExpireState(false); + rmStore.startRmLivenessHeartBeat(); + //} } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index faaadb8..50d8c71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -95,7 +95,6 @@ private static final Version CURRENT_VERSION_INFO = Version .newInstance(1, 1); - private DB db; private String getApplicationNodeKey(ApplicationId appId) { @@ -257,7 +256,9 @@ private void loadReservationState(RMState rmState) throws IOException { while (iter.hasNext()) { Entry entry = iter.next(); String key = asString(entry.getKey()); - + if(RM_RESERVATION_KEY_PREFIX.length() != key.length()) { + continue; + } String planReservationString = 
key.substring(RM_RESERVATION_KEY_PREFIX.length()); String[] parts = planReservationString.split(SEPARATOR); @@ -553,6 +554,29 @@ protected void updateApplicationStateInternal(ApplicationId appId, } @Override + protected void updateRMLiveness(long timeStamp) + throws IOException { + db.put(bytes(RM_LIVENESS_HEARTBEAT_KEY), bytes(String.valueOf(timeStamp))); + } + + @Override + protected long getLastRMLivenessTSInternal() throws Exception { + byte[] val = db.get(bytes(RM_LIVENESS_HEARTBEAT_KEY)); + if(val == null) { + return 0; + } + long time; + try { + time = Long.parseLong(asString(val)); + } catch (NumberFormatException e) { + LOG.error("invalid timestamp stored " + + asString(val)); + time = -1; + } + return time; + } + + @Override protected void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, ApplicationAttemptStateData attemptStateData) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index a2cf517..f6df9b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -23,7 +23,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.TimerTask; import java.util.TreeMap; +import java.util.Timer; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 
@@ -94,6 +96,9 @@
       "ReservationSystemRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
+  // Store key under which the RM liveness timestamp is kept.
+  protected static final String RM_LIVENESS_HEARTBEAT_KEY =
+      "RmLivenssHeartbeatKey";
   private ResourceManager resourceManager;
   private final ReadLock readLock;
   private final WriteLock writeLock;
@@ -659,6 +664,10 @@ public void setRMDispatcher(Dispatcher dispatcher) {
   }
 
   AsyncDispatcher dispatcher;
+  // Period, in milliseconds, between two liveness timestamp writes.
+  private long rmLivenessUpdateInterval;
+  // Timer driving the liveness heartbeat; null until it is started.
+  private Timer rmLivenessHeartBeatTimer;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception{
@@ -669,6 +678,9 @@ protected void serviceInit(Configuration conf) throws Exception{
         new ForwardingEventHandler());
     dispatcher.setDrainEventsOnStop();
     initInternal(conf);
+    rmLivenessUpdateInterval = conf.getLong(
+        YarnConfiguration.RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS);
   }
 
   @Override
@@ -677,6 +689,30 @@ protected void serviceStart() throws Exception {
     startInternal();
   }
 
+  /**
+   * Starts the periodic task that writes the current time to the state store
+   * through updateRMLiveness. A timer started by an earlier call is cancelled
+   * first, and a non-positive interval leaves the heartbeat disabled.
+   *
+   * @throws Exception kept from the original signature for callers
+   */
+  public void startRmLivenessHeartBeat() throws Exception {
+    if (rmLivenessHeartBeatTimer != null) {
+      rmLivenessHeartBeatTimer.cancel();
+      rmLivenessHeartBeatTimer = null;
+    }
+    if (rmLivenessUpdateInterval <= 0) {
+      // Timer.schedule rejects a non-positive period.
+      LOG.warn("RM liveness heartbeat disabled, invalid interval "
+          + rmLivenessUpdateInterval);
+      return;
+    }
+    // Daemon thread: a timer left running must not keep the JVM alive.
+    rmLivenessHeartBeatTimer = new Timer("RMLivenessHeartBeat", true);
+    rmLivenessHeartBeatTimer.schedule(new RMLivenessHeartBeat(),
+        rmLivenessUpdateInterval, rmLivenessUpdateInterval);
+  }
+
   /**
    * Derived classes initialize themselves using this method.
    */
@@ -805,7 +841,38 @@ protected abstract void storeApplicationStateInternal(ApplicationId appId,
 
   protected abstract void updateApplicationStateInternal(ApplicationId appId,
       ApplicationStateData appStateData) throws Exception;
+
+  /**
+   * Records the given RM liveness timestamp. This base implementation does
+   * nothing; stores that persist liveness override it.
+   *
+   * @param timeStamp liveness timestamp in milliseconds
+   * @throws Exception if the store fails to persist the timestamp
+   */
+  protected void updateRMLiveness(long timeStamp) throws Exception {
+    // do nothing
+  }
+
+  /**
+   * Returns the last RM liveness timestamp known to the store.
+   *
+   * @return the value of getLastRMLivenessTSInternal
+   * @throws Exception if the store fails to read the timestamp
+   */
+  public long getLastRMLivenessTS() throws Exception {
+    return getLastRMLivenessTSInternal();
+  }
+
+  /**
+   * Store-specific lookup of the last RM liveness timestamp.
+   *
+   * @return -1 in this base implementation
+   * @throws Exception if the store fails to read the timestamp
+   */
+  protected long getLastRMLivenessTSInternal() throws Exception {
+    return -1;
+  }
 
   @SuppressWarnings("unchecked")
   /**
    * Non-blocking API
@@ -1189,4 +1256,23 @@ public RMStateStoreState getRMStateStoreState() {
       this.readLock.unlock();
     }
   }
+
+  /**
+   * Timer task that stores the current time as the RM liveness timestamp.
+   */
+  private class RMLivenessHeartBeat extends TimerTask {
+    @Override
+    public void run() {
+      // NOTE(review): assumes the store service is STARTED whenever the
+      // heartbeat is meant to run - confirm. Once it is not, stop writing.
+      if (!isInState(STATE.STARTED)) {
+        cancel();
+        return;
+      }
+      try {
+        updateRMLiveness(System.currentTimeMillis());
+      } catch (Exception e) {
+        LOG.error("fail to update rmliveness. ", e);
+      }
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index bcfce9a..41ce65f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -376,7 +376,8 @@
         EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED,
             RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED,
             RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
+            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE,
+            RMAppEventType.ATTEMPT_KILLED))
 
      .installTopology();
 
@@ -939,8 +940,18 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
       RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
       app.recover(recoverEvent.getRMState());
       // The app has completed.
-      if (app.recoveredFinalState != null) {
+      // An expired state store also ends the app: it is recovered as killed.
+      if (app.recoveredFinalState != null
+          || app.rmContext.isRMStateStoreExpired()) {
         app.recoverAppAttempts();
+        if (app.recoveredFinalState == null) {
+          // No stored final state, so the state store expired: save KILLED.
+          LOG.warn("state store expire. recover app "
+              + app.getApplicationId() + " as killed");
+          new FinalSavingTransition(new AppKilledTransition(),
+              RMAppState.KILLED).transition(app, event);
+          return RMAppState.FINAL_SAVING;
+        }
         new FinalTransition(app.recoveredFinalState).transition(app, event);
         return app.recoveredFinalState;
       }
@@ -1112,6 +1123,10 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
       RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
       diags = getAppAttemptFailedDiagnostics(failedEvent);
       break;
+    case RECOVER:
+      diags = "recover app as killed due to RM state store expire";
+      this.diagnostics.append(diags);
+      break;
     default:
       break;
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 88a89b5..8e72228 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -213,7 +213,8 @@
           new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
       .addTransition( RMAppAttemptState.NEW,
           EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
-              RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
+              RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED,
+              RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
 
       // Transitions from SUBMITTED state
@@ -1149,7 +1150,17 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
           return RMAppAttemptState.KILLED;
         }
         return RMAppAttemptState.FAILED;
-      } else{
+      } else if (appAttempt.rmContext.isRMStateStoreExpired()) {
+        // Expired state store: save the attempt as KILLED instead of
+        // handing it back to the scheduler.
+        LOG.warn("state store expired. recover app attempt "
+            + appAttempt.getAppAttemptId() + " as killed");
+        new FinalSavingTransition(
+            new BaseFinalTransition(RMAppAttemptState.KILLED),
+            RMAppAttemptState.KILLED).transition(appAttempt, event);
+        appAttempt.progress = 1;
+        return RMAppAttemptState.FINAL_SAVING;
+      } else {
         // Add the current attempt to the scheduler.
         if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
           // Need to register an app attempt before AM can register
@@ -1239,6 +1250,10 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
       case EXPIRE:
         diags = getAMExpiredDiagnostics(event);
         break;
+      case RECOVER:
+        diags = ". Attempt is recovered as killed due to RM state store expire";
+        this.diagnostics.append(diags);
+        break;
       default:
         break;
     }