diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a7f485d..7b79dee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -288,6 +288,14 @@ private static void addDeprecatedKeys() { RM_PREFIX + "am.max-attempts"; public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; + /** + * The maximum number of application attempts to keep in RM and RMStateStore. + * It's a global setting for all application masters. + */ + public static final String RM_AM_MAX_ATTEMPTS_TO_KEEP = + RM_PREFIX + "am.max-attempts-to-keep"; + public static final int DEFAULT_RM_AM_MAX_ATTEMPTS_TO_KEEP = 3; + /** The keytab for the resource manager.*/ public static final String RM_KEYTAB = RM_PREFIX + "keytab"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 130cfd4..08cf559 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -838,6 +838,20 @@ public void handle(RMAppAttemptEvent event) { LOG.error("Error in handling event type " + event.getType() + " for applicationAttempt " + appAttemptId, t); } + } else { + RMAppAttempt dummyAppAttempt = rmApp.getDummyAppAttempt(); + if (dummyAppAttempt != null) { + try { + LOG.info("Event handled by dummyAppAttempt: " + event); + dummyAppAttempt.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for applicationAttempt " + appAttemptId + " with dummyAttempt", t); + } + } else { + LOG.error("Could not handle event " + event.getType() + + " for applicationAttempt " + appAttemptId); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 68d26bb..6e2eb4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -462,6 +462,18 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) + throws Exception { + Path appDirPath = + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); + Path nodeRemovePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Removing info for attempt: " + appAttemptId + " at: " + + nodeRemovePath); + deleteFileWithRetries(nodeRemovePath); + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 46a3459..c9b5644 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -519,6 +519,28 @@ protected void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) + throws IOException { + String attemptKey = getApplicationAttemptNodeKey(attemptId); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(attemptKey)); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing state for attempt " + attemptId + " at " + + attemptKey); + } + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override protected void removeApplicationStateInternal(ApplicationStateData appState) throws IOException { ApplicationId appId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 609f403..c85b9e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -139,6 +139,19 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) throws Exception { + ApplicationStateData appState = + state.getApplicationState().get(appAttemptId.getApplicationId()); + ApplicationAttemptStateData attemptState = + appState.attempts.remove(appAttemptId); + LOG.info("Removing state for attempt: " + appAttemptId); + if (attemptState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { ApplicationId appId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 92c07cd..24fee9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -118,6 +118,12 @@ protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemp } @Override + public synchronized void removeApplicationAttemptInternal(ApplicationAttemptId attemptId) + throws Exception { + // Do nothing + } + + @Override public void checkVersion() throws Exception { // Do nothing } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index cc4edd7..eaee504 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -119,6 +119,8 @@ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + RMStateStoreEventType.REMOVE_APP_ATTEMPT, new RemoveAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.STORE_MASTERKEY, new StoreRMDTMasterKeyTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, @@ -414,6 +416,28 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { } } + private static class RemoveAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptId attemptId = + ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId(); + ApplicationId appId = attemptId.getApplicationId(); + LOG.info("Removing attempt " + attemptId + " from app: " + appId); + try { + store.removeApplicationAttemptInternal(attemptId); + } catch (Exception e) { + LOG.error("Error removing attempt: " + attemptId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -798,6 +822,28 @@ public synchronized void removeApplication(RMApp app) { protected abstract void removeApplicationStateInternal( ApplicationStateData appState) throws Exception; + /** + * Non-blocking API + * ResourceManager services call this to remove an attempt from the state + * store + * This does not block the dispatcher threads + * There is no notification of completion for this operation. + */ + public synchronized void removeApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + dispatcher.getEventHandler().handle( + new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of specified + * attempt + */ + protected abstract void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception; + + // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-1779 public static final Text AM_RM_TOKEN_SERVICE = new Text( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index beba5eb..09d18a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -24,6 +24,7 @@ UPDATE_APP, UPDATE_APP_ATTEMPT, REMOVE_APP, + REMOVE_APP_ATTEMPT, FENCED, // Below events should be called synchronously diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java similarity index 63% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java index beba5eb..9648102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java @@ -18,19 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -public enum RMStateStoreEventType { - STORE_APP_ATTEMPT, - STORE_APP, - UPDATE_APP, - UPDATE_APP_ATTEMPT, - REMOVE_APP, - FENCED, - // Below events should be called synchronously - STORE_MASTERKEY, - REMOVE_MASTERKEY, - STORE_DELEGATION_TOKEN, - REMOVE_DELEGATION_TOKEN, - UPDATE_DELEGATION_TOKEN, - UPDATE_AMRM_TOKEN +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; + +public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent { + ApplicationAttemptId applicationAttemptId; + + RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) { + super(RMStateStoreEventType.REMOVE_APP_ATTEMPT); + this.applicationAttemptId = applicationAttemptId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9da6400..0ffc1fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -676,6 +676,19 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) throws Exception { + String appId = appAttemptId.getApplicationId().toString(); + String appIdRemovePath = getNodePath(rmAppRoot, appId); + String attemptIdRemovePath = getNodePath(appIdRemovePath, appAttemptId.toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing attempt info: " + appAttemptId + " at: " + attemptIdRemovePath); + } + doMultiWithRetries(Op.delete(attemptIdRemovePath, -1)); + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 43046a9..01adb14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -44,7 +44,8 @@ public static ApplicationStateData newInstance(long submitTime, long startTime, String user, ApplicationSubmissionContext submissionContext, - RMAppState state, String diagnostics, long finishTime) { + RMAppState state, String diagnostics, long finishTime, + int numFailedAppAttempts) { ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); appState.setSubmitTime(submitTime); appState.setStartTime(startTime); @@ -53,12 +54,13 @@ public static ApplicationStateData newInstance(long submitTime, appState.setState(state); appState.setDiagnostics(diagnostics); appState.setFinishTime(finishTime); + appState.setNumFailedAppAttempts(numFailedAppAttempts); return appState; } public static ApplicationStateData newInstance(long submitTime, long startTime, ApplicationSubmissionContext context, String user) { - return newInstance(submitTime, startTime, user, context, null, "", 0); + return newInstance(submitTime, startTime, user, context, null, "", 0, 0); } public int getAttemptCount() { @@ -70,6 +72,16 @@ public ApplicationAttemptStateData getAttempt( return attempts.get(attemptId); } + public int getAttemptStartId() { + int min = Integer.MAX_VALUE; + for(ApplicationAttemptId attemptId : attempts.keySet()) { + if (attemptId.getAttemptId() < min) { + min = attemptId.getAttemptId(); + } + } + return min == Integer.MAX_VALUE ? 0 : min - 1; + } + public abstract ApplicationStateDataProto getProto(); /** @@ -144,4 +156,14 @@ public abstract void setApplicationSubmissionContext( public abstract long getFinishTime(); public abstract void setFinishTime(long finishTime); + + /** + * Get the number of failed attempts which does not include preempted, + * hardware error and NM resync. + * @return the number of failed attempts which does not include preempted, + * hardware error and NM resync. + */ + public abstract int getNumFailedAppAttempts(); + + public abstract void setNumFailedAppAttempts(int numFailedAppAttempts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index d8cbd23..5b7a1ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -196,6 +196,18 @@ public void setFinishTime(long finishTime) { } @Override + public int getNumFailedAppAttempts() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getNumFailedAppAttempts(); + } + + @Override + public void setNumFailedAppAttempts(int numFailedAppAttempts) { + maybeInitBuilder(); + builder.setNumFailedAppAttempts(numFailedAppAttempts); + } + + @Override public int hashCode() { return getProto().hashCode(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index be9dfaf..4691ac4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -114,6 +114,12 @@ RMAppAttempt getCurrentAppAttempt(); /** + * A dummy appAttempt used to handle event for those removed from attempts. + * @return a dummy appAttempt to handle event for those removed attempts. + */ + RMAppAttempt getDummyAppAttempt(); + + /** * {@link RMApp} can have multiple application attempts {@link RMAppAttempt}. * This method returns the all {@link RMAppAttempt}s for the RMApp. * @return all {@link RMAppAttempt}s for the RMApp. @@ -196,6 +202,14 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, FinalApplicationStatus getFinalApplicationStatus(); /** + * Get the number of failed attempts which does not include preempted, + * hardware error and NM resync. + * @return the number of failed attempts which does not include preempted, + * hardware error and NM resync. + */ + int getNumFailedAppAttempts(); + + /** * The number of max attempts of the application. * @return the number of max attempts of the application. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index b4e4965..cd5cb1dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -119,6 +119,8 @@ private final ApplicationMasterService masterService; private final StringBuilder diagnostics = new StringBuilder(); private final int maxAppAttempts; + // Max attempts to be kept in RMApp and RMStateStore + private final int maxAppAttemptsToKeep; private final ReadLock readLock; private final WriteLock writeLock; private final Map attempts @@ -138,8 +140,19 @@ private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private int currentAttemptId = 0; + // The number of failed attempts which does not include preempted, + // hardware error and NM resync + private int numFailedAppAttempts = 0; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; + /** + * For work-preserving AM restart, failed attempt are still capturing + * CONTAINER_FINISHED event and record the finished containers for the + * use by the next new attempt. It is used to handle CONTAINER_FINISHED + * event for those removed attempts. + */ + private RMAppAttempt dummyAttempt; private String queue; private EventHandler handler; private static final AppFinishedTransition FINISHED_TRANSITION = @@ -416,6 +429,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, + "."); } + maxAppAttemptsToKeep = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS_TO_KEEP, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS_TO_KEEP); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -524,6 +540,16 @@ public RMAppAttempt getCurrentAppAttempt() { } @Override + public RMAppAttempt getDummyAppAttempt() { + this.readLock.lock(); + try { + return this.dummyAttempt; + } finally { + this.readLock.unlock(); + } + } + + @Override public Map getAppAttempts() { this.readLock.lock(); @@ -765,17 +791,22 @@ public void recover(RMState state) { this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + this.currentAttemptId = appState.getAttemptStartId(); + this.numFailedAppAttempts = appState.getNumFailedAppAttempts(); for(int i=0; i endTime - - this.attemptFailuresValidityInterval)) { - completedAttempts++; + if (attempt.shouldCountTowardsMaxAttemptRetry()) { + if (this.attemptFailuresValidityInterval <= 0 + || (attempt.getFinishTime() > endTime + - this.attemptFailuresValidityInterval)) { + numFailedAppAttempts ++; } - } } - return completedAttempts; } private static final class AttemptFailedTransition implements @@ -1259,6 +1291,8 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.FINAL_SAVING; } + app.updateNumFailedAppAttempts(app.currentAttempt); + boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1274,6 +1308,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // but when pulling finished container we will check this flag again. ((RMAppAttemptImpl) app.currentAttempt) .transferStateFromPreviousAttempt(oldAttempt); + + while (app.attempts.size() > app.maxAppAttemptsToKeep) { + removeOldestAttempt(app); + } + return initialState; } else { if (numberOfFailure >= app.maxAppAttempts) { @@ -1285,6 +1324,18 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.FINAL_SAVING; } } + + // Remove oldest attempt in app.attempts, and set dummyAttempt if needed. + private void removeOldestAttempt(RMAppImpl app) { + // Just need remove attempts' first element because it is a LinkedHashMap + ApplicationAttemptId attemptId = app.attempts.keySet().iterator().next(); + LOG.info("Remove oldest attempt in app.attempts : " + attemptId); + if (app.dummyAttempt == null) { + app.dummyAttempt = app.attempts.get(attemptId); + } + app.attempts.remove(attemptId); + app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 3c8ac34..5380104 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -67,6 +67,7 @@ message ApplicationStateDataProto { optional RMAppStateProto application_state = 5; optional string diagnostics = 6 [default = "N/A"]; optional int64 finish_time = 7; + optional int32 num_failed_appAttempts = 8; } message ApplicationAttemptStateDataProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index a23c789..1462a4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -102,6 +102,14 @@ public RMAppAttempt getCurrentAppAttempt() { throw new UnsupportedOperationException("Not supported yet."); } @Override + public RMAppAttempt getDummyAppAttempt() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public int getNumFailedAppAttempts() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public Map getAppAttempts() { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 3bc0709..0affa40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -317,7 +317,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) ApplicationStateData.newInstance(appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), appState.getApplicationSubmissionContext(), RMAppState.FINISHED, - "appDiagnostics", 1234); + "appDiagnostics", 1234, 6); appState2.attempts.putAll(appState.attempts); store.updateApplicationState(appState2); @@ -342,7 +342,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) ApplicationStateData dummyApp = ApplicationStateData.newInstance(appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), dummyContext, - RMAppState.FINISHED, "appDiagnostics", 1234); + RMAppState.FINISHED, "appDiagnostics", 1234, 6); store.updateApplicationState(dummyApp); ApplicationAttemptId dummyAttemptId = @@ -379,6 +379,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals( RMAppState.FINISHED, updatedAppState.getState()); assertEquals("appDiagnostics", updatedAppState.getDiagnostics()); assertEquals(1234, updatedAppState.getFinishTime()); + assertEquals(6, updatedAppState.getNumFailedAppAttempts()); // check updated attempt state assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index b1e7a0b..8f6478a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -281,7 +281,7 @@ public void run() { store.storeApplicationStateInternal( ApplicationId.newInstance(100L, 1), ApplicationStateData.newInstance(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + RMAppState.ACCEPTED, "diagnostics", 333, 222)); } catch (Exception e) { assertionFailedInThread.set(true); e.printStackTrace(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index c6ee3ba..cf3cdfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -147,6 +147,16 @@ public RMAppAttempt getCurrentAppAttempt() { return attempt; } + @Override + public RMAppAttempt getDummyAppAttempt() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getNumFailedAppAttempts() { + throw new UnsupportedOperationException("Not supported yet."); + } + public void setCurrentAppAttempt(RMAppAttempt attempt) { this.attempt = attempt; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 72f1dff..d882cdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -999,7 +999,7 @@ public void createRMStateForApplications( ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), app.getUser(), app.getApplicationSubmissionContext(), rmAppState, - null, app.getFinishTime()); + null, app.getFinishTime(), app.getNumFailedAppAttempts()); applicationState.put(app.getApplicationId(), appState); }