diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 1ee04f0..b79e372 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -45,6 +45,15 @@
* {@link ContainerLaunchContext} of the container in which the
* ApplicationMaster is executed.
*
+ *
maxAppAttempts. The maximum number of application attempts.
+ * It should be no larger than the global number of max attempts in the
+ * Yarn configuration.
+ * slidingWindowSize. The default value is -1.
+ * when slidingWindowSize is set to > 0, the failure number will no take
+ * failures which happen out of the window into failure count.
+ * If failure count reaches to maxAppAttempts,
+ * the application will be failed.
+ *
*
*
*
@@ -103,6 +112,22 @@ public static ApplicationSubmissionContext newInstance(
resource, null);
}
+ @Public
+ @Unstable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers, long slidingWindowSize) {
+ ApplicationSubmissionContext context =
+ newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers);
+ context.setSlidingWindowSize(slidingWindowSize);
+ return context;
+ }
+
/**
* Get the ApplicationId of the submitted application.
* @return ApplicationId of the submitted application
@@ -338,4 +363,21 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public
@Stable
public abstract void setApplicationTags(Set tags);
+
+ /**
+ * Get the slidingWindowSize for the application
+ *
+ * @return the slidingWindowSize
+ */
+ @Public
+ @Stable
+ public abstract long getSlidingWindowSize();
+
+ /**
+ * Set the slidingWindowSize for the application
+ * @param slidingWindowSize
+ */
+ @Public
+ @Stable
+ public abstract void setSlidingWindowSize(long slidingWindowSize);
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..539038a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -288,6 +288,7 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
+ optional int64 sliding_window_size = 13 [default = -1];
}
enum ApplicationAccessTypeProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c2f3268..e5a8af0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -402,4 +402,16 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
+
+ @Override
+ public long getSlidingWindowSize() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getSlidingWindowSize();
+ }
+
+ @Override
+ public void setSlidingWindowSize(long slidingWindowSize) {
+ maybeInitBuilder();
+ builder.setSlidingWindowSize(slidingWindowSize);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 0a3b269..dca762c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -280,7 +280,8 @@ private void loadRMAppState(RMState rmState) throws Exception {
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus());
+ attemptStateData.getAMContainerExitStatus(),
+ attemptStateData.getFinishTime());
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index f56517c..dc43821 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -167,7 +167,8 @@ public synchronized void updateApplicationAttemptStateInternal(
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus());
+ attemptStateData.getAMContainerExitStatus(),
+ attemptStateData.getFinishTime());
ApplicationState appState =
state.getApplicationState().get(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 714a108..3602c35 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -259,6 +259,7 @@ public RMStateStore() {
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
+ long finishTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
@@ -270,14 +271,14 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID);
+ null, "", null, ContainerExitStatus.INVALID, 0);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus) {
+ int exitStatus, long finishTime) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
@@ -287,6 +288,7 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId,
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus;
+ this.finishTime = finishTime;
}
public Container getMasterContainer() {
@@ -316,6 +318,9 @@ public FinalApplicationStatus getFinalApplicationStatus() {
public int getAMContainerExitStatus(){
return this.exitStatus;
}
+ public long getFinishTime() {
+ return this.finishTime;
+ }
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1b1ec76..d0e9e1a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -603,7 +603,8 @@ private void loadApplicationAttemptState(ApplicationState appState,
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus());
+ attemptStateData.getAMContainerExitStatus(),
+ attemptStateData.getFinishTime());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 5cb9787..923600f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -43,7 +43,8 @@ public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+ FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
+ long finishTime) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@@ -55,6 +56,7 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
+ attemptStateData.setFinishTime(finishTime);
return attemptStateData;
}
@@ -72,7 +74,7 @@ public static ApplicationAttemptStateData newInstance(
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(),
- attemptState.getAMContainerExitStatus());
+ attemptState.getAMContainerExitStatus(), attemptState.getFinishTime());
}
public abstract ApplicationAttemptStateDataProto getProto();
@@ -157,4 +159,12 @@ public abstract void setFinalApplicationStatus(
public abstract int getAMContainerExitStatus();
public abstract void setAMContainerExitStatus(int exitStatus);
+
+ /**
+ * Get the finish time of the application attempt.
+ * @return finish time of the application attempt
+ */
+ public abstract long getFinishTime();
+
+ public abstract void setFinishTime(long finishTime);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index 5c62d63..809dbf3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -294,4 +294,16 @@ private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s);
}
+
+ @Override
+ public long getFinishTime() {
+ ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getFinishTime();
+ }
+
+ @Override
+ public void setFinishTime(long finishTime) {
+ maybeInitBuilder();
+ builder.setFinishTime(finishTime);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 48cf460..51e2637 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -35,7 +35,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -109,6 +108,10 @@
private final String applicationType;
private final Set applicationTags;
+ private final long slidingWindowSize;
+
+ private boolean isAttemptFailureExceedMaxAttempt = false;
+
// Mutable fields
private long startTime;
private long finishTime = 0;
@@ -355,6 +358,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.maxAppAttempts = individualMaxAppAttempts;
}
+ this.slidingWindowSize = submissionContext.getSlidingWindowSize();
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -875,7 +880,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
- } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
+ } else if (this.isAttemptFailureExceedMaxAttempt) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1094,11 +1099,15 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private int getNumFailedAppAttempts() {
int completedAttempts = 0;
+ long endTime = System.currentTimeMillis();
// Do not count AM preemption, hardware failures or NM resync
// as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
- completedAttempts++;
+ if (this.slidingWindowSize <= 0
+ || (attempt.getFinishTime() > endTime - this.slidingWindowSize)) {
+ completedAttempts++;
+ }
}
}
return completedAttempts;
@@ -1116,8 +1125,10 @@ public AttemptFailedTransition(RMAppState initialState) {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ int numberOfFailure = app.getNumFailedAppAttempts();
+
if (!app.submissionContext.getUnmanagedAM()
- && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+ && numberOfFailure < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@@ -1135,6 +1146,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
return initialState;
} else {
+ if (numberOfFailure >= app.maxAppAttempts) {
+ app.isAttemptFailureExceedMaxAttempt = true;
+ }
app.rememberTargetTransitionsAndStoreState(event,
new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
RMAppState.FAILED);
@@ -1212,4 +1226,5 @@ public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(resourcePreempted,
numNonAMContainerPreempted, numAMContainerPreempted);
}
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b5ed92c..943a5e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -213,4 +213,10 @@
* @return metrics
*/
RMAppAttemptMetrics getRMAppAttemptMetrics();
+
+ /**
+ * the finish time of the application attempt.
+ * @return the finish time of the application attempt.
+ */
+ long getFinishTime();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 19fc800..7efc931 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -42,7 +42,6 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -141,6 +140,7 @@
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
+ private long finishTime = 0;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@@ -733,6 +733,7 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
+ this.finishTime = attemptState.getFinishTime();
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@@ -1019,10 +1020,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
}
RMStateStore rmStore = rmContext.getStateStore();
+ setFinishTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
+ stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
+ getFinishTime());
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@@ -1732,4 +1735,23 @@ public RMAppAttemptMetrics getRMAppAttemptMetrics() {
// lock
return attemptMetrics;
}
+
+ @Override
+ public long getFinishTime() {
+ try {
+ this.readLock.lock();
+ return this.finishTime;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private void setFinishTime(long finishTime) {
+ try {
+ this.writeLock.lock();
+ this.finishTime = finishTime;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
index eab6af1..8baf4dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
@@ -78,6 +78,7 @@ message ApplicationAttemptStateDataProto {
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
+ optional int64 finish_time = 10;
}
message EpochProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 3817637..9eee8af 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -278,7 +278,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- false, null);
+ false, null, 0);
+ }
+
+ public RMApp submitApp(int masterMemory, long slidingWindowSize) throws Exception {
+ return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+ false, null, slidingWindowSize);
}
public RMApp submitApp(int masterMemory, String name, String user,
@@ -286,6 +294,16 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, 0);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, long slidingWindowSize) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -321,6 +339,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
clc.setTokens(securityTokens);
}
sub.setAMContainerSpec(clc);
+ sub.setSlidingWindowSize(slidingWindowSize);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 6c5c818..0ab581f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -584,4 +584,125 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
rm1.stop();
rm2.stop();
}
+
+ @Test (timeout = 50000)
+ public void testRMAppSlidingWindow() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 2.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // set window size to a larger number : 20s
+ // we will verify the app should be failed if
+ // two continuous attempts failed in 20s.
+ RMApp app = rm1.submitApp(200, 20000);
+ MockAM am = MockRM.launchAM(app, rm1, nm1);
+ // Fail current attempt normally
+ nm1.nodeHeartbeat(am.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FAILED);
+ // launch the second attempt
+ rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app.getAppAttempts().size());
+ Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
+ .mayBeLastAttempt());
+ MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
+ am_2.waitForState(RMAppAttemptState.RUNNING);
+ nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am_2.waitForState(RMAppAttemptState.FAILED);
+ // current app should be failed.
+ rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
+
+ // set window size to 6s
+ RMApp app1 = rm1.submitApp(200, 6000);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Fail attempt1 normally
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+
+ // launch the second attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app1.getAppAttempts().size());
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am2.waitForState(RMAppAttemptState.RUNNING);
+
+ // wait for 6 seconds
+ Thread.sleep(6000);
+
+ // Fail attempt2 normally
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+
+ // can launch the third attempt successfully
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(3, app1.getAppAttempts().size());
+ RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+ MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am3.waitForState(RMAppAttemptState.RUNNING);
+
+ // Restart rm.
+ @SuppressWarnings("resource")
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status
+ .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt3.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Lauch Attempt 4
+ MockAM am4 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+
+ // wait for 6 seconds
+ Thread.sleep(6000);
+
+ // Fail attempt4 normally
+ nm1
+ .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am4.waitForState(RMAppAttemptState.FAILED);
+
+ // can launch the 5th attempt successfully
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ MockAM am5 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
+ am5.waitForState(RMAppAttemptState.RUNNING);
+
+ // Fail attempt5 normally
+ nm1
+ .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am5.waitForState(RMAppAttemptState.FAILED);
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+ rm1.stop();
+ rm2.stop();
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 620ba9f..e408870 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -304,7 +304,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED, 100);
+ FinalApplicationStatus.SUCCEEDED, 100,
+ oldAttemptState.getFinishTime());
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@@ -327,7 +328,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED, 111);
+ FinalApplicationStatus.SUCCEEDED, 111,
+ oldAttemptState.getFinishTime());
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down