diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 14c1ffc..b6c60ca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -427,6 +427,15 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
10000;
+ /**
+  * Configuration key for the interval, in milliseconds, of the RM liveness
+  * heartbeat written to the RM state store. RMStateStore reads it in
+  * serviceInit and uses it as both the initial delay and the period of the
+  * heartbeat timer.
+  */
+ public static final String RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS =
+ RM_PREFIX + "statestore.update.interval-ms";
+ /** Default heartbeat interval: 10000 ms. */
+ public static final long DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS
+ = 10000;
+
+ /**
+  * Configuration key for the RM state store expiration time, in
+  * milliseconds. During recovery the ResourceManager compares the age of the
+  * last liveness timestamp in the store against this value to decide whether
+  * the store is flagged as expired.
+  */
+ public static final String RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS =
+ RM_PREFIX + "recover.statestore.expiration.time-ms";
+ /** Default expiration time: Long.MAX_VALUE. */
+ public static final long DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS = Long.MAX_VALUE;
+
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 997eb8e..4068f73 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -384,6 +384,21 @@
+ Interval, in milliseconds, at which the ResourceManager writes a liveness
+ heartbeat to the RM state store. By default the heartbeat is written every 10 seconds.
+
+ yarn.resourcemanager.statestore.update.interval-ms
+
+
+
+
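+ Expiration time of the RM state store, in milliseconds. If the last RM
+ liveness heartbeat found in the state store during recovery is older than
+ this, applications that have not reached a final state are recovered as killed.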
+
+ yarn.resourcemanager.recover.statestore.expiration.time-ms
+
+
+
+
The class to use as the persistent store.
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 9802a37..68a56a3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -135,4 +135,8 @@ void setRMDelegatedNodeLabelsUpdater(
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
+
+ /**
+  * @return the RM state store "expired" flag last set through
+  *         {@link #setRMStateStoreExpireState(boolean)}
+  */
+ boolean isRMStateStoreExpired();
+
+ /**
+  * Sets the RM state store "expired" flag.
+  *
+  * @param expired new value of the flag
+  */
+ void setRMStateStoreExpireState(boolean expired);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index ed9942b..9da0150 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -72,7 +72,7 @@
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
-
+ // Backing field for isRMStateStoreExpired()/setRMStateStoreExpireState().
+ private boolean isStateStoreExpired = false;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@@ -461,4 +461,14 @@ public PlacementManager getQueuePlacementManager() {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.activeServiceContext.setQueuePlacementManager(placementMgr);
}
+
+ /** Returns the current value of the state store "expired" flag. */
+ @Override
+ public boolean isRMStateStoreExpired() {
+   return isStateStoreExpired;
+ }
+
+ /** Stores the given value as the state store "expired" flag. */
+ @Override
+ public void setRMStateStoreExpireState(boolean expired) {
+   isStateStoreExpired = expired;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 01a1c8f..3f936b0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -406,6 +406,7 @@ protected static void validateConfigs(Configuration conf) {
private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
+ // State store expiration threshold in milliseconds; assigned in serviceInit
+ // from YarnConfiguration.RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS.
+ private long rmStateStoreExpiration;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@@ -445,6 +446,9 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
+ rmStateStoreExpiration = conf.getLong(
+ YarnConfiguration.RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS,
+ YarnConfiguration.DEFAULT_RM_STATESTORE_RECOVER_EXPIRATION_TIME_MS);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -580,12 +584,19 @@ protected void serviceStart() throws Exception {
try {
LOG.info("Recovery started");
rmStore.checkVersion();
+ long lastRMLivenessTS = rmStore.getLastRMLivenessTS();
+ // A negative timestamp is the "no usable liveness record" sentinel (the
+ // base RMStateStore returns -1, as does a store holding an unparsable
+ // value). It must not be treated as an infinitely old heartbeat, or every
+ // unfinished application would be recovered as killed whenever a finite
+ // expiration time is configured.
+ boolean expire = lastRMLivenessTS >= 0
+     && System.currentTimeMillis() - lastRMLivenessTS > rmStateStoreExpiration;
+ rmContext.setRMStateStoreExpireState(expire);
if (rmContext.isWorkPreservingRecoveryEnabled()) {
rmContext.setEpoch(rmStore.getAndIncrementEpoch());
}
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
+ // Recovery finished: reset the expired flag that was set before loadState().
+ rmContext.setRMStateStoreExpireState(false);
+ // Begin the periodic liveness heartbeat to the state store.
+ rmStore.startRmLivenessHeartBeat();
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index faaadb8..50d8c71 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -95,7 +95,6 @@
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 1);
-
private DB db;
private String getApplicationNodeKey(ApplicationId appId) {
@@ -257,7 +256,9 @@ private void loadReservationState(RMState rmState) throws IOException {
while (iter.hasNext()) {
Entry entry = iter.next();
String key = asString(entry.getKey());
-
+ // Only keys under the reservation prefix are reservation entries. Any
+ // other key is skipped; for a key shorter than the prefix the substring()
+ // call below would throw. Comparing lengths with != (as before) skipped
+ // every real reservation key, since those are longer than the prefix.
+ if (!key.startsWith(RM_RESERVATION_KEY_PREFIX)) {
+   continue;
+ }
String planReservationString =
key.substring(RM_RESERVATION_KEY_PREFIX.length());
String[] parts = planReservationString.split(SEPARATOR);
@@ -553,6 +554,29 @@ protected void updateApplicationStateInternal(ApplicationId appId,
}
@Override
+ protected void updateRMLiveness(long timeStamp)
+ throws IOException {
+ db.put(bytes(RM_LIVENESS_HEARTBEAT_KEY), bytes(String.valueOf(timeStamp)));
+ }
+
+ /**
+  * Reads the liveness timestamp stored under RM_LIVENESS_HEARTBEAT_KEY.
+  *
+  * @return the stored timestamp; 0 when nothing is stored under the key;
+  *         -1 when the stored value cannot be parsed as a long
+  */
+ @Override
+ protected long getLastRMLivenessTSInternal() throws Exception {
+   byte[] val = db.get(bytes(RM_LIVENESS_HEARTBEAT_KEY));
+   if (val == null) {
+     return 0;
+   }
+   // Decode once so the parsed text and the logged text are the same string.
+   String stored = asString(val);
+   try {
+     return Long.parseLong(stored);
+   } catch (NumberFormatException e) {
+     // Pass the exception to the logger so the cause is not lost.
+     LOG.error("invalid timestamp stored " + stored, e);
+     return -1;
+   }
+ }
+
+ @Override
protected void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateData attemptStateData) throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index a2cf517..f6df9b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -23,7 +23,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.TimerTask;
import java.util.TreeMap;
+import java.util.Timer;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -31,6 +33,7 @@
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -94,6 +97,7 @@
"ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
+ // Store key under which the RM liveness heartbeat timestamp is kept.
+ // NOTE(review): "Livenss" looks like a typo for "Liveness". The value is a
+ // persisted key, so correct it before any data is written with it -- confirm.
+ protected static final String RM_LIVENESS_HEARTBEAT_KEY = "RmLivenssHeartbeatKey";
private ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
@@ -659,6 +663,7 @@ public void setRMDispatcher(Dispatcher dispatcher) {
}
AsyncDispatcher dispatcher;
+ // Initial delay and period, in milliseconds, of the liveness heartbeat
+ // timer; assigned in serviceInit from the configuration.
+ private long rmLivenessUpdateInterval;
@Override
protected void serviceInit(Configuration conf) throws Exception{
@@ -669,6 +674,9 @@ protected void serviceInit(Configuration conf) throws Exception{
new ForwardingEventHandler());
dispatcher.setDrainEventsOnStop();
initInternal(conf);
+ rmLivenessUpdateInterval = conf.getLong(
+ YarnConfiguration.RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RMLIVENESS_STATESTORE_UPDATE_INTERVAL_MS);
}
@Override
@@ -677,6 +685,12 @@ protected void serviceStart() throws Exception {
startInternal();
}
+ /** Timer driving the liveness heartbeat; null until the heartbeat is started. */
+ private Timer rmLivenessHeartBeatTimer;
+
+ /**
+  * Starts the periodic RM liveness heartbeat. The task first runs after
+  * rmLivenessUpdateInterval milliseconds and then repeats with the same
+  * period. A heartbeat started by an earlier call is cancelled first, so
+  * repeated calls do not accumulate timer threads.
+  *
+  * NOTE(review): nothing visible here cancels the timer when the store is
+  * stopped; that should be wired into the store's stop path -- confirm.
+  *
+  * @throws Exception kept for interface compatibility; Timer.schedule
+  *         rejects a non-positive interval with IllegalArgumentException
+  */
+ public void startRmLivenessHeartBeat() throws Exception {
+   if (rmLivenessHeartBeatTimer != null) {
+     rmLivenessHeartBeatTimer.cancel();
+   }
+   // Daemon thread: the heartbeat must never keep the JVM alive on its own.
+   rmLivenessHeartBeatTimer = new Timer("RMLivenessHeartBeat", true);
+   rmLivenessHeartBeatTimer.schedule(new RMLivenessHeartBeat(),
+       rmLivenessUpdateInterval, rmLivenessUpdateInterval);
+ }
+
/**
* Derived classes initialize themselves using this method.
*/
@@ -762,7 +776,7 @@ public void checkVersion() throws Exception {
* This must not be called on the dispatcher thread
*/
public abstract RMState loadState() throws Exception;
-
+
/**
* Non-Blocking API
* ResourceManager services use this to store the application's state
@@ -805,7 +819,19 @@ protected abstract void storeApplicationStateInternal(ApplicationId appId,
protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception;
-
+
+ /**
+  * Hook for recording an RM liveness timestamp in the store. This base
+  * implementation does nothing; the RMLivenessHeartBeat timer task calls it
+  * with System.currentTimeMillis().
+  *
+  * @param timeStamp timestamp to record
+  */
+ protected void updateRMLiveness(long timeStamp) throws Exception {
+   //do nothing
+ }
+
+ /**
+  * Returns the last RM liveness timestamp as reported by
+  * getLastRMLivenessTSInternal().
+  */
+ public long getLastRMLivenessTS() throws Exception {
+   final long lastLivenessTimeStamp = getLastRMLivenessTSInternal();
+   return lastLivenessTimeStamp;
+ }
+
+ /**
+  * Store-specific lookup of the last RM liveness timestamp. This base
+  * implementation always returns -1.
+  */
+ protected long getLastRMLivenessTSInternal() throws Exception {
+   return -1L;
+ }
+
@SuppressWarnings("unchecked")
/**
* Non-blocking API
@@ -1189,4 +1215,17 @@ public RMStateStoreState getRMStateStoreState() {
this.readLock.unlock();
}
}
+
+ /**
+  * Timer task that passes the current wall-clock time to updateRMLiveness.
+  * Failures are logged and swallowed, so one failed update does not
+  * terminate the timer thread.
+  */
+ private class RMLivenessHeartBeat extends TimerTask {
+   @Override
+   public void run() {
+     final long now = System.currentTimeMillis();
+     try {
+       updateRMLiveness(now);
+     } catch (Exception e) {
+       LOG.error("fail to update rmliveness. ", e);
+     }
+   }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index bcfce9a..41ce65f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -376,7 +376,7 @@
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
+ RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE, RMAppEventType.ATTEMPT_KILLED))
.installTopology();
@@ -939,8 +939,17 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
app.recover(recoverEvent.getRMState());
// The app has completed.
- if (app.recoveredFinalState != null) {
+ if (app.recoveredFinalState != null || app.rmContext.isRMStateStoreExpired()) {
app.recoverAppAttempts();
+ // The state store is flagged as expired and the app has no recovered
+ // final state: save it as KILLED instead of resuming it.
+ if (app.rmContext.isRMStateStoreExpired()
+     && app.recoveredFinalState == null) {
+   // The space before "as killed" keeps the application id readable.
+   LOG.warn("state store expire. recover app "
+       + app.getApplicationId() + " as killed");
+   new FinalSavingTransition(
+       new AppKilledTransition(), RMAppState.KILLED).transition(app, event);
+   return RMAppState.FINAL_SAVING;
+ }
new FinalTransition(app.recoveredFinalState).transition(app, event);
return app.recoveredFinalState;
}
@@ -1112,6 +1121,9 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent);
break;
+ case RECOVER:
+   // Record the diagnostics for this RECOVER event and append them to the
+   // app's diagnostics buffer.
+   diags = "recover app as killed due to RM state store expire";
+   this.diagnostics.append(diags);
+   // Explicit break: do not rely on falling through into default.
+   break;
default:
break;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 88a89b5..8e72228 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -213,7 +213,7 @@
new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
.addTransition( RMAppAttemptState.NEW,
EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
- RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
+ RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
@@ -1149,7 +1149,14 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
return RMAppAttemptState.KILLED;
}
return RMAppAttemptState.FAILED;
- } else{
+ } else if (appAttempt.rmContext.isRMStateStoreExpired()) {
+ // The state store is flagged as expired: save this attempt as KILLED
+ // instead of re-adding it to the scheduler. The space before "as killed"
+ // keeps the attempt id readable in the log.
+ LOG.warn("state store expired. recover app attempt "
+     + appAttempt.getAppAttemptId() + " as killed");
+ new FinalSavingTransition(new BaseFinalTransition(
+     RMAppAttemptState.KILLED), RMAppAttemptState.KILLED).transition(appAttempt, event);
+ appAttempt.progress = 1;
+ return RMAppAttemptState.FINAL_SAVING;
+ } else {
// Add the current attempt to the scheduler.
if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
// Need to register an app attempt before AM can register
@@ -1239,6 +1246,9 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
case EXPIRE:
diags = getAMExpiredDiagnostics(event);
break;
+ case RECOVER:
+   // Record the diagnostics for this RECOVER event and append them to the
+   // attempt's diagnostics buffer.
+   diags = ". Attempt is recovered as killed due to RM state store expire";
+   this.diagnostics.append(diags);
+   // Explicit break: do not rely on falling through into default.
+   break;
default:
break;
}