diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 1ee04f0..b79e372 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -45,6 +45,15 @@ * {@link ContainerLaunchContext} of the container in which the * ApplicationMaster is executed. * + *
  • maxAppAttempts. The maximum number of application attempts. + * It should be no larger than the global number of max attempts in the + * Yarn configuration.
  • + *
  • slidingWindowSize. The default value is -1. + * when slidingWindowSize is set to > 0, the failure number will no take + * failures which happen out of the window into failure count. + * If failure count reaches to maxAppAttempts, + * the application will be failed. + *
  • * *

    * @@ -103,6 +112,22 @@ public static ApplicationSubmissionContext newInstance( resource, null); } + @Public + @Unstable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, long slidingWindowSize) { + ApplicationSubmissionContext context = + newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers); + context.setSlidingWindowSize(slidingWindowSize); + return context; + } + /** * Get the ApplicationId of the submitted application. * @return ApplicationId of the submitted application @@ -338,4 +363,21 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Public @Stable public abstract void setApplicationTags(Set tags); + + /** + * Get the slidingWindowSize for the application + * + * @return the slidingWindowSize + */ + @Public + @Stable + public abstract long getSlidingWindowSize(); + + /** + * Set the slidingWindowSize for the application + * @param slidingWindowSize + */ + @Public + @Stable + public abstract void setSlidingWindowSize(long slidingWindowSize); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..539038a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -288,6 +288,7 @@ message ApplicationSubmissionContextProto { optional string applicationType = 10 [default = "YARN"]; optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; + optional int64 sliding_window_size = 13 [default = -1]; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index c2f3268..e5a8af0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -402,4 +402,16 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } + + @Override + public long getSlidingWindowSize() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getSlidingWindowSize(); + } + + @Override + public void setSlidingWindowSize(long slidingWindowSize) { + maybeInitBuilder(); + builder.setSlidingWindowSize(slidingWindowSize); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 0a3b269..dca762c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -280,7 +280,8 @@ private void loadRMAppState(RMState rmState) throws Exception { attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); + attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f56517c..dc43821 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -167,7 +167,8 @@ public synchronized void updateApplicationAttemptStateInternal( attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); + attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime()); ApplicationState appState = state.getApplicationState().get( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 714a108..3602c35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -259,6 +259,7 @@ public RMStateStore() { final Container masterContainer; final Credentials appAttemptCredentials; long startTime = 0; + long finishTime = 0; // fields set when attempt completes RMAppAttemptState state; String finalTrackingUrl = "N/A"; @@ -270,14 +271,14 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime) { this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID); + null, "", null, ContainerExitStatus.INVALID, 0); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime, RMAppAttemptState state, String finalTrackingUrl, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus) { + int exitStatus, long finishTime) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; @@ -287,6 +288,7 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId, this.diagnostics = diagnostics == null ? "" : diagnostics; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; this.exitStatus = exitStatus; + this.finishTime = finishTime; } public Container getMasterContainer() { @@ -316,6 +318,9 @@ public FinalApplicationStatus getFinalApplicationStatus() { public int getAMContainerExitStatus(){ return this.exitStatus; } + public long getFinishTime() { + return this.finishTime; + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1b1ec76..d0e9e1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -603,7 +603,8 @@ private void loadApplicationAttemptState(ApplicationState appState, attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); + attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 5cb9787..923600f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -43,7 +43,8 @@ public static ApplicationAttemptStateData newInstance( ApplicationAttemptId attemptId, Container container, ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus, + long finishTime) { ApplicationAttemptStateData attemptStateData = Records.newRecord(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); @@ -55,6 +56,7 @@ public static ApplicationAttemptStateData newInstance( attemptStateData.setStartTime(startTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); + attemptStateData.setFinishTime(finishTime); return attemptStateData; } @@ -72,7 +74,7 @@ public static ApplicationAttemptStateData newInstance( attemptState.getStartTime(), attemptState.getState(), attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(), - attemptState.getAMContainerExitStatus()); + attemptState.getAMContainerExitStatus(), attemptState.getFinishTime()); } public abstract ApplicationAttemptStateDataProto getProto(); @@ -157,4 +159,12 @@ public abstract void setFinalApplicationStatus( public abstract int getAMContainerExitStatus(); public abstract void setAMContainerExitStatus(int exitStatus); + + /** + * Get the finish time of the application attempt. + * @return finish time of the application attempt + */ + public abstract long getFinishTime(); + + public abstract void setFinishTime(long finishTime); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 5c62d63..809dbf3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -294,4 +294,16 @@ private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { return ProtoUtils.convertFromProtoFormat(s); } + + @Override + public long getFinishTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 48cf460..51e2637 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -109,6 +108,10 @@ private final String applicationType; private final Set applicationTags; + private final long slidingWindowSize; + + private boolean isAttemptFailureExceedMaxAttempt = false; + // Mutable fields private long startTime; private long finishTime = 0; @@ -355,6 +358,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.maxAppAttempts = individualMaxAppAttempts; } + this.slidingWindowSize = submissionContext.getSlidingWindowSize(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -875,7 +880,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { + } else if (this.isAttemptFailureExceedMaxAttempt) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1094,11 +1099,15 @@ public void transition(RMAppImpl app, RMAppEvent event) { private int getNumFailedAppAttempts() { int completedAttempts = 0; + long endTime = System.currentTimeMillis(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - completedAttempts++; + if (this.slidingWindowSize <= 0 + || (attempt.getFinishTime() > endTime - this.slidingWindowSize)) { + completedAttempts++; + } } } return completedAttempts; @@ -1116,8 +1125,10 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + int numberOfFailure = app.getNumFailedAppAttempts(); + if (!app.submissionContext.getUnmanagedAM() - && app.getNumFailedAppAttempts() < app.maxAppAttempts) { + && numberOfFailure < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1135,6 +1146,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } return initialState; } else { + if (numberOfFailure >= app.maxAppAttempts) { + app.isAttemptFailureExceedMaxAttempt = true; + } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); @@ -1212,4 +1226,5 @@ public RMAppMetrics getRMAppMetrics() { return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted, numAMContainerPreempted); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..943a5e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -213,4 +213,10 @@ * @return metrics */ RMAppAttemptMetrics getRMAppAttemptMetrics(); + + /** + * the finish time of the application attempt. + * @return the finish time of the application attempt. + */ + long getFinishTime(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..7efc931 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -42,7 +42,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -141,6 +140,7 @@ private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private long finishTime = 0; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -733,6 +733,7 @@ public void recover(RMState state) throws Exception { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); + this.finishTime = attemptState.getFinishTime(); } public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { @@ -1019,10 +1020,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, } RMStateStore rmStore = rmContext.getStateStore(); + setFinishTime(System.currentTimeMillis()); ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus, + getFinishTime()); LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); @@ -1732,4 +1735,23 @@ public RMAppAttemptMetrics getRMAppAttemptMetrics() { // lock return attemptMetrics; } + + @Override + public long getFinishTime() { + try { + this.readLock.lock(); + return this.finishTime; + } finally { + this.readLock.unlock(); + } + } + + private void setFinishTime(long finishTime) { + try { + this.writeLock.lock(); + this.finishTime = finishTime; + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index eab6af1..8baf4dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -78,6 +78,7 @@ message ApplicationAttemptStateDataProto { optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; optional int32 am_container_exit_status = 9 [default = -1000]; + optional int64 finish_time = 10; } message EpochProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3817637..9eee8af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -278,7 +278,15 @@ public RMApp submitApp(int masterMemory, String name, String user, boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null); + false, null, 0); + } + + public RMApp submitApp(int masterMemory, long slidingWindowSize) throws Exception { + return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, slidingWindowSize); } public RMApp submitApp(int masterMemory, String name, String user, @@ -286,6 +294,16 @@ public RMApp submitApp(int masterMemory, String name, String user, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, 0); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long slidingWindowSize) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -321,6 +339,7 @@ public RMApp submitApp(int masterMemory, String name, String user, clc.setTokens(securityTokens); } sub.setAMContainerSpec(clc); + sub.setSlidingWindowSize(slidingWindowSize); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..0ab581f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -584,4 +584,125 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() rm1.stop(); rm2.stop(); } + + @Test (timeout = 50000) + public void testRMAppSlidingWindow() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 2. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // set window size to a larger number : 20s + // we will verify the app should be failed if + // two continuous attempts failed in 20s. + RMApp app = rm1.submitApp(200, 20000); + MockAM am = MockRM.launchAM(app, rm1, nm1); + // Fail current attempt normally + nm1.nodeHeartbeat(am.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FAILED); + // launch the second attempt + rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app.getAppAttempts().size()); + Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt()) + .mayBeLastAttempt()); + MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1); + am_2.waitForState(RMAppAttemptState.RUNNING); + nm1.nodeHeartbeat(am_2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am_2.waitForState(RMAppAttemptState.FAILED); + // current app should be failed. + rm1.waitForState(app.getApplicationId(), RMAppState.FAILED); + + // set window size to 6s + RMApp app1 = rm1.submitApp(200, 6000); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Fail attempt1 normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am2.waitForState(RMAppAttemptState.RUNNING); + + // wait for 6 seconds + Thread.sleep(6000); + + // Fail attempt2 normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + + // can launch the third attempt successfully + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(3, app1.getAppAttempts().size()); + RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); + MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am3.waitForState(RMAppAttemptState.RUNNING); + + // Restart rm. + @SuppressWarnings("resource") + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt3.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); + + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // Lauch Attempt 4 + MockAM am4 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); + + // wait for 6 seconds + Thread.sleep(6000); + + // Fail attempt4 normally + nm1 + .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am4.waitForState(RMAppAttemptState.FAILED); + + // can launch the 5th attempt successfully + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + MockAM am5 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1); + am5.waitForState(RMAppAttemptState.RUNNING); + + // Fail attempt5 normally + nm1 + .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am5.waitForState(RMAppAttemptState.FAILED); + + rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); + rm1.stop(); + rm2.stop(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 620ba9f..e408870 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -304,7 +304,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 100); + FinalApplicationStatus.SUCCEEDED, 100, + oldAttemptState.getFinishTime()); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -327,7 +328,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 111); + FinalApplicationStatus.SUCCEEDED, 111, + oldAttemptState.getFinishTime()); store.updateApplicationAttemptState(dummyAttempt); // let things settle down