diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 529df11..543339d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.util.Records;
import java.util.Set;
@@ -35,17 +36,19 @@
* information needed by the ResourceManager to launch
* the ApplicationMaster for an application.
*
- * It includes details such as:
- *
- * - {@link ApplicationId} of the application.
- * - Application user.
- * - Application name.
- * - {@link Priority} of the application.
- * -
- * {@link ContainerLaunchContext} of the container in which the
- *
ApplicationMaster is executed.
- *
- *
+ *
+ * It includes details such as:
+ *
+ * - {@link ApplicationId} of the application.
+ * - Application user.
+ * - Application name.
+ * - {@link Priority} of the application.
+ * -
+ * {@link ContainerLaunchContext} of the container in which the
+ *
ApplicationMaster is executed.
+ * -
+ * AM retry count reset policy ({@link AMRetryCountResetPolicy}).
+ *
*
*
* @see ContainerLaunchContext
@@ -62,7 +65,7 @@ public static ApplicationSubmissionContext newInstance(
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
- boolean keepContainers) {
+ boolean keepContainers, AMRetryCountResetPolicy resetPolicy) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@@ -76,6 +79,7 @@ public static ApplicationSubmissionContext newInstance(
context.setResource(resource);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ context.setAMRetryCountResetPolicy(resetPolicy);
return context;
}
@@ -85,6 +89,19 @@ public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers) {
+ return newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers, null);
+ }
+
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
@@ -338,4 +355,13 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public
@Stable
public abstract void setApplicationTags(Set tags);
+  /** @return the AM retry count reset policy; may be {@code null}. */
+  @Public
+  @Unstable
+  public abstract AMRetryCountResetPolicy getAMRetryCountResetPolicy();
+  /** @param policyObject the AM retry count reset policy; may be null. */
+  @Public
+  @Unstable
+  public abstract void setAMRetryCountResetPolicy(
+      AMRetryCountResetPolicy policyObject);
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java
new file mode 100644
index 0000000..c094992
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Policy controlling when an application's AM retry count is reset.
+ * WindowsSlideAMRetryCountResetPolicy is the implementation available for
+ * use; it is created with a period, in milliseconds, that defines the time
+ * span after which the AM retry count will be reset.
+ */
+@Private
+@Unstable
+public interface AMRetryCountResetPolicy extends Serializable {
+  /** Called by RMAppAttemptImpl once the attempt start has been recorded. */
+  void start();
+  /** Called by RMAppAttemptImpl before it returns its targeted final state. */
+  void stop();
+  /** Resets the AM retry count; NOTE(review): no caller visible in this patch. */
+  void amRetryCountReset();
+  /** Called from RMAppAttemptImpl.recover() after attempt state is restored. */
+  void recover();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index 2eb6148..18aa108 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -128,6 +128,7 @@ message ApplicationAttemptStateDataProto {
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
+ optional int64 end_time = 10;
}
message RMStateVersionProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..d1bdcd2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -288,6 +288,7 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
+ optional bytes am_retrycount_reset_policy = 13;
}
enum ApplicationAccessTypeProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c4a3a72..a47b5f4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
+import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -33,9 +35,12 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
+import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
+import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
@@ -53,6 +58,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set applicationTags = null;
+ private AMRetryCountResetPolicy resetPolicy = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -109,6 +115,10 @@ private void mergeLocalToBuilder() {
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.resetPolicy != null) {
+ builder.setAmRetrycountResetPolicy(convertToProtoFormat(ByteBuffer
+ .wrap(SerializationUtils.serialize(this.resetPolicy))));
+ }
}
private void mergeLocalToProto() {
@@ -401,4 +411,41 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
+
+  @Override
+  public AMRetryCountResetPolicy getAMRetryCountResetPolicy() {
+    if (this.resetPolicy != null) { // locally set or already deserialized
+      return this.resetPolicy;
+    }
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasAmRetrycountResetPolicy()) {
+      return null;
+    }
+    try {
+      // NOTE(review): Java-native deserialization of bytes carried in the
+      // submission context. toByteArray() copies exactly the field's bytes,
+      // avoiding ByteBuffer.array() (ignores offset/limit, fails if read-only).
+      this.resetPolicy = (AMRetryCountResetPolicy) SerializationUtils
+          .deserialize(p.getAmRetrycountResetPolicy().toByteArray());
+    } catch (Exception ex) {
+      return null; // best effort: an unreadable policy is treated as absent
+    }
+    return this.resetPolicy;
+  }
+
+  @Override
+  public void setAMRetryCountResetPolicy(AMRetryCountResetPolicy policyObject) {
+    maybeInitBuilder();
+    if (policyObject == null) { // unset: drop stale serialized bytes as well
+      builder.clearAmRetrycountResetPolicy();
+    }
+    this.resetPolicy = policyObject; // serialized later by mergeLocalToBuilder
+  }
+  /** Converts a protobuf ByteString to a ByteBuffer via ProtoUtils. */
+  protected final ByteBuffer convertFromProtoFormat(ByteString byteString) {
+    return ProtoUtils.convertFromProtoFormat(byteString);
+  }
+  /** Converts a ByteBuffer to a protobuf ByteString via ProtoUtils. */
+  protected final ByteString convertToProtoFormat(ByteBuffer byteBuffer) {
+    return ProtoUtils.convertToProtoFormat(byteBuffer);
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index b315a84..160b9e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -240,6 +240,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
+ attemptStateData.getEndTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 6b5b602..ec49014 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -158,8 +158,8 @@ public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 9b05ea1..d98d8e9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -53,8 +53,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -257,6 +255,7 @@ public RMStateStore() {
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
+ long endTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
@@ -267,19 +266,20 @@ public RMStateStore() {
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
- this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID);
+ this(attemptId, masterContainer, appAttemptCredentials, startTime,
+ -1, null, null, "", null, ContainerExitStatus.INVALID);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
- long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus) {
+ long startTime, long endTime, RMAppAttemptState state,
+ String finalTrackingUrl, String diagnostics,
+ FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
this.startTime = startTime;
+ this.endTime = endTime;
this.state = state;
this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics;
@@ -308,6 +308,9 @@ public String getDiagnostics() {
public long getStartTime() {
return startTime;
}
+    public long getEndTime() {
+      return endTime; // as given to the constructor (-1 from the 4-arg one)
+    }
public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 01bca39..5bc8fc5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -567,7 +567,8 @@ private void loadApplicationAttemptState(ApplicationState appState,
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 90fb3ec..011016a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -41,9 +41,10 @@
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
- String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+ ByteBuffer attemptTokens, long startTime, long endTime,
+ RMAppAttemptState finalState, String finalTrackingUrl,
+ String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+ int exitStatus) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@@ -53,6 +54,7 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setStartTime(startTime);
+ attemptStateData.setEndTime(endTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
return attemptStateData;
@@ -69,9 +71,9 @@ public static ApplicationAttemptStateData newInstance(
}
return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus(),
+ attemptState.getStartTime(), attemptState.getEndTime(),
+ attemptState.getState(), attemptState.getFinalTrackingUrl(),
+ attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus());
}
@@ -146,6 +148,14 @@ public static ApplicationAttemptStateData newInstance(
public abstract void setStartTime(long startTime);
/**
+ * Get the end time of the application attempt.
+ * @return end time of the application attempt
+ */
+ public abstract long getEndTime();
+
+ public abstract void setEndTime(long endTime);
+
+ /**
* Get the final finish status of the application.
* @return final finish status of the application
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index a90bda4..2db8491 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -229,6 +229,18 @@ public void setStartTime(long startTime) {
}
@Override
+  public long getEndTime() {
+    // Read from proto when viaProto is set, otherwise from the builder.
+    return (viaProto ? proto : builder).getEndTime();
+  }
+
+ @Override
+  public void setEndTime(long endTime) {
+    maybeInitBuilder(); // defined outside this hunk; presumably readies builder
+    builder.setEndTime(endTime);
+  }
+
+ @Override
public FinalApplicationStatus getFinalApplicationStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFinalApplicationStatus()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 2b590a0..eb61a98 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -223,4 +224,11 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
+
+ /**
+ * Used by {@link AMRetryCountResetPolicy} to reset the retry count.
+ * @param retryCount number of failed attempts to disregard for the limit
+ */
+ void setRetryCount(int retryCount);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index bff41cf..be276bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -124,6 +124,8 @@
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
+ private int attemptResetCount = 0;
+
Object transitionTodo;
private static final StateMachineFactory= this.maxAppAttempts) {
+ } else if (getNumFailedAppAttempts() - this.attemptResetCount
+ >= this.maxAppAttempts) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1131,7 +1136,8 @@ public AttemptFailedTransition(RMAppState initialState) {
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
- && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+ && app.getNumFailedAppAttempts() - app.attemptResetCount
+ < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@@ -1193,7 +1199,7 @@ public static boolean isAppInFinalState(RMApp rmApp) {
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
-
+
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
@@ -1202,4 +1208,14 @@ private RMAppState getRecoveredFinalState() {
public Set getRanNodes() {
return ranNodes;
}
+
+  @Override
+  public void setRetryCount(int retryCount) {
+    this.writeLock.lock(); // before try: finally must only unlock a held lock
+    try {
+      this.attemptResetCount = retryCount; // discounted from failed attempts
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 68a068b..08c4f79 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -174,6 +174,12 @@
long getStartTime();
/**
+ * The end time of the application attempt.
+ * @return the end time of the application attempt.
+ */
+ long getEndTime();
+
+ /**
* The current state of the {@link RMAppAttempt}.
*
* @return the current state {@link RMAppAttemptState} for this application
@@ -207,4 +213,8 @@
*
*/
boolean shouldCountTowardsMaxAttemptRetry();
+  /** @return whether this attempt may be the last one for the application. */
+  boolean mayBeLastAttempt();
+  /** @param maybeLastAttempt new value of the "may be last attempt" flag. */
+  void setMaybeLastAttemptFlag(boolean maybeLastAttempt);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index ef572d4..08996b7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@@ -84,6 +86,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.WindowsSlideAMRetryCountResetPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -141,6 +144,7 @@
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
+ private long endTime = -1;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@@ -154,7 +158,8 @@
// re-created if this attempt is eventually failed because of preemption,
// hardware error or NM resync. So this flag indicates that this may be
// last attempt.
- private final boolean maybeLastAttempt;
+ private boolean maybeLastAttempt;
+ private AMRetryCountResetPolicy resetPolicy = null;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -411,6 +416,14 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
+ this.resetPolicy = submissionContext.getAMRetryCountResetPolicy();
+ if (this.resetPolicy != null) {
+ if (this.resetPolicy instanceof WindowsSlideAMRetryCountResetPolicy) {
+ ((WindowsSlideAMRetryCountResetPolicy) this.resetPolicy)
+ .setup((RMAppImpl) this.rmContext.getRMApps().get(
+ appAttemptId.getApplicationId()));
+ }
+ }
}
@Override
@@ -694,6 +707,10 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
+ this.endTime = attemptState.getEndTime();
+ if (this.resetPolicy != null) {
+ this.resetPolicy.recover();
+ }
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@@ -984,10 +1001,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
}
RMStateStore rmStore = rmContext.getStateStore();
+ this.setEndTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
+ this.getEndTime(), stateToBeStored, finalTrackingUrl, diags,
+ finalStatus, exitStatus);
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@@ -1036,6 +1055,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
((MultipleArcTransition) appAttempt.transitionTodo).transition(
appAttempt, causeEvent);
}
+
+ // Stop AMRetryCountResetPolicy
+ if (appAttempt.resetPolicy != null) {
+ appAttempt.resetPolicy.stop();
+ }
+
return appAttempt.targetedFinalState;
}
}
@@ -1233,6 +1258,11 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptStarted(appAttempt);
+
+ // Start AMRetryCountResetPolicy
+ if (appAttempt.resetPolicy != null) {
+ appAttempt.resetPolicy.start();
+ }
}
}
@@ -1609,6 +1639,25 @@ public long getStartTime() {
}
@Override
+  public long getEndTime() {
+    this.readLock.lock(); // outside try: finally must only unlock a held lock
+    try {
+      return this.endTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private void setEndTime(long endTime) {
+    this.writeLock.lock(); // outside try: finally must only unlock a held lock
+    try {
+      this.endTime = endTime;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+ @Override
public RMAppAttemptState getState() {
this.readLock.lock();
@@ -1698,7 +1747,23 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
}
// for testing
+  @Override
public boolean mayBeLastAttempt() {
- return maybeLastAttempt;
+    this.readLock.lock(); // outside try: finally must only unlock a held lock
+    try {
+      return maybeLastAttempt;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  public void setMaybeLastAttemptFlag(boolean maybeLastAttempt) {
+    this.writeLock.lock(); // outside try: finally must only unlock a held lock
+    try {
+      this.maybeLastAttempt = maybeLastAttempt;
+    } finally {
+      this.writeLock.unlock();
+    }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java
new file mode 100644
index 0000000..7d1d719
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy;
+
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+/**
+ * Resets the AM retry count each time the current ApplicationMaster attempt
+ * has run for a full window. The window length is passed to the constructor
+ * in milliseconds; call {@link #setup(RMAppImpl)} before start or recover.
+ */
+public class WindowsSlideAMRetryCountResetPolicy implements
+    AMRetryCountResetPolicy {
+
+  private static final long serialVersionUID = 1529666453440395104L;
+
+  private final int resetWindow;
+
+  // Runtime-only state, kept transient so it never joins the serialized form.
+  private transient Timer timer;
+  private transient RMAppImpl rmApp;
+
+  /** @param resetWindow window length in milliseconds; must be positive. */
+  public WindowsSlideAMRetryCountResetPolicy(int resetWindow) {
+    if (resetWindow <= 0) {
+      throw new IllegalArgumentException("The parameter should larger than 0");
+    }
+    this.resetWindow = resetWindow;
+  }
+
+  /** Resets the retry count and clears the current last-attempt flag. */
+  @Override
+  public synchronized void amRetryCountReset() {
+    // Per the original author's note, last-attempt detection compares
+    // maxAppAttempts with (failed attempts - retry count), so storing the
+    // failures seen so far gives the application its full budget back.
+    RMAppAttempt current = this.rmApp.getCurrentAppAttempt();
+    this.rmApp.setRetryCount(countFailedAttemptsExcluding(current));
+    if (current.mayBeLastAttempt()) {
+      current.setMaybeLastAttemptFlag(false);
+    }
+  }
+
+  /** Counts the attempts, other than {@code current}, that use up a retry. */
+  private int countFailedAttemptsExcluding(RMAppAttempt current) {
+    ApplicationAttemptId currentId = current.getAppAttemptId();
+    int failures = 0;
+    for (Entry<ApplicationAttemptId, RMAppAttempt> entry : this.rmApp
+        .getAppAttempts().entrySet()) {
+      // Compare ids by value: equal ids need not be the same object.
+      if (!entry.getKey().equals(currentId)
+          && entry.getValue().shouldCountTowardsMaxAttemptRetry()) {
+        failures++;
+      }
+    }
+    return failures;
+  }
+
+  public int getResetWindow() {
+    return resetWindow;
+  }
+
+  /** Schedules a reset every {@code resetWindow} ms, replacing any timer. */
+  @Override
+  public synchronized void start() {
+    stop(); // a repeated start() must not leak the previous timer thread
+    this.timer = new Timer(true); // daemon: must not keep the JVM alive
+    this.timer.scheduleAtFixedRate(new TimerTask() {
+      @Override
+      public void run() {
+        amRetryCountReset();
+      }
+    }, resetWindow, resetWindow);
+  }
+
+  /** Cancels the periodic reset, if any; safe to call repeatedly. */
+  @Override
+  public synchronized void stop() {
+    if (this.timer != null) {
+      this.timer.cancel();
+      this.timer = null;
+    }
+  }
+
+  /** Binds this policy to the application whose retry count it manages. */
+  public synchronized void setup(RMAppImpl rmApp) {
+    this.rmApp = rmApp;
+  }
+
+  /** Resets the retry count if the current attempt ran for a full window. */
+  @Override
+  public synchronized void recover() {
+    RMAppAttempt current = this.rmApp.getCurrentAppAttempt();
+    if (current.getEndTime() - current.getStartTime() >= resetWindow) {
+      this.rmApp.setRetryCount(countFailedAttemptsExcluding(current));
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6949a81..484c7c3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -283,6 +284,25 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, null);
+ }
+
+  public RMApp submitApp(int masterMemory, AMRetryCountResetPolicy resetPolicy)
+      throws Exception { // every option but resetPolicy takes a fixed default
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    int max = super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    return submitApp(masterMemory, "", user, null, false, null, max, null,
+        null, true, false, false, null, resetPolicy);
+  }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, AMRetryCountResetPolicy resetPolicy) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -318,6 +338,9 @@ public RMApp submitApp(int masterMemory, String name, String user,
clc.setTokens(securityTokens);
}
sub.setAMContainerSpec(clc);
+ if (resetPolicy != null) {
+ sub.setAMRetryCountResetPolicy(resetPolicy);
+ }
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 4349a23..5656a5e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -165,6 +165,11 @@ public YarnApplicationState createApplicationState() {
public Set getRanNodes() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public void setRetryCount(int retryCount) { // unsupported stub, like getRanNodes()
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
public static RMApp newApplication(int i) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 49d7135..bf4b256 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -307,8 +307,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(oldAttemptState.getAttemptId(),
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100);
store.updateApplicationAttemptState(newAttemptState);
@@ -330,8 +330,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111);
store.updateApplicationAttemptState(dummyAttempt);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 8f26d10..94d8096 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -238,4 +238,9 @@ public YarnApplicationState createApplicationState() {
public Set getRanNodes() {
return null;
}
+
+ @Override
+ public void setRetryCount(int retryCount) {
+ // Intentionally a no-op: the argument is ignored by this mock.
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java
new file mode 100644
index 0000000..3f0d9af
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.Collections;
+
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.WindowsSlideAMRetryCountResetPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** End-to-end test of the window-based AM retry count reset policy. */
+public class TestRMAppRetryCountResetPolicy {
+
+  // Running time after which the policy resets the AM retry count.
+  private static final int RESET_WINDOW_MS = 4000;
+
+  @Test (timeout = 50000)
+  public void testWindowsSlideAMRetryCountResetPolicy() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly allow at most 2 AM attempts
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200,
+        new WindowsSlideAMRetryCountResetPolicy(RESET_WINDOW_MS));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Fail attempt1 normally
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    // launch the second attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app1.getAppAttempts().size());
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am2.waitForState(RMAppAttemptState.RUNNING);
+    // Sleep 1s past the window: sleeping exactly one window races the timer
+    Thread.sleep(RESET_WINDOW_MS + 1000);
+    // Right now Attempt2 is not the last Attempt
+    Assert.assertFalse(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+    // Fail attempt2 normally
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+
+    // launch the third attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(3, app1.getAppAttempts().size());
+    RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+    MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am3.waitForState(RMAppAttemptState.RUNNING);
+
+    // Restart rm.
+    @SuppressWarnings("resource")
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // re-register the NM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+    status.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    status.setContainerId(attempt3.getMasterContainer().getId());
+    status.setContainerState(ContainerState.COMPLETE);
+    status.setDiagnostics("");
+    nm1.registerNode(Collections.singletonList(status), null);
+
+    rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Launch Attempt 4 and it should be the last Attempt
+    MockAM am4 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+    RMAppAttempt attempt4 = rm2.getRMContext().getRMApps()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // Again sleep 1s past the window so the reset task has had time to run
+    Thread.sleep(RESET_WINDOW_MS + 1000);
+    // Right now Attempt4 is not the last Attempt
+    Assert.assertFalse(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // Fail attempt4 normally
+    nm1.nodeHeartbeat(am4.getApplicationAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am4.waitForState(RMAppAttemptState.FAILED);
+    rm1.stop();
+    rm2.stop();
+  }
+}