diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 54ad422..c4314aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -319,6 +319,14 @@ private static void addDeprecatedKeys() {
RM_PREFIX + "am.max-attempts";
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2;
+ /**
+ * Configuration key for the maximum number of application attempts to keep
+ * in RM and RMStateStore.
+ * It's a global setting for all application masters.
+ */
+ public static final String RM_AM_MAX_ATTEMPTS_TO_KEEP =
+ RM_PREFIX + "am.max-attempts-to-keep";
+ /** Value used when {@link #RM_AM_MAX_ATTEMPTS_TO_KEEP} is not configured. */
+ public static final int DEFAULT_RM_AM_MAX_ATTEMPTS_TO_KEEP = 3;
+
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 13a7b1b..bc16793 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -234,6 +234,14 @@
+    <description>The maximum number of application attempts to keep in
+    RM and RMStateStore. It's a global setting for all application masters.
+    </description>
+    <name>yarn.resourcemanager.am.max-attempts-to-keep</name>
+    <value>3</value>
+  </property>
+
+  <property>
How often to check that containers are still alive.
yarn.resourcemanager.container.liveness-monitor.interval-ms
600000
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d392410..19419a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -817,6 +817,21 @@ public void handle(RMAppAttemptEvent event) {
LOG.error("Error in handling event type " + event.getType()
+ " for applicationAttempt " + appAttemptId, t);
}
+ } else {
+ RMAppAttempt dummyAppAttempt = rmApp.getDummyAppAttempt();
+ if (dummyAppAttempt != null) {
+ try {
+ LOG.info("Event handled by dummyAppAttempt: " + event);
+ dummyAppAttempt.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for applicationAttempt " + appAttemptId
+ + " with dummyAttempt", t);
+ }
+ } else {
+ LOG.error("Could not handle event " + event.getType()
+ + " for applicationAttempt " + appAttemptId);
+ }
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index bd24b25..902244b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -230,6 +230,11 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
}
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+ if (rmAppAttempt == null) {
+ LOG.info("Ignoring not found attempt " + appAttemptId);
+ return;
+ }
+
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
&& containerStatus.getContainerState() == ContainerState.COMPLETE) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index a1cebf5..021ca36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -482,6 +482,18 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
+  public synchronized void removeApplicationAttemptInternal(
+      ApplicationAttemptId appAttemptId)
+      throws Exception {
+    // Resolve the attempt's node path from its application's directory,
+    // then delete that node.
+    Path attemptNodePath = getNodePath(
+        getAppDir(rmAppRoot, appAttemptId.getApplicationId()),
+        appAttemptId.toString());
+    LOG.info("Removing info for attempt: " + appAttemptId + " at: "
+        + attemptNodePath);
+    deleteFileWithRetries(attemptNodePath);
+  }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index afc6721..045e258 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -575,6 +575,28 @@ protected void updateApplicationAttemptStateInternal(
}
@Override
+  public synchronized void removeApplicationAttemptInternal(
+      ApplicationAttemptId attemptId)
+      throws IOException {
+    // Delete the single key that holds this attempt's state. Any
+    // DBException raised by the database is rethrown as an IOException.
+    final String nodeKey = getApplicationAttemptNodeKey(attemptId);
+    try {
+      final WriteBatch deletion = db.createWriteBatch();
+      try {
+        deletion.delete(bytes(nodeKey));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing state for attempt " + attemptId + " at "
+              + nodeKey);
+        }
+        db.write(deletion);
+      } finally {
+        // Release the batch whether or not the write succeeded.
+        deletion.close();
+      }
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+ @Override
protected void removeApplicationStateInternal(ApplicationStateData appState)
throws IOException {
ApplicationId appId =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index ce6addb..caaea7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -142,6 +142,19 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
+  public synchronized void removeApplicationAttemptInternal(
+      ApplicationAttemptId appAttemptId) throws Exception {
+    // Removes the in-memory state of a single attempt. Throws
+    // YarnRuntimeException when either the owning application or the
+    // attempt itself is not present in the store.
+    ApplicationStateData appState =
+        state.getApplicationState().get(appAttemptId.getApplicationId());
+    if (appState == null) {
+      // Without this guard a missing application surfaced as a bare
+      // NullPointerException on the attempts lookup below.
+      throw new YarnRuntimeException("Application doesn't exist");
+    }
+    LOG.info("Removing state for attempt: " + appAttemptId);
+    ApplicationAttemptStateData attemptState =
+        appState.attempts.remove(appAttemptId);
+    if (attemptState == null) {
+      throw new YarnRuntimeException(
+          "Application attempt doesn't exist: " + appAttemptId);
+    }
+  }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception {
ApplicationId appId =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index 96f77f5..f6fd6fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -132,6 +132,12 @@ protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemp
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId attemptId) throws Exception {
+ // Intentionally a no-op: this implementation performs no removal for
+ // the given attempt.
+ // Do nothing
+ }
+
+ @Override
public void checkVersion() throws Exception {
// Do nothing
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index ec42cbe..ae17aaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -137,6 +137,10 @@
new UpdateAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+ RMStateStoreEventType.REMOVE_APP_ATTEMPT,
+ new RemoveAppAttemptTransition())
+ .addTransition(RMStateStoreState.ACTIVE,
+ EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_MASTERKEY,
new StoreRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE,
@@ -552,6 +556,32 @@ private static RMStateStoreState finalState(boolean isFenced) {
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
}
+  /**
+   * Transition taken on a REMOVE_APP_ATTEMPT event. It asks the store to
+   * delete the state of the attempt named by the event and reports the
+   * resulting store state: FENCED if the failure handler says the store was
+   * fenced, ACTIVE otherwise.
+   */
+  private static class RemoveAppAttemptTransition implements
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
+    /**
+     * @param store the state store that owns the attempt's persisted state
+     * @param event expected to be an {@link RMStateStoreRemoveAppAttemptEvent}
+     * @return the state the store should move to after the removal
+     */
+    @Override
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return RMStateStoreState.ACTIVE;
+      }
+      boolean isFenced = false;
+      ApplicationAttemptId attemptId =
+          ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId();
+      ApplicationId appId = attemptId.getApplicationId();
+      LOG.info("Removing attempt " + attemptId + " from app: " + appId);
+      try {
+        store.removeApplicationAttemptInternal(attemptId);
+      } catch (Exception e) {
+        LOG.error("Error removing attempt: " + attemptId, e);
+        // The failure handler decides whether this error fences the store.
+        isFenced = store.notifyStoreOperationFailedInternal(e);
+      }
+      return finalState(isFenced);
+    }
+  }
+
public RMStateStore() {
super(RMStateStore.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -983,6 +1013,29 @@ public void removeApplication(RMApp app) {
protected abstract void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception;
+  /**
+   * Non-blocking API.
+   * ResourceManager services call this to remove an attempt from the state
+   * store. The request is handed to the dispatcher as an event, so this does
+   * not block the dispatcher threads.
+   * There is no notification of completion for this operation.
+   *
+   * @param applicationAttemptId id of the attempt whose state is removed
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void removeApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    RMStateStoreRemoveAppAttemptEvent removalEvent =
+        new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId);
+    dispatcher.getEventHandler().handle(removalEvent);
+  }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of specified
+ * attempt.
+ *
+ * @param attemptId id of the attempt whose stored state is to be removed
+ * @throws Exception if the implementation fails to remove the state
+ */
+ protected abstract void removeApplicationAttemptInternal(
+ ApplicationAttemptId attemptId) throws Exception;
+
+
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779
public static final Text AM_RM_TOKEN_SERVICE = new Text(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 492826d..b34634d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -24,6 +24,7 @@
UPDATE_APP,
UPDATE_APP_ATTEMPT,
REMOVE_APP,
+ REMOVE_APP_ATTEMPT,
FENCED,
// Below events should be called synchronously
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
similarity index 62%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
index 492826d..7455c39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
@@ -18,21 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-public enum RMStateStoreEventType {
- STORE_APP_ATTEMPT,
- STORE_APP,
- UPDATE_APP,
- UPDATE_APP_ATTEMPT,
- REMOVE_APP,
- FENCED,
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
- // Below events should be called synchronously
- STORE_MASTERKEY,
- REMOVE_MASTERKEY,
- STORE_DELEGATION_TOKEN,
- REMOVE_DELEGATION_TOKEN,
- UPDATE_DELEGATION_TOKEN,
- UPDATE_AMRM_TOKEN,
- STORE_RESERVATION,
- REMOVE_RESERVATION,
+/**
+ * An event of type {@link RMStateStoreEventType#REMOVE_APP_ATTEMPT} that
+ * carries the id of the application attempt to remove.
+ */
+public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent {
+  // Assigned once in the constructor and never changed afterwards.
+  private final ApplicationAttemptId applicationAttemptId;
+
+  /**
+   * @param applicationAttemptId id of the attempt to remove
+   */
+  RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) {
+    super(RMStateStoreEventType.REMOVE_APP_ATTEMPT);
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  /**
+   * @return id of the attempt this event asks to remove
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ca0f4ac..ddb8a0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -659,6 +659,22 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
+  public synchronized void removeApplicationAttemptInternal(
+      ApplicationAttemptId appAttemptId)
+      throws Exception {
+    // Build the attempt's node path beneath its application's node, then
+    // delete that node.
+    String appNodePath = getNodePath(rmAppRoot,
+        appAttemptId.getApplicationId().toString());
+    String attemptNodePath =
+        getNodePath(appNodePath, appAttemptId.toString());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
+          + attemptNodePath);
+    }
+    safeDelete(attemptNodePath);
+  }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
index 1d199ed..5137bdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
@@ -80,6 +80,16 @@ public ApplicationAttemptStateData getAttempt(
return attempts.get(attemptId);
}
+  /**
+   * Returns one less than the smallest attempt number among the stored
+   * attempts, or 0 when no smaller-than-MAX_VALUE attempt number is found
+   * (for example when there are no stored attempts).
+   *
+   * @return the smallest stored attempt number minus one, or 0
+   */
+  public int getAttemptStartId() {
+    int lowest = Integer.MAX_VALUE;
+    for (ApplicationAttemptId storedId : attempts.keySet()) {
+      lowest = Math.min(lowest, storedId.getAttemptId());
+    }
+    if (lowest == Integer.MAX_VALUE) {
+      return 0;
+    }
+    return lowest - 1;
+  }
+
public abstract ApplicationStateDataProto getProto();
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index bb0fc34..88ac051 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -115,6 +115,12 @@
RMAppAttempt getCurrentAppAttempt();
/**
+ * A dummy appAttempt used to handle events addressed to attempts that have
+ * been removed from this application's attempts.
+ * @return a dummy appAttempt to handle events for removed attempts.
+ */
+ RMAppAttempt getDummyAppAttempt();
+
+ /**
* {@link RMApp} can have multiple application attempts {@link RMAppAttempt}.
* This method returns the all {@link RMAppAttempt}s for the RMApp.
* @return all {@link RMAppAttempt}s for the RMApp.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c4c8d2e..eee4047 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -128,6 +128,8 @@
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxAppAttempts;
+ // Max attempts to be kept in RMApp and RMStateStore
+ private final int maxAppAttemptsToKeep;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map attempts
@@ -149,8 +151,16 @@
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
+ private int currentAttemptId = 0;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
+ /**
+ * For work-preserving AM restart, failed attempts still capture
+ * CONTAINER_FINISHED events and record the finished containers for
+ * use by the next new attempt. This attempt is used to handle
+ * CONTAINER_FINISHED events for attempts that have been removed.
+ */
+ private RMAppAttempt dummyAttempt;
private String queue;
private EventHandler handler;
private static final AppFinishedTransition FINISHED_TRANSITION =
@@ -436,6 +446,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+ ".");
}
+ maxAppAttemptsToKeep = conf.getInt(
+ YarnConfiguration.RM_AM_MAX_ATTEMPTS_TO_KEEP,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS_TO_KEEP);
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -557,6 +571,11 @@ public RMAppAttempt getCurrentAppAttempt() {
}
@Override
+  public RMAppAttempt getDummyAppAttempt() {
+    // Stays null until an attempt has been removed from this app.
+    return dummyAttempt;
+  }
+
+ @Override
public Map getAppAttempts() {
this.readLock.lock();
@@ -809,6 +828,7 @@ public void recover(RMState state) {
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext();
+ this.currentAttemptId = appState.getAttemptStartId();
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
@@ -816,13 +836,16 @@ public void recover(RMState state) {
for(int i=0; i= app.maxAppAttempts) {
@@ -1340,6 +1366,26 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
return RMAppState.FINAL_SAVING;
}
}
+
+    /**
+     * Evicts the oldest attempts of the application until no more than the
+     * configured number of attempts remain.
+     *
+     * @param app application whose attempts are trimmed
+     */
+    private void removeExcessAttempts(RMAppImpl app) {
+      // The limit comes straight from configuration and is not validated.
+      // Always keep at least one attempt: a limit of zero would evict every
+      // attempt, and a negative limit would go on to call next() on an
+      // empty iterator.
+      int attemptsToKeep = Math.max(app.maxAppAttemptsToKeep, 1);
+      while (app.attempts.size() > attemptsToKeep) {
+        // attempts' first element is oldest attempt because it is a
+        // LinkedHashMap
+        RMAppAttempt oldestAttempt = app.attempts.values().iterator().next();
+        removeAppAttempt(app, oldestAttempt);
+      }
+    }
+
+    // Drops the attempt from app.attempts and asks the state store to remove
+    // it. The first attempt ever dropped is retained as app.dummyAttempt.
+    private void removeAppAttempt(RMAppImpl app, RMAppAttempt appAttempt) {
+      ApplicationAttemptId removedId = appAttempt.getAppAttemptId();
+      LOG.info("Remove attempt from state store : " + removedId);
+      RMAppAttempt evicted = app.attempts.remove(removedId);
+      if (app.dummyAttempt == null) {
+        app.dummyAttempt = evicted;
+      }
+      app.rmContext.getStateStore().removeApplicationAttempt(removedId);
+    }
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index d2b8eee..c2d1dc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -497,6 +497,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
// be started immediately.
YarnConfiguration conf = new YarnConfiguration(this.conf);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS_TO_KEEP, 40);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 560a305..27a00f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -103,6 +103,10 @@ public RMAppAttempt getCurrentAppAttempt() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
+ public RMAppAttempt getDummyAppAttempt() {
+ // This mock does not provide a dummy attempt; any call fails.
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ @Override
public Map getAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index acacc40..c4c69b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -547,6 +547,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
ResourceScheduler.class);
// explicitly set max-am-retry count as 1.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS_TO_KEEP, 3);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -610,7 +611,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am4 =
- rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
@@ -629,7 +630,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
// launch next AM in nm2
nm2.nodeHeartbeat(true);
MockAM am5 =
- rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm2);
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
// fail the AM normally
@@ -640,7 +641,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
// AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
- Assert.assertEquals(5, app1.getAppAttempts().size());
+ Assert.assertEquals(3, app1.getAppAttempts().size());
rm1.stop();
}
@@ -786,6 +787,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS_TO_KEEP, 3);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
@@ -874,7 +876,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
// Lauch Attempt 4
MockAM am4 =
- rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
// wait for 10 seconds
clock.setTime(System.currentTimeMillis() + 10*1000);
@@ -887,7 +889,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am5 =
- rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
clock.reset();
am5.waitForState(RMAppAttemptState.RUNNING);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 35fd0c3..e4adfe2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -151,6 +151,11 @@ public RMAppAttempt getCurrentAppAttempt() {
return attempt;
}
+ @Override
+ public RMAppAttempt getDummyAppAttempt() {
+ // This mock does not provide a dummy attempt; any call fails.
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
public void setCurrentAppAttempt(RMAppAttempt attempt) {
this.attempt = attempt;
}