diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyContext.java
new file mode 100644
index 0000000..684c04c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyContext.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+@Public
+@Unstable
+public abstract class AMRetryCountResetPolicyContext {
+
+ @Public
+ @Unstable
+ public static AMRetryCountResetPolicyContext newInstance(
+ AMRetryCountResetPolicyType resetPolicy, long resetWindow) {
+ AMRetryCountResetPolicyContext policy =
+ Records.newRecord(AMRetryCountResetPolicyContext.class);
+ policy.setAMRetryCountResetPolicyType(resetPolicy);
+ policy.setResetWindow(resetWindow);
+ return policy;
+ }
+
+ /**
+ * Get AMRetryCountResetPolicyType.
+ *
+ * @see AMRetryCountResetPolicyType
+ */
+ @Public
+ @Stable
+ public abstract AMRetryCountResetPolicyType getAMRetryCountResetPolicyType();
+
+ /**
+ * Set AMRetryCountResetPolicyType.
+ *
+ * @see AMRetryCountResetPolicyType
+ */
+ @Public
+ @Stable
+ public abstract void setAMRetryCountResetPolicyType(
+ AMRetryCountResetPolicyType amRetryCountResetPolicyType);
+
+ /**
+ * Should specific it if WindowsSlideAMRetryCountResetPolicyType is used
+ *
+ * @see AMRetryCountResetPolicyType
+ */
+ @Public
+ @Stable
+ public abstract long getResetWindow();
+
+ @Public
+ @Stable
+ public abstract void setResetWindow(long resetWindow);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyType.java
new file mode 100644
index 0000000..9838dca
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMRetryCountResetPolicyType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * AM Retry Count Reset Policy types.
+ */
+@Public
+@Unstable
+public enum AMRetryCountResetPolicyType {
+
+ /**
+ * The policy is used to reset AM retry count if current AppMaster successfully
+ * runs for a period of time. Should specific resetWindow in milliseconds
+ */
+ WINDOWSSLIDE_AM_RETRYCOUNT_RESETPOLICY;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 1ee04f0..5573dc5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -35,17 +35,19 @@
* information needed by the ResourceManager to launch
* the ApplicationMaster for an application.
*
- * It includes details such as:
- *
- * - {@link ApplicationId} of the application.
- * - Application user.
- * - Application name.
- * - {@link Priority} of the application.
- * -
- * {@link ContainerLaunchContext} of the container in which the
- *
ApplicationMaster is executed.
- *
- *
+ *
+ * It includes details such as:
+ *
+ * - {@link ApplicationId} of the application.
+ * - Application user.
+ * - Application name.
+ * - {@link Priority} of the application.
+ * -
+ * {@link ContainerLaunchContext} of the container in which the
+ *
ApplicationMaster is executed.
+ * -
+ * AM retry count reset policy Context.{@link AMRetryCountResetPolicyContext}
+ *
*
*
* @see ContainerLaunchContext
@@ -62,7 +64,8 @@ public static ApplicationSubmissionContext newInstance(
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
- boolean keepContainers) {
+ boolean keepContainers,
+ AMRetryCountResetPolicyContext resetPolicyContext) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@@ -76,6 +79,7 @@ public static ApplicationSubmissionContext newInstance(
context.setResource(resource);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ context.setAMRetryCountResetPolicyContext(resetPolicyContext);
return context;
}
@@ -85,6 +89,19 @@ public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers) {
+ return newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers, null);
+ }
+
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
@@ -338,4 +355,20 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public
@Stable
public abstract void setApplicationTags(Set tags);
-}
\ No newline at end of file
+
+ /**
+ * Get {@link AMRetryCountResetPolicyContext}
+ */
+ @Public
+ @Unstable
+ public abstract AMRetryCountResetPolicyContext
+ getAMRetryCountResetPolicyContext();
+
+ /**
+ * Set {@link AMRetryCountResetPolicyContext}
+ */
+ @Public
+ @Unstable
+ public abstract void setAMRetryCountResetPolicyContext(
+ AMRetryCountResetPolicyContext resetPolicyContext);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index 2eb6148..18aa108 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -128,6 +128,7 @@ message ApplicationAttemptStateDataProto {
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
+ optional int64 end_time = 10;
}
message RMStateVersionProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..f704d22 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -288,6 +288,16 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
+ optional AMRetryCountResetPolicyContextProto reset_policy_context = 13;
+}
+
+message AMRetryCountResetPolicyContextProto {
+ optional AMRetryCountResetPolicyTypeProto reset_policy = 1;
+ optional int64 reset_window = 2 [default = 0];
+}
+
+enum AMRetryCountResetPolicyTypeProto {
+ WINDOWSSLIDE_AM_RETRYCOUNT_RESETPOLICY=1;
}
enum ApplicationAccessTypeProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMRetryCountResetPolicyContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMRetryCountResetPolicyContextPBImpl.java
new file mode 100644
index 0000000..9d906ad
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMRetryCountResetPolicyContextPBImpl.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyContext;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyType;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMRetryCountResetPolicyContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMRetryCountResetPolicyContextProtoOrBuilder;
+import com.google.protobuf.TextFormat;
+
+public class AMRetryCountResetPolicyContextPBImpl extends
+ AMRetryCountResetPolicyContext {
+
+ AMRetryCountResetPolicyContextProto proto =
+ AMRetryCountResetPolicyContextProto.getDefaultInstance();
+ AMRetryCountResetPolicyContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public AMRetryCountResetPolicyContextPBImpl() {
+ builder = AMRetryCountResetPolicyContextProto.newBuilder();
+ }
+
+ public AMRetryCountResetPolicyContextPBImpl(
+ AMRetryCountResetPolicyContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public AMRetryCountResetPolicyContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = AMRetryCountResetPolicyContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public AMRetryCountResetPolicyType getAMRetryCountResetPolicyType() {
+ AMRetryCountResetPolicyContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasResetPolicy()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getResetPolicy());
+ }
+
+ @Override
+ public void setAMRetryCountResetPolicyType(
+ AMRetryCountResetPolicyType amRetryCountResetPolicyType) {
+ maybeInitBuilder();
+ if (amRetryCountResetPolicyType == null) {
+ builder.clearResetPolicy();
+ return;
+ }
+ builder.setResetPolicy(ProtoUtils
+ .convertToProtoFormat(amRetryCountResetPolicyType));
+ }
+
+ @Override
+ public long getResetWindow() {
+ AMRetryCountResetPolicyContextProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getResetWindow());
+ }
+
+ @Override
+ public void setResetWindow(long resetWindow) {
+ maybeInitBuilder();
+ builder.setResetWindow(resetWindow);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c4a3a72..e0cf604 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,21 +19,23 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyContext;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMRetryCountResetPolicyContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-
import com.google.protobuf.TextFormat;
import java.util.HashSet;
@@ -53,6 +55,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set applicationTags = null;
+ private AMRetryCountResetPolicyContext resetPolicyContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -109,6 +112,9 @@ private void mergeLocalToBuilder() {
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.resetPolicyContext != null) {
+ builder.setResetPolicyContext(convertToProtoFormat(this.resetPolicyContext));
+ }
}
private void mergeLocalToProto() {
@@ -401,4 +407,37 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
+
+ @Override
+ public AMRetryCountResetPolicyContext getAMRetryCountResetPolicyContext() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.resetPolicyContext != null) {
+ return this.resetPolicyContext;
+ }
+ if (!p.hasResetPolicyContext()) {
+ return null;
+ }
+ this.resetPolicyContext = convertFromProtoFormat(p.getResetPolicyContext());
+ return this.resetPolicyContext;
+ }
+
+ @Override
+ public void setAMRetryCountResetPolicyContext(
+ AMRetryCountResetPolicyContext resetPolicyContext) {
+ maybeInitBuilder();
+ if (resetPolicyContext == null)
+ builder.clearResetPolicyContext();
+ this.resetPolicyContext = resetPolicyContext;
+ }
+
+ private AMRetryCountResetPolicyContextPBImpl convertFromProtoFormat(
+ AMRetryCountResetPolicyContextProto p) {
+ return new AMRetryCountResetPolicyContextPBImpl(p);
+ }
+
+ private AMRetryCountResetPolicyContextProto convertToProtoFormat(
+ AMRetryCountResetPolicyContext t) {
+ return ((AMRetryCountResetPolicyContextPBImpl) t).getProto();
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 4a3c137..15cd43d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyType;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -37,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMRetryCountResetPolicyTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
@@ -51,6 +53,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.ByteString;
+
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@Private
@@ -180,6 +183,19 @@ public static AMCommand convertFromProtoFormat(AMCommandProto e) {
}
/*
+ * AMRetryCountResetPolicyType
+ */
+ public static AMRetryCountResetPolicyTypeProto convertToProtoFormat(
+ AMRetryCountResetPolicyType e) {
+ return AMRetryCountResetPolicyTypeProto.valueOf(e.name());
+ }
+
+ public static AMRetryCountResetPolicyType convertFromProtoFormat(
+ AMRetryCountResetPolicyTypeProto e) {
+ return AMRetryCountResetPolicyType.valueOf(e.name());
+ }
+
+ /*
* ByteBuffer
*/
public static ByteBuffer convertFromProtoFormat(ByteString byteString) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index b315a84..160b9e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -240,6 +240,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
+ attemptStateData.getEndTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 6b5b602..ec49014 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -158,8 +158,8 @@ public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 9b05ea1..d98d8e9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -53,8 +53,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -257,6 +255,7 @@ public RMStateStore() {
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
+ long endTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
@@ -267,19 +266,20 @@ public RMStateStore() {
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
- this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID);
+ this(attemptId, masterContainer, appAttemptCredentials, startTime,
+ -1, null, null, "", null, ContainerExitStatus.INVALID);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
- long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus) {
+ long startTime, long endTime, RMAppAttemptState state,
+ String finalTrackingUrl, String diagnostics,
+ FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
this.startTime = startTime;
+ this.endTime = endTime;
this.state = state;
this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics;
@@ -308,6 +308,9 @@ public String getDiagnostics() {
public long getStartTime() {
return startTime;
}
+ public long getEndTime() {
+ return endTime;
+ }
public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 6dd4574..5a71025 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -568,7 +568,8 @@ private void loadApplicationAttemptState(ApplicationState appState,
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 90fb3ec..011016a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -41,9 +41,10 @@
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
- String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+ ByteBuffer attemptTokens, long startTime, long endTime,
+ RMAppAttemptState finalState, String finalTrackingUrl,
+ String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+ int exitStatus) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@@ -53,6 +54,7 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setStartTime(startTime);
+ attemptStateData.setEndTime(endTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
return attemptStateData;
@@ -69,9 +71,9 @@ public static ApplicationAttemptStateData newInstance(
}
return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus(),
+ attemptState.getStartTime(), attemptState.getEndTime(),
+ attemptState.getState(), attemptState.getFinalTrackingUrl(),
+ attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus());
}
@@ -146,6 +148,14 @@ public static ApplicationAttemptStateData newInstance(
public abstract void setStartTime(long startTime);
/**
+ * Get the end time of the application.
+ * @return end time of the application
+ */
+ public abstract long getEndTime();
+
+ public abstract void setEndTime(long endTime);
+
+ /**
* Get the final finish status of the application.
* @return final finish status of the application
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index a90bda4..2db8491 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -229,6 +229,18 @@ public void setStartTime(long startTime) {
}
@Override
+ public long getEndTime() {
+ ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEndTime();
+ }
+
+ @Override
+ public void setEndTime(long endTime) {
+ maybeInitBuilder();
+ builder.setEndTime(endTime);
+ }
+
+ @Override
public FinalApplicationStatus getFinalApplicationStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFinalApplicationStatus()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 2b590a0..738ee7d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
/**
@@ -223,4 +224,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
+
+ /**
+ * Used by {@link AMRetryCountResetPolicy} to reset the retryCount.
+ * The retryCount will be used to calculate whether the next Attempt
+ * is the last retry.
+ * @param retryCount
+ */
+ void setRetryCount(int retryCount);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index bff41cf..be276bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -124,6 +124,8 @@
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
+ private int attemptResetCount = 0;
+
Object transitionTodo;
private static final StateMachineFactory= this.maxAppAttempts) {
+ } else if (getNumFailedAppAttempts() - this.attemptResetCount
+ >= this.maxAppAttempts) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1131,7 +1136,8 @@ public AttemptFailedTransition(RMAppState initialState) {
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
- && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+ && app.getNumFailedAppAttempts() - app.attemptResetCount
+ < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@@ -1193,7 +1199,7 @@ public static boolean isAppInFinalState(RMApp rmApp) {
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
-
+
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
@@ -1202,4 +1208,14 @@ private RMAppState getRecoveredFinalState() {
public Set getRanNodes() {
return ranNodes;
}
+
+ @Override
+ public void setRetryCount(int retryCount) {
+ try {
+ this.writeLock.lock();
+ this.attemptResetCount = retryCount;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 68a068b..c3fd6ed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -23,6 +23,7 @@
import javax.crypto.SecretKey;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -174,6 +175,12 @@
long getStartTime();
/**
+ * the end time of the application.
+ * @return the end time of the application.
+ */
+ long getEndTime();
+
+ /**
* The current state of the {@link RMAppAttempt}.
*
* @return the current state {@link RMAppAttemptState} for this application
@@ -207,4 +214,17 @@
*
*/
boolean shouldCountTowardsMaxAttemptRetry();
+
+ @Private
+ boolean mayBeLastAttempt();
+
+ /**
+ * Could be used by {@link AMRetryCountResetPolicy} to reset
+ * the maybeLastAttempt flag. It can happen when current attempt is
+ * the last attempt, but the {@link AMRetryCountResetPolicy} has been
+ * triggered.
+ * @param maybeLastAttempt
+ */
+ @Private
+ void setMaybeLastAttemptFlag(boolean maybeLastAttempt);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index a4fe426..a9d50e8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -84,6 +84,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.AMResetCountResetPolicyFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.AMRetryCountResetPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -142,6 +144,7 @@
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
+ private long endTime = -1;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@@ -155,7 +158,8 @@
// re-created if this attempt is eventually failed because of preemption,
// hardware error or NM resync. So this flag indicates that this may be
// last attempt.
- private final boolean maybeLastAttempt;
+ private boolean maybeLastAttempt;
+ private AMRetryCountResetPolicy resetPolicy = null;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -412,6 +416,9 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
+ this.resetPolicy =
+ AMResetCountResetPolicyFactory.getAMRetryCountResetPolicy(rmContext,
+ submissionContext);
}
@Override
@@ -695,6 +702,10 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
+ this.endTime = attemptState.getEndTime();
+ if (this.resetPolicy != null) {
+ this.resetPolicy.recover();
+ }
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@@ -988,10 +999,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
}
RMStateStore rmStore = rmContext.getStateStore();
+ this.setEndTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
+ this.getEndTime(), stateToBeStored, finalTrackingUrl, diags,
+ finalStatus, exitStatus);
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@@ -1040,6 +1053,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
((MultipleArcTransition) appAttempt.transitionTodo).transition(
appAttempt, causeEvent);
}
+
+ // Stop AMRetryCountResetPolicy
+ if (appAttempt.resetPolicy != null) {
+ appAttempt.resetPolicy.stop();
+ }
+
return appAttempt.targetedFinalState;
}
}
@@ -1237,6 +1256,11 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptStarted(appAttempt);
+
+ // Start AMRetryCountResetPolicy
+ if (appAttempt.resetPolicy != null) {
+ appAttempt.resetPolicy.start();
+ }
}
}
@@ -1613,6 +1637,25 @@ public long getStartTime() {
}
@Override
+ public long getEndTime() {
+ try {
+ this.readLock.lock();
+ return this.endTime;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private void setEndTime(long endTime) {
+ try {
+ this.writeLock.lock();
+ this.endTime = endTime;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
public RMAppAttemptState getState() {
this.readLock.lock();
@@ -1702,7 +1745,23 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
}
// for testing
+ @Override
public boolean mayBeLastAttempt() {
- return maybeLastAttempt;
+ try {
+ this.readLock.lock();
+ return maybeLastAttempt;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public void setMaybeLastAttemptFlag(boolean maybeLastAttempt) {
+ try {
+ this.writeLock.lock();
+ this.maybeLastAttempt = maybeLastAttempt;
+ } finally {
+ this.writeLock.unlock();
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMResetCountResetPolicyFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMResetCountResetPolicyFactory.java
new file mode 100644
index 0000000..1745d2b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMResetCountResetPolicyFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyType;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+
+@Private
+@Unstable
+/**
+ * Factory for {@link AMRetryCountResetPolicy} implementations.
+ */
+public class AMResetCountResetPolicyFactory {
+ /**
+ * Creates an instance of {@link AMRetryCountResetPolicy}
+ *
+ * @return amRetryCountResetPolicy
+ */
+ public static AMRetryCountResetPolicy getAMRetryCountResetPolicy(
+ RMContext rmContext, ApplicationSubmissionContext context) {
+ if (context.getAMRetryCountResetPolicyContext() != null) {
+ AMRetryCountResetPolicyType policyType =
+ context.getAMRetryCountResetPolicyContext()
+ .getAMRetryCountResetPolicyType();
+ if (policyType != null
+ && AMRetryCountResetPolicyType.WINDOWSSLIDE_AM_RETRYCOUNT_RESETPOLICY
+ == context.getAMRetryCountResetPolicyContext()
+ .getAMRetryCountResetPolicyType()) {
+ long resetWindow =
+ context.getAMRetryCountResetPolicyContext().getResetWindow();
+ RMApp rmApp = rmContext.getRMApps().get(context.getApplicationId());
+ return new WindowsSlideAMRetryCountResetPolicy(resetWindow, rmApp);
+ }
+ }
+ return new NullAMRetryCountResetPolicy();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMRetryCountResetPolicy.java
new file mode 100644
index 0000000..54e73b9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/AMRetryCountResetPolicy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * We have WindowsSlideAMRetryCountResetPolicy available to use.
+ * To use this policy, you need to initiate it by
+ * passing a parameter which is used to define period of time in milliseconds
+ * that AM retry count will be reset.
+ */
+@Private
+@Unstable
+public interface AMRetryCountResetPolicy {
+
+ public void start();
+
+ public void stop();
+
+ public void amRetryCountReset();
+
+ public void recover();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/NullAMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/NullAMRetryCountResetPolicy.java
new file mode 100644
index 0000000..fd797d3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/NullAMRetryCountResetPolicy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy;
+
+
+public class NullAMRetryCountResetPolicy implements AMRetryCountResetPolicy {
+
+ @Override
+ public void start() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void stop() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void amRetryCountReset() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void recover() {
+ // DO NOTHING
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java
new file mode 100644
index 0000000..13d0ab1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy;
+
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+/**
+ * The policy is used to reset AM retry count if current AppMaster successfully
+ * runs for a period of time. To use this policy, you need to pass
+ * WindowsSlideAMRetryCountResetPolicyType as {@link AMRetryCountResetPolicyType}
+ * and resetWindow which is used to define period of time in milliseconds
+ * that AM retry count will be reset into {@link ApplicationSubmissionContext}
+ */
+public class WindowsSlideAMRetryCountResetPolicy implements
+ AMRetryCountResetPolicy {
+
+ private final long resetWindow;
+
+ private Timer timer;
+
+ private RMApp rmApp;
+
+ public WindowsSlideAMRetryCountResetPolicy(long resetWindow, RMApp rmApp) {
+ super();
+ this.resetWindow = resetWindow;
+ if (this.resetWindow <= 0) {
+ throw new IllegalArgumentException("The parameter should larger than 0");
+ }
+ this.timer = null;
+ this.rmApp = rmApp;
+ }
+
+ @Override
+ public synchronized void amRetryCountReset() {
+ // The formula we are using to calculate the lastAttempt:
+ // maxAppAttempts == getNumFailedAppAttempts() - attemptResetCount.
+ // To do the AM retry count reset, just set it
+ // as current number of FailedAppAttempts.
+ int attemptFailureCount = 0;
+ for (Entry entry : this.rmApp
+ .getAppAttempts().entrySet()) {
+ if (entry.getKey() != this.rmApp.getCurrentAppAttempt().getAppAttemptId()
+ && entry.getValue().shouldCountTowardsMaxAttemptRetry()) {
+ attemptFailureCount++;
+ }
+ }
+ rmApp.setRetryCount(attemptFailureCount);
+ if (this.rmApp.getCurrentAppAttempt().mayBeLastAttempt()) {
+ this.rmApp.getCurrentAppAttempt().setMaybeLastAttemptFlag(false);
+ }
+ }
+
+ public long getResetWindow() {
+ return resetWindow;
+ }
+
+ @Override
+ public synchronized void start() {
+ this.timer = new Timer();
+ this.timer.scheduleAtFixedRate(new retryCountResetActivator(), resetWindow,
+ resetWindow);
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (this.timer != null) {
+ this.timer.cancel();
+ }
+ }
+
+ private class retryCountResetActivator extends TimerTask {
+ @Override
+ public void run() {
+ amRetryCountReset();
+ }
+ }
+
+ @Override
+ public synchronized void recover() {
+ int attemptFailureCount = 0;
+ if (this.rmApp.getCurrentAppAttempt().getEndTime()
+ - this.rmApp.getCurrentAppAttempt().getStartTime() >= resetWindow) {
+ for (Entry entry : this.rmApp
+ .getAppAttempts().entrySet()) {
+ if (entry.getKey() != this.rmApp.getCurrentAppAttempt()
+ .getAppAttemptId()
+ && entry.getValue().shouldCountTowardsMaxAttemptRetry()) {
+ attemptFailureCount++;
+ }
+ }
+ this.rmApp.setRetryCount(attemptFailureCount);
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6949a81..17c2fca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyContext;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -283,6 +284,26 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, null);
+ }
+
+ public RMApp submitApp(int masterMemory,
+ AMRetryCountResetPolicyContext resetPolicyContext) throws Exception {
+ return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+ false, null, resetPolicyContext);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId,
+ AMRetryCountResetPolicyContext resetPolicyContext) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -318,6 +339,9 @@ public RMApp submitApp(int masterMemory, String name, String user,
clc.setTokens(securityTokens);
}
sub.setAMContainerSpec(clc);
+ if (resetPolicyContext != null) {
+ sub.setAMRetryCountResetPolicyContext(resetPolicyContext);
+ }
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 4349a23..5656a5e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -165,6 +165,11 @@ public YarnApplicationState createApplicationState() {
public Set getRanNodes() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public void setRetryCount(int retryCount) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
public static RMApp newApplication(int i) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 49d7135..bf4b256 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -307,8 +307,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(oldAttemptState.getAttemptId(),
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100);
store.updateApplicationAttemptState(newAttemptState);
@@ -330,8 +330,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111);
store.updateApplicationAttemptState(dummyAttempt);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 8f26d10..94d8096 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -238,4 +238,9 @@ public YarnApplicationState createApplicationState() {
public Set getRanNodes() {
return null;
}
+
+ @Override
+ public void setRetryCount(int retryCount) {
+ // Do nothing
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java
new file mode 100644
index 0000000..650e302
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.Collections;
+
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyContext;
+import org.apache.hadoop.yarn.api.records.AMRetryCountResetPolicyType;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMAppRetryCountResetPolicy {
+
+ @Test (timeout = 50000)
+ public void testWindowsSlideAMRetryCountResetPolicy() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ AMRetryCountResetPolicyContext resetPolicyContext =
+ AMRetryCountResetPolicyContext
+ .newInstance(
+ AMRetryCountResetPolicyType.WINDOWSSLIDE_AM_RETRYCOUNT_RESETPOLICY,
+ 4000);
+ RMApp app1 = rm1.submitApp(200, resetPolicyContext);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Fail attempt1 normally
+ nm1
+ .nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ // launch the second attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app1.getAppAttempts().size());
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am2.waitForState(RMAppAttemptState.RUNNING);
+ // wait for 4 second for WindowsSlideAMRetryCountResetPolicy to reset
+ Thread.sleep(4000);
+ // Right now Attempt2 is not the last Attempt
+ Assert.assertTrue(!((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+ // Fail attempt2 normally
+ nm1
+ .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+
+ // launch the third attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(3, app1.getAppAttempts().size());
+ RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+ MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am3.waitForState(RMAppAttemptState.RUNNING);
+
+ // Restart rm.
+ @SuppressWarnings("resource")
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status
+ .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt3.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Lauch Attempt 4 and it should be the last Attempt
+ MockAM am4 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+ RMAppAttempt attempt4 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+ .getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // wait for 4 second for WindowsSlideAMRetryCountResetPolicy to reset
+ Thread.sleep(4000);
+ // Right now Attempt4 is not the last Attempt
+ Assert.assertTrue(!((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // Fail attempt4 normally
+ nm1
+ .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am4.waitForState(RMAppAttemptState.FAILED);
+ rm1.stop();
+ rm2.stop();
+ }
+}