diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java
new file mode 100644
index 0000000..131a4be
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * ApplicationRetryPolicy includes:
+ *
+ * - {@link ApplicationRetryPolicyType}. If not specified,
+ * {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY} will be used.
+ * - {@link MaxApplicationRetriesPolicyContext}. It needs to be specified
+ * only if {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY} is used.
+ * - {@link WindowedApplicationRetriesPolicyContext}. It needs to be specified
+ * only if {@link ApplicationRetryPolicyType#WINDOWED_RETRIES_POLICY}
+ * is used.
+ *
+ * By default, {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY} will be
+ * used.
+ */
+@Public
+@Unstable
+public abstract class ApplicationRetryPolicy {
+
+ public static ApplicationRetryPolicy newInstance(
+ MaxApplicationRetriesPolicyContext context) {
+ ApplicationRetryPolicy retryPolicy =
+ Records.newRecord(ApplicationRetryPolicy.class);
+ retryPolicy.setApplicationRetryPolicyType(
+ ApplicationRetryPolicyType.MAX_RETRIES_POLICY);
+ retryPolicy.setMaxApplicationRetriesPolicyContext(context);
+ return retryPolicy;
+ }
+
+ public static ApplicationRetryPolicy newInstance(
+ ApplicationRetryPolicyType policyType,
+ MaxApplicationRetriesPolicyContext maxApplicationRetriesPolicyContext) {
+ if (policyType != ApplicationRetryPolicyType.MAX_RETRIES_POLICY) {
+ throw new IllegalArgumentException(
+ "The ApplicationRetryPolicyType should be specified as "
+ + ApplicationRetryPolicyType.MAX_RETRIES_POLICY
+ + " to use MaxApplicationRetriesPolicy");
+ }
+ ApplicationRetryPolicy retryPolicy =
+ Records.newRecord(ApplicationRetryPolicy.class);
+ retryPolicy.setApplicationRetryPolicyType(policyType);
+ retryPolicy.setMaxApplicationRetriesPolicyContext(
+ maxApplicationRetriesPolicyContext);
+ return retryPolicy;
+ }
+
+ public static ApplicationRetryPolicy newInstance(
+ ApplicationRetryPolicyType policyType,
+ WindowedApplicationRetriesPolicyContext
+ windowedApplicationRetriesPolicyContext) {
+ if (policyType != ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY) {
+ throw new IllegalArgumentException(
+ "The ApplicationRetryPolicyType should be specified as "
+ + ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY
+ + " to use WindowedApplicationRetriesPolicy");
+ }
+ ApplicationRetryPolicy retryPolicy =
+ Records.newRecord(ApplicationRetryPolicy.class);
+ retryPolicy.setApplicationRetryPolicyType(policyType);
+ retryPolicy.setWindowedApplicationRetriesPolicyContext(
+ windowedApplicationRetriesPolicyContext);
+ return retryPolicy;
+ }
+ /**
+ * Get the {@link ApplicationRetryPolicyType} of this policy.
+ */
+ @Public
+ @Stable
+ public abstract ApplicationRetryPolicyType getApplicationRetryPolicyType();
+
+ @Private
+ @Stable
+ public abstract void setApplicationRetryPolicyType(ApplicationRetryPolicyType type);
+
+ /**
+ * Get the {@link MaxApplicationRetriesPolicyContext} of this policy.
+ */
+ @Public
+ @Stable
+ public abstract MaxApplicationRetriesPolicyContext
+ getMaxApplicationRetriesPolicyContext();
+
+ @Private
+ @Stable
+ public abstract void setMaxApplicationRetriesPolicyContext(
+ MaxApplicationRetriesPolicyContext context);
+
+ /**
+ * Get the {@link WindowedApplicationRetriesPolicyContext} of this policy.
+ */
+ @Public
+ @Stable
+ public abstract WindowedApplicationRetriesPolicyContext
+ getWindowedApplicationRetriesPolicyContext();
+
+ @Private
+ @Stable
+ public abstract void setWindowedApplicationRetriesPolicyContext(
+ WindowedApplicationRetriesPolicyContext context);
+
+}
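The factory methods above are the intended entry points for clients. A minimal usage sketch, assuming the YARN API jars (and their record-factory wiring) are on the classpath; the attempt count and window length are made-up example values, not recommendations:

```java
import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;

public class RetryPolicyExamples {
  public static void main(String[] args) {
    // Max-retries flavour: give up after 4 failed attempts (example value).
    ApplicationRetryPolicy maxRetries = ApplicationRetryPolicy.newInstance(
        MaxApplicationRetriesPolicyContext.newInstance(4));

    // Windowed flavour: reset the AM retry count once the current attempt
    // has run longer than the window (example value: one hour).
    ApplicationRetryPolicy windowed = ApplicationRetryPolicy.newInstance(
        ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
        WindowedApplicationRetriesPolicyContext.newInstance(60L * 60 * 1000));

    System.out.println(maxRetries.getApplicationRetryPolicyType());
    System.out.println(windowed.getApplicationRetryPolicyType());
  }
}
```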
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java
new file mode 100644
index 0000000..f9da93a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Application Retry Policy Types.
+ */
+@Public
+@Unstable
+public enum ApplicationRetryPolicyType {
+
+ /**
+ * The application can be retried up to a fixed number of times.
+ */
+ MAX_RETRIES_POLICY,
+
+ /**
+ * The policy resets the AM retry count if the current ApplicationMaster
+ * runs successfully for a configured period of time.
+ */
+ WINDOWED_RETRIES_POLICY
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 1ee04f0..e7ce69a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -35,17 +35,20 @@
* information needed by the ResourceManager to launch
* the ApplicationMaster for an application.
*
- * It includes details such as:
- *
- * - {@link ApplicationId} of the application.
- * - Application user.
- * - Application name.
- * - {@link Priority} of the application.
- * -
- * {@link ContainerLaunchContext} of the container in which the
- * ApplicationMaster is executed.
- *
- *
+ *
+ * It includes details such as:
+ *
+ * - {@link ApplicationId} of the application.
+ * - Application user.
+ * - Application name.
+ * - {@link Priority} of the application.
+ * -
+ * {@link ContainerLaunchContext} of the container in which the
+ * ApplicationMaster is executed.
+ * -
+ * {@link ApplicationRetryPolicy}. The default retry policy type is
+ * {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY}.
+ *
*
*
* @see ContainerLaunchContext
@@ -62,7 +65,8 @@ public static ApplicationSubmissionContext newInstance(
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
- boolean keepContainers) {
+ boolean keepContainers,
+ ApplicationRetryPolicy resetPolicy) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@@ -76,6 +80,7 @@ public static ApplicationSubmissionContext newInstance(
context.setResource(resource);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ context.setApplicationRetryPolicy(resetPolicy);
return context;
}
@@ -85,6 +90,22 @@ public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers) {
+ ApplicationRetryPolicy retryPolicy =
+ ApplicationRetryPolicy.newInstance(MaxApplicationRetriesPolicyContext
+ .newInstance(maxAppAttempts));
+ return newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers, retryPolicy);
+ }
+
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
@@ -338,4 +359,20 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public
@Stable
public abstract void setApplicationTags(Set<String> tags);
-}
\ No newline at end of file
+
+ /**
+ * Get the {@link ApplicationRetryPolicy} of the application.
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationRetryPolicy
+ getApplicationRetryPolicy();
+
+ /**
+ * Set the {@link ApplicationRetryPolicy} for the application.
+ */
+ @Public
+ @Unstable
+ public abstract void setApplicationRetryPolicy(
+ ApplicationRetryPolicy resetPolicy);
+}
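The new overload above keeps the older maxAppAttempts-based factories working by wrapping the value in a MAX_RETRIES_POLICY, while clients that want the windowed behaviour set the policy explicitly. A hedged fragment, assuming appContext was obtained from YarnClient#createApplication() and using an example window of ten minutes:

```java
// Fragment: appContext is an ApplicationSubmissionContext; the window
// length below is an example value, not a recommended default.
ApplicationRetryPolicy retryPolicy = ApplicationRetryPolicy.newInstance(
    ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
    WindowedApplicationRetriesPolicyContext.newInstance(10L * 60 * 1000));
appContext.setApplicationRetryPolicy(retryPolicy);
```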
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java
new file mode 100644
index 0000000..c8cf337
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The MaxApplicationRetriesPolicyContext should be specified
+ * when {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY} is specified
+ * in {@link ApplicationRetryPolicy}.
+ *
+ * It includes:
+ * - maxAppAttempts. If not specified, 2 will be used
+ * as the default value.
+ *
+ */
+@Public
+@Stable
+public abstract class MaxApplicationRetriesPolicyContext {
+
+ public static MaxApplicationRetriesPolicyContext newInstance() {
+ MaxApplicationRetriesPolicyContext context =
+ Records.newRecord(MaxApplicationRetriesPolicyContext.class);
+ return context;
+ }
+
+ public static MaxApplicationRetriesPolicyContext newInstance(
+ int maxAppAttempts) {
+ MaxApplicationRetriesPolicyContext context =
+ Records.newRecord(MaxApplicationRetriesPolicyContext.class);
+ context.setMaxAppAttempts(maxAppAttempts);
+ return context;
+ }
+
+ @Public
+ @Stable
+ public abstract int getMaxAppAttempts();
+
+ @Public
+ @Stable
+ public abstract void setMaxAppAttempts(int maxAppAttempts);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java
new file mode 100644
index 0000000..ab31c8f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The WindowedApplicationRetriesPolicyContext should be specified
+ * when {@link ApplicationRetryPolicyType#WINDOWED_RETRIES_POLICY} is specified
+ * in {@link ApplicationRetryPolicy}.
+ *
+ * It includes:
+ * - resetTimeWindow in milliseconds. If not specified, 86400 will be used
+ * as the default value.
+ *
+ */
+@Public
+@Stable
+public abstract class WindowedApplicationRetriesPolicyContext {
+
+ public static WindowedApplicationRetriesPolicyContext newInstance() {
+ WindowedApplicationRetriesPolicyContext context =
+ Records.newRecord(WindowedApplicationRetriesPolicyContext.class);
+ return context;
+ }
+
+ public static WindowedApplicationRetriesPolicyContext newInstance(
+ long resetTimeWindow) {
+ WindowedApplicationRetriesPolicyContext context =
+ Records.newRecord(WindowedApplicationRetriesPolicyContext.class);
+ if (resetTimeWindow <= 0) {
+ throw new IllegalArgumentException(
+ "The resetTimeWindow should larger than 0");
+ }
+ context.setResetTimeWindow(resetTimeWindow);
+ return context;
+ }
+
+ @Public
+ @Stable
+ public abstract long getResetTimeWindow();
+
+ @Public
+ @Stable
+ public abstract void setResetTimeWindow(long resetTimeWindow);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index 2eb6148..18aa108 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -128,6 +128,7 @@ message ApplicationAttemptStateDataProto {
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
+ optional int64 end_time = 10;
}
message RMStateVersionProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..e70cbfb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -288,6 +288,28 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
+ optional ApplicationRetryPolicyProto app_retry_policy = 13;
+}
+
+message ApplicationRetryPolicyProto {
+ optional ApplicationRetryPolicyTypeProto appRetryPolicyType = 1 [default = MAX_RETRIES_POLICY];
+ // Only one of the following is accepted, depending on the policy type;
+ // each is a context object specific to that policy type.
+ optional MaxApplicationRetriesPolicyContextProto max_app_retry_policy_context = 2;
+ optional WindowedApplicationRetriesPolicyContextProto windowed_app_retry_policy_context = 3;
+}
+
+enum ApplicationRetryPolicyTypeProto {
+ MAX_RETRIES_POLICY = 1;
+ WINDOWED_RETRIES_POLICY = 2;
+}
+
+message MaxApplicationRetriesPolicyContextProto {
+ optional int32 maxAppAttempts = 1 [default = 2];
+}
+
+message WindowedApplicationRetriesPolicyContextProto {
+ optional int64 reset_time_window = 1 [default = 86400];
}
enum ApplicationAccessTypeProto {
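On the wire, the defaults above make an absent or empty ApplicationRetryPolicyProto equivalent to MAX_RETRIES_POLICY with maxAppAttempts = 2, so only non-default choices need to be serialized. A small sketch using the protoc-generated builders from org.apache.hadoop.yarn.proto.YarnProtos (the same setters the PBImpl classes below call); the window length is an example value:

```java
// Fragment: build the windowed variant; only non-default fields are serialized.
WindowedApplicationRetriesPolicyContextProto window =
    WindowedApplicationRetriesPolicyContextProto.newBuilder()
        .setResetTimeWindow(3600000L)   // example: one hour in milliseconds
        .build();
ApplicationRetryPolicyProto policyProto =
    ApplicationRetryPolicyProto.newBuilder()
        .setAppRetryPolicyType(
            ApplicationRetryPolicyTypeProto.WINDOWED_RETRIES_POLICY)
        .setWindowedAppRetryPolicyContext(window)
        .build();
```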
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java
new file mode 100644
index 0000000..cd315b0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProto;
+import com.google.protobuf.TextFormat;
+
+public class ApplicationRetryPolicyPBImpl extends ApplicationRetryPolicy {
+
+ ApplicationRetryPolicyProto proto = ApplicationRetryPolicyProto
+ .getDefaultInstance();
+ ApplicationRetryPolicyProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private WindowedApplicationRetriesPolicyContext windowed_app_retry_policy =
+ null;
+ private MaxApplicationRetriesPolicyContext max_app_retry_policy = null;
+
+ public ApplicationRetryPolicyPBImpl() {
+ builder = ApplicationRetryPolicyProto.newBuilder();
+ }
+
+ public ApplicationRetryPolicyPBImpl(ApplicationRetryPolicyProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ApplicationRetryPolicyProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.max_app_retry_policy != null) {
+ builder
+ .setMaxAppRetryPolicyContext(convertToProtoFormat(this.max_app_retry_policy));
+ }
+ if (this.windowed_app_retry_policy != null) {
+ builder
+ .setWindowedAppRetryPolicyContext(convertToProtoFormat(this.windowed_app_retry_policy));
+ }
+ }
+
+ private WindowedApplicationRetriesPolicyContextProto convertToProtoFormat(
+ WindowedApplicationRetriesPolicyContext r) {
+ return ((WindowedApplicationRetriesPolicyContextPBImpl) r).getProto();
+ }
+
+ private MaxApplicationRetriesPolicyContextProto convertToProtoFormat(
+ MaxApplicationRetriesPolicyContext r) {
+ return ((MaxApplicationRetriesPolicyContextPBImpl) r).getProto();
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ApplicationRetryPolicyProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public ApplicationRetryPolicyType getApplicationRetryPolicyType() {
+ ApplicationRetryPolicyProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasAppRetryPolicyType()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getAppRetryPolicyType());
+ }
+
+ @Override
+ public void setApplicationRetryPolicyType(ApplicationRetryPolicyType type) {
+ maybeInitBuilder();
+ if (type == null) {
+ builder.clearAppRetryPolicyType();
+ return;
+ }
+ builder.setAppRetryPolicyType(ProtoUtils.convertToProtoFormat(type));
+ }
+
+ @Override
+ public MaxApplicationRetriesPolicyContext
+ getMaxApplicationRetriesPolicyContext() {
+ ApplicationRetryPolicyProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.max_app_retry_policy != null) {
+ return this.max_app_retry_policy;
+ }
+ if (!p.hasMaxAppRetryPolicyContext()) {
+ return null;
+ }
+ this.max_app_retry_policy =
+ convertFromProtoFormat(p.getMaxAppRetryPolicyContext());
+ return this.max_app_retry_policy;
+ }
+
+ private MaxApplicationRetriesPolicyContextPBImpl convertFromProtoFormat(
+ MaxApplicationRetriesPolicyContextProto maxAppRetryPolicyContext) {
+ return new MaxApplicationRetriesPolicyContextPBImpl(
+ maxAppRetryPolicyContext);
+ }
+
+ @Override
+ public void setMaxApplicationRetriesPolicyContext(
+ MaxApplicationRetriesPolicyContext context) {
+ maybeInitBuilder();
+ if (null == context) {
+ builder.clearMaxAppRetryPolicyContext();
+ }
+ this.max_app_retry_policy = context;
+ }
+
+ @Override
+ public WindowedApplicationRetriesPolicyContext
+ getWindowedApplicationRetriesPolicyContext() {
+ ApplicationRetryPolicyProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.windowed_app_retry_policy != null) {
+ return this.windowed_app_retry_policy;
+ }
+ if (!p.hasWindowedAppRetryPolicyContext()) {
+ return null;
+ }
+ this.windowed_app_retry_policy =
+ convertFromProtoFormat(p.getWindowedAppRetryPolicyContext());
+ return this.windowed_app_retry_policy;
+ }
+
+ private WindowedApplicationRetriesPolicyContextPBImpl
+ convertFromProtoFormat(WindowedApplicationRetriesPolicyContextProto
+ windowedAppRetryPolicyContext) {
+ return new WindowedApplicationRetriesPolicyContextPBImpl(
+ windowedAppRetryPolicyContext);
+ }
+
+ @Override
+ public void setWindowedApplicationRetriesPolicyContext(
+ WindowedApplicationRetriesPolicyContext context) {
+ maybeInitBuilder();
+ if (null == context) {
+ builder.clearWindowedAppRetryPolicyContext();
+ }
+ this.windowed_app_retry_policy = context;
+ }
+
+}
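The class follows the usual YARN PBImpl copy-on-write pattern: setters keep a local record object, maybeInitBuilder() switches from the immutable proto to a builder, and getProto() folds the local objects back into the proto. A round-trip fragment with example values, roughly what happens when the record crosses process boundaries over RPC:

```java
// Fragment: record -> proto -> record round trip; values are examples.
ApplicationRetryPolicyPBImpl policy = new ApplicationRetryPolicyPBImpl();
policy.setApplicationRetryPolicyType(
    ApplicationRetryPolicyType.MAX_RETRIES_POLICY);
policy.setMaxApplicationRetriesPolicyContext(
    MaxApplicationRetriesPolicyContext.newInstance(3));

ApplicationRetryPolicyProto proto = policy.getProto();  // merges local objects
ApplicationRetryPolicy restored = new ApplicationRetryPolicyPBImpl(proto);
assert restored.getMaxApplicationRetriesPolicyContext().getMaxAppAttempts() == 3;
```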
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c4a3a72..cfaf0d2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,15 +19,18 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
@@ -53,6 +56,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
+ private ApplicationRetryPolicy resetPolicy = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -109,6 +113,9 @@ private void mergeLocalToBuilder() {
if (this.applicationTags != null && !this.applicationTags.isEmpty()) {
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.resetPolicy != null) {
+ builder.setAppRetryPolicy(convertToProtoFormat(this.resetPolicy));
+ }
}
private void mergeLocalToProto() {
@@ -401,4 +408,37 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
+
+ @Override
+ public ApplicationRetryPolicy getApplicationRetryPolicy() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.resetPolicy != null) {
+ return this.resetPolicy;
+ }
+ if (!p.hasAppRetryPolicy()) {
+ return null;
+ }
+ this.resetPolicy = convertFromProtoFormat(p.getAppRetryPolicy());
+ return this.resetPolicy;
+ }
+
+ @Override
+ public void setApplicationRetryPolicy(
+ ApplicationRetryPolicy resetPolicy) {
+ maybeInitBuilder();
+ if (resetPolicy == null)
+ builder.clearAppRetryPolicy();
+ this.resetPolicy = resetPolicy;
+ }
+
+ private ApplicationRetryPolicyPBImpl convertFromProtoFormat(
+ ApplicationRetryPolicyProto p) {
+ return new ApplicationRetryPolicyPBImpl(p);
+ }
+
+ private ApplicationRetryPolicyProto convertToProtoFormat(
+ ApplicationRetryPolicy t) {
+ return ((ApplicationRetryPolicyPBImpl) t).getProto();
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java
new file mode 100644
index 0000000..05c997f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProtoOrBuilder;
+
+public class MaxApplicationRetriesPolicyContextPBImpl extends
+ MaxApplicationRetriesPolicyContext {
+
+ MaxApplicationRetriesPolicyContextProto proto =
+ MaxApplicationRetriesPolicyContextProto.getDefaultInstance();
+ MaxApplicationRetriesPolicyContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public MaxApplicationRetriesPolicyContextPBImpl() {
+ builder = MaxApplicationRetriesPolicyContextProto.newBuilder();
+ }
+
+ public MaxApplicationRetriesPolicyContextPBImpl(
+ MaxApplicationRetriesPolicyContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public MaxApplicationRetriesPolicyContextProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = MaxApplicationRetriesPolicyContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public int getMaxAppAttempts() {
+ MaxApplicationRetriesPolicyContextProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getMaxAppAttempts();
+ }
+
+ @Override
+ public void setMaxAppAttempts(int maxAppAttempts) {
+ maybeInitBuilder();
+ builder.setMaxAppAttempts(maxAppAttempts);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 4a3c137..5a567fc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -39,6 +40,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
@@ -51,6 +53,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.ByteString;
+
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@Private
@@ -180,6 +183,19 @@ public static AMCommand convertFromProtoFormat(AMCommandProto e) {
}
/*
+ * ApplicationRetryPolicyType
+ */
+ public static ApplicationRetryPolicyTypeProto convertToProtoFormat(
+ ApplicationRetryPolicyType e) {
+ return ApplicationRetryPolicyTypeProto.valueOf(e.name());
+ }
+
+ public static ApplicationRetryPolicyType convertFromProtoFormat(
+ ApplicationRetryPolicyTypeProto e) {
+ return ApplicationRetryPolicyType.valueOf(e.name());
+ }
+
+ /*
* ByteBuffer
*/
public static ByteBuffer convertFromProtoFormat(ByteString byteString) {
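The two helpers added above map the enum purely by constant name, so the Java ApplicationRetryPolicyType and the proto ApplicationRetryPolicyTypeProto must keep identical constant names. A tiny round-trip sketch:

```java
// Fragment: name-based mapping between the Java and protobuf enums.
ApplicationRetryPolicyTypeProto p =
    ProtoUtils.convertToProtoFormat(
        ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY);
ApplicationRetryPolicyType t = ProtoUtils.convertFromProtoFormat(p);
assert t == ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY;
```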
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java
new file mode 100644
index 0000000..e24a6bd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProtoOrBuilder;
+
+public class WindowedApplicationRetriesPolicyContextPBImpl extends
+ WindowedApplicationRetriesPolicyContext {
+
+ WindowedApplicationRetriesPolicyContextProto proto =
+ WindowedApplicationRetriesPolicyContextProto.getDefaultInstance();
+ WindowedApplicationRetriesPolicyContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public WindowedApplicationRetriesPolicyContextPBImpl() {
+ builder = WindowedApplicationRetriesPolicyContextProto.newBuilder();
+ }
+
+ public WindowedApplicationRetriesPolicyContextPBImpl(
+ WindowedApplicationRetriesPolicyContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public WindowedApplicationRetriesPolicyContextProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = WindowedApplicationRetriesPolicyContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public long getResetTimeWindow() {
+ WindowedApplicationRetriesPolicyContextProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getResetTimeWindow();
+ }
+
+ @Override
+ public void setResetTimeWindow(long resetTimeWindow) {
+ maybeInitBuilder();
+ builder.setResetTimeWindow(resetTimeWindow);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 243c7a1..4102102 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -272,6 +272,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
+ attemptStateData.getEndTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 369f89a..d72605c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -163,8 +163,8 @@ public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index e2c4e7e..e7bfaf8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -257,6 +257,7 @@ public RMStateStore() {
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
+ long endTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
@@ -267,19 +268,20 @@ public RMStateStore() {
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
- this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID);
+ this(attemptId, masterContainer, appAttemptCredentials, startTime,
+ -1, null, null, "", null, ContainerExitStatus.INVALID);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
- long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus) {
+ long startTime, long endTime, RMAppAttemptState state,
+ String finalTrackingUrl, String diagnostics,
+ FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
this.startTime = startTime;
+ this.endTime = endTime;
this.state = state;
this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics;
@@ -308,6 +310,9 @@ public String getDiagnostics() {
public long getStartTime() {
return startTime;
}
+ public long getEndTime() {
+ return endTime;
+ }
public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus;
}
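ApplicationAttemptState now records when an attempt finished as well as when it started, presumably so a windowed retry policy can tell after an RM restart which failures still fall inside its reset window. A fragment showing the extended constructor; attemptId, masterContainer, credentials, startTime and exitStatus are placeholders:

```java
// Fragment: persist a completed attempt with both start and end time.
ApplicationAttemptState done = new ApplicationAttemptState(
    attemptId, masterContainer, credentials,
    startTime, System.currentTimeMillis(),        // startTime, endTime
    RMAppAttemptState.FAILED, "N/A", "AM exited unexpectedly",
    FinalApplicationStatus.FAILED, exitStatus);
long failedAt = done.getEndTime();
```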
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 5644ad9..d0007e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -594,7 +594,8 @@ private void loadApplicationAttemptState(ApplicationState appState,
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
+ attemptStateData.getStartTime(), attemptStateData.getEndTime(),
+ attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 90fb3ec..011016a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -41,9 +41,10 @@
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
- String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+ ByteBuffer attemptTokens, long startTime, long endTime,
+ RMAppAttemptState finalState, String finalTrackingUrl,
+ String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+ int exitStatus) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@@ -53,6 +54,7 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setStartTime(startTime);
+ attemptStateData.setEndTime(endTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
return attemptStateData;
@@ -69,9 +71,9 @@ public static ApplicationAttemptStateData newInstance(
}
return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus(),
+ attemptState.getStartTime(), attemptState.getEndTime(),
+ attemptState.getState(), attemptState.getFinalTrackingUrl(),
+ attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus());
}
@@ -146,6 +148,14 @@ public static ApplicationAttemptStateData newInstance(
public abstract void setStartTime(long startTime);
/**
+ * Get the end time of the application attempt.
+ * @return end time of the application attempt
+ */
+ public abstract long getEndTime();
+
+ public abstract void setEndTime(long endTime);
+
+ /**
* Get the final finish status of the application.
* @return final finish status of the application
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index a90bda4..2db8491 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -229,6 +229,18 @@ public void setStartTime(long startTime) {
}
@Override
+ public long getEndTime() {
+ ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEndTime();
+ }
+
+ @Override
+ public void setEndTime(long endTime) {
+ maybeInitBuilder();
+ builder.setEndTime(endTime);
+ }
+
+ @Override
public FinalApplicationStatus getFinalApplicationStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFinalApplicationStatus()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index bf374b4..c903cd8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -223,7 +223,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
-
+
+ /**
+ * Get the number of failed application attempts. Attempts that failed
+ * because of AM preemption, hardware failures or NM resync are not
+ * counted.
+ *
+ * @return the number of failed attempts
+ */
+ int getNumFailedAppAttempts();
+
/**
* Get RMAppMetrics of the {@link RMApp}.
*
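For reference, the counting rule behind getNumFailedAppAttempts(); the full loop body is outside this excerpt, so this sketch is inferred from the partial RMAppImpl hunk further down and from RMAppAttempt#shouldCountTowardsMaxAttemptRetry():

```java
// Sketch: only attempts that count towards the max-attempt retry limit
// (i.e. not AM preemption, hardware failure or NM resync) consume the budget.
int completedAttempts = 0;
for (RMAppAttempt attempt : attempts.values()) {
  if (attempt.shouldCountTowardsMaxAttemptRetry()) {
    completedAttempts++;
  }
}
return completedAttempts;
```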
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index efa1ee7..53eb726 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -46,7 +46,6 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -69,6 +68,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicyFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -99,7 +100,6 @@
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
- private final int maxAppAttempts;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
@@ -129,6 +129,8 @@
Object transitionTodo;
+ private ApplicationRetriesPolicy retryPolicy;
+
private static final StateMachineFactory globalMaxAppAttempts) {
- this.maxAppAttempts = globalMaxAppAttempts;
- LOG.warn("The specific max attempts: " + individualMaxAppAttempts
- + " for application: " + applicationId.getId()
- + " is invalid, because it is out of the range [1, "
- + globalMaxAppAttempts + "]. Use the global max attempts instead.");
- } else {
- this.maxAppAttempts = individualMaxAppAttempts;
- }
-
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -359,6 +347,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.stateMachine = stateMachineFactory.make(this);
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
+
+ this.retryPolicy =
+ ApplicationRetriesPolicyFactory.getAMRetryCountResetPolicy(rmContext,
+ submissionContext, conf);
}
@Override
@@ -630,7 +622,7 @@ public StringBuilder getDiagnostics() {
@Override
public int getMaxAppAttempts() {
- return this.maxAppAttempts;
+ return this.retryPolicy.getMaxAppAttempts();
}
@Override
@@ -675,6 +667,8 @@ public void recover(RMState state) throws Exception{
createNewAttempt();
((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
+
+ this.retryPolicy.recover();
}
private void createNewAttempt() {
@@ -682,12 +676,7 @@ private void createNewAttempt() {
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
- submissionContext, conf,
- // The newly created attempt maybe last attempt if (number of
- // previously failed attempts(which should not include Preempted,
- // hardware error and NM resync) + 1) equal to the max-attempt
- // limit.
- maxAppAttempts == (getNumFailedAppAttempts() + 1));
+ submissionContext, conf, this.retryPolicy.lastAttempt());
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
@@ -805,7 +794,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
- && app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
+ && app.retryPolicy.isLastAttempt()))) {
return RMAppState.ACCEPTED;
}
@@ -890,9 +879,9 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
- } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
+ } else if (this.retryPolicy.isLastAttempt()) {
msg = "Application " + this.getApplicationId() + " failed "
- + this.maxAppAttempts + " times due to "
+ + getMaxAppAttempts() + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
}
return msg;
@@ -980,6 +969,7 @@ public FinalSavingTransition(Object transitionToDo,
public void transition(RMAppImpl app, RMAppEvent event) {
app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
targetedFinalState, stateToBeStored);
+ app.retryPolicy.stop();
}
}
@@ -1107,7 +1097,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
- private int getNumFailedAppAttempts() {
+ @Override
+ public int getNumFailedAppAttempts() {
int completedAttempts = 0;
// Do not count AM preemption, hardware failures or NM resync
// as attempt failure.
@@ -1132,7 +1123,7 @@ public AttemptFailedTransition(RMAppState initialState) {
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
- && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+ && !app.retryPolicy.isLastAttempt()) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@@ -1158,6 +1149,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
}
+ private static final class StartApplicationRetryPolicy extends
+ RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ app.retryPolicy.start();
+ }
+ }
+
@Override
public String getApplicationType() {
return this.applicationType;
@@ -1194,7 +1193,7 @@ public static boolean isAppInFinalState(RMApp rmApp) {
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
-
+
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
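RMAppImpl now delegates all attempt-budget decisions to an ApplicationRetriesPolicy obtained from ApplicationRetriesPolicyFactory.getAMRetryCountResetPolicy(); neither the interface nor the factory is included in this excerpt. The sketch below is inferred purely from the call sites above (start/stop/recover, getMaxAppAttempts, lastAttempt, isLastAttempt) and may differ from the actual files in the patch:

```java
// Inferred sketch only; the real interface lives in
// org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry and is not shown here.
public interface ApplicationRetriesPolicy {
  void start();             // app started running
  void stop();              // app is being saved in a final state
  void recover();           // rebuild policy state after RM restart
  int getMaxAppAttempts();  // effective attempt limit for this application
  boolean lastAttempt();    // would a newly created attempt be the last one?
  boolean isLastAttempt();  // has the failure budget already been exhausted?
}
```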
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b5ed92c..9f27757 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -23,6 +23,7 @@
import javax.crypto.SecretKey;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -174,6 +175,12 @@
long getStartTime();
/**
+ * The end time of the application attempt.
+ * @return the end time of the application attempt.
+ */
+ long getEndTime();
+
+ /**
* The current state of the {@link RMAppAttempt}.
*
* @return the current state {@link RMAppAttemptState} for this application
@@ -207,6 +214,9 @@
*
*/
boolean shouldCountTowardsMaxAttemptRetry();
+
+ @Private
+ boolean mayBeLastAttempt();
/**
* Get metrics from the {@link RMAppAttempt}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 50a0755..51f5ea6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -84,6 +85,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -142,6 +144,7 @@
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
+ private long endTime = -1;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@@ -155,7 +158,8 @@
// re-created if this attempt is eventually failed because of preemption,
// hardware error or NM resync. So this flag indicates that this may be
// last attempt.
- private final boolean maybeLastAttempt;
+ private boolean maybeLastAttempt;
+
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@@ -713,6 +717,7 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
+ this.endTime = attemptState.getEndTime();
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@@ -1006,10 +1011,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
}
RMStateStore rmStore = rmContext.getStateStore();
+ this.setEndTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
+ this.getEndTime(), stateToBeStored, finalTrackingUrl, diags,
+ finalStatus, exitStatus);
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@@ -1641,6 +1648,25 @@ public long getStartTime() {
}
@Override
+ public long getEndTime() {
+ try {
+ this.readLock.lock();
+ return this.endTime;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private void setEndTime(long endTime) {
+ try {
+ this.writeLock.lock();
+ this.endTime = endTime;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
public RMAppAttemptState getState() {
this.readLock.lock();
@@ -1730,8 +1756,30 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
}
// for testing
+ @Override
public boolean mayBeLastAttempt() {
- return maybeLastAttempt;
+ try {
+ this.readLock.lock();
+ return maybeLastAttempt;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Used by {@link ApplicationRetriesPolicy} to reset the maybeLastAttempt
+ * flag. This can happen when the current attempt was marked as the last
+ * attempt, but the {@link ApplicationRetriesPolicy} has since been
+ * triggered and reset the retry count.
+ */
+ @Private
+ public void setMaybeLastAttemptFlag(boolean maybeLastAttempt) {
+ try {
+ this.writeLock.lock();
+ this.maybeLastAttempt = maybeLastAttempt;
+ } finally {
+ this.writeLock.unlock();
+ }
}
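+
+ // A minimal sketch of how a retries policy is expected to clear the flag
+ // after resetting the retry count (mirrors
+ // WindowedApplicationRetriesPolicy#amRetryCountReset; "rmApp" is assumed
+ // to be the owning RMApp):
+ //
+ //   RMAppAttempt current = rmApp.getCurrentAppAttempt();
+ //   if (current.mayBeLastAttempt()) {
+ //     ((RMAppAttemptImpl) current).setMaybeLastAttemptFlag(false);
+ //   }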
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java
new file mode 100644
index 0000000..c09a8a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+
+/**
+ * The following retry policies are available:
+ *
+ * - {@link MaxRetriesPolicy} (the default)
+ * - {@link WindowedApplicationRetriesPolicy}
+ *
+ */
+@Private
+@Unstable
+public abstract class ApplicationRetriesPolicy {
+
+ private static final Log LOG = LogFactory
+ .getLog(ApplicationRetriesPolicy.class);
+
+ protected RMApp rmApp;
+ protected final RMContext rmContext;
+ protected final ApplicationSubmissionContext appContext;
+ protected int maxAppAttempts;
+
+ public ApplicationRetriesPolicy(RMContext rmContext,
+ ApplicationSubmissionContext appContext, Configuration conf) {
+ this.rmContext = rmContext;
+ this.appContext = appContext;
+ this.maxAppAttempts = calculateMaxAppAttempts(appContext, conf);
+ }
+
+ public abstract void start();
+
+ public abstract void stop();
+
+ public abstract void amRetryCountReset();
+
+ public abstract void recover();
+
+ public abstract int getMaxAppAttempts();
+
+ public abstract boolean isLastAttempt();
+
+ public abstract boolean lastAttempt();
+
+ public abstract int getMaxAttemptsFromAppContext(
+ ApplicationSubmissionContext appContext);
+
+ protected synchronized RMApp checkAndSetupRMApp() {
+ if (this.rmApp == null) {
+ this.rmApp =
+ this.rmContext.getRMApps().get(this.appContext.getApplicationId());
+ }
+ return this.rmApp;
+ }
+
+ private int calculateMaxAppAttempts(
+ ApplicationSubmissionContext appContext, Configuration conf) {
+ int globalMaxAppAttempts =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+ int individualMaxAppAttempts = getMaxAttemptsFromAppContext(appContext);
+ if (individualMaxAppAttempts <= 0
+ || individualMaxAppAttempts > globalMaxAppAttempts) {
+ LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ + " for application: " + appContext.getApplicationId()
+ + " is invalid, because it is out of the range [1, "
+ + globalMaxAppAttempts + "]. Using the global max attempts instead.");
+ return globalMaxAppAttempts;
+ } else {
+ return individualMaxAppAttempts;
+ }
+ }
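+
+ // Worked example (values are illustrative): with
+ // yarn.resourcemanager.am.max-attempts set to 2, an application asking for
+ // 5 attempts (or for 0 or a negative value) is clamped to 2 with a warning,
+ // while a request of 1 or 2 is honored as-is.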
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java
new file mode 100644
index 0000000..c908d46
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Factory for {@link ApplicationRetriesPolicy} implementations.
+ */
+@Private
+@Unstable
+public class ApplicationRetriesPolicyFactory {
+ /**
+ * Creates an {@link ApplicationRetriesPolicy} instance based on the retry
+ * policy type carried in the application submission context.
+ */
+ public static ApplicationRetriesPolicy getAMRetryCountResetPolicy(
+ RMContext rmContext, ApplicationSubmissionContext context,
+ Configuration conf) {
+ if (context.getApplicationRetryPolicy() != null) {
+ ApplicationRetryPolicyType policyType =
+ context.getApplicationRetryPolicy().getApplicationRetryPolicyType();
+ if (policyType != null
+ && ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY == policyType) {
+ return new WindowedApplicationRetriesPolicy(rmContext, context, conf);
+ }
+ }
+ return new MaxRetriesPolicy(rmContext, context, conf);
+ }
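+
+ // A minimal usage sketch (hedged; "submissionContext", "rmContext" and
+ // "conf" are assumed to be available at the call site):
+ //
+ //   ApplicationRetriesPolicy policy =
+ //       ApplicationRetriesPolicyFactory.getAMRetryCountResetPolicy(
+ //           rmContext, submissionContext, conf);
+ //   policy.start(); // no-op for MaxRetriesPolicy, schedules the reset
+ //                   // timer for WindowedApplicationRetriesPolicy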
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java
new file mode 100644
index 0000000..ddf56a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+public class MaxRetriesPolicy extends ApplicationRetriesPolicy {
+
+ public MaxRetriesPolicy(RMContext rmContext,
+ ApplicationSubmissionContext appContext, Configuration conf) {
+ super(rmContext, appContext, conf);
+ }
+
+ @Override
+ public void start() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void stop() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void amRetryCountReset() {
+ // DO NOTHING
+ }
+
+ @Override
+ public void recover() {
+ // DO NOTHING
+ }
+
+ @Override
+ public int getMaxAppAttempts() {
+ return this.maxAppAttempts;
+ }
+
+ @Override
+ public synchronized boolean isLastAttempt() {
+ if (checkAndSetupRMApp() == null) {
+ return true;
+ }
+ return this.rmApp.getNumFailedAppAttempts() >= this.maxAppAttempts;
+ }
+
+ @Override
+ public synchronized boolean lastAttempt() {
+ if (checkAndSetupRMApp() == null) {
+ return true;
+ }
+ return this.maxAppAttempts == (this.rmApp.getNumFailedAppAttempts() + 1);
+ }
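+
+ // Worked example (illustrative): with maxAppAttempts == 2, lastAttempt()
+ // returns true while the second attempt runs (one prior counted failure),
+ // and isLastAttempt() returns true once two counted failures have
+ // accumulated, which blocks a further retry in AttemptFailedTransition.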
+
+ @Override
+ public int getMaxAttemptsFromAppContext(
+ ApplicationSubmissionContext appContext) {
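+ // Prefer the per-application value from MaxApplicationRetriesPolicyContext
+ // when one was supplied; otherwise fall back to the submission context's
+ // plain maxAppAttempts field.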
+ return appContext.getApplicationRetryPolicy() == null
+ || appContext.getApplicationRetryPolicy()
+ .getMaxApplicationRetriesPolicyContext() == null ? appContext
+ .getMaxAppAttempts() : appContext.getApplicationRetryPolicy()
+ .getMaxApplicationRetriesPolicyContext().getMaxAppAttempts();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java
new file mode 100644
index 0000000..a464751
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+
+/**
+ * This policy resets the AM retry count if the current ApplicationMaster is
+ * still running when the configured reset window elapses. To use it, pass
+ * {@link ApplicationRetryPolicyType#WINDOWED_RETRIES_POLICY} into
+ * {@link ApplicationRetryPolicy} and specify a
+ * {@link WindowedApplicationRetriesPolicyContext}.
+ *
+ * @see ApplicationSubmissionContext
+ */
+public class WindowedApplicationRetriesPolicy extends
+ ApplicationRetriesPolicy {
+
+ private final long resetWindow;
+
+ private Timer timer;
+
+ private int retryCount = 0;
+
+ public WindowedApplicationRetriesPolicy(RMContext rmContext,
+ ApplicationSubmissionContext appContext, Configuration conf) {
+ super(rmContext, appContext, conf);
+ this.timer = null;
+ this.resetWindow = appContext.getApplicationRetryPolicy()
+ .getWindowedApplicationRetriesPolicyContext().getResetTimeWindow();
+ }
+
+ @Override
+ public synchronized void amRetryCountReset() {
+ if (checkAndSetupRMApp() == null) {
+ return;
+ }
+ // isLastAttempt() compares getNumFailedAppAttempts() - retryCount against
+ // maxAppAttempts, so resetting the AM retry count simply means setting
+ // retryCount to the number of already-counted attempt failures, excluding
+ // the current attempt.
+ int attemptFailureCount = 0;
+ for (Entry<ApplicationAttemptId, RMAppAttempt> entry : this.rmApp
+ .getAppAttempts().entrySet()) {
+ if (!entry.getKey().equals(
+ this.rmApp.getCurrentAppAttempt().getAppAttemptId())
+ && entry.getValue().shouldCountTowardsMaxAttemptRetry()) {
+ attemptFailureCount++;
+ }
+ }
+ this.retryCount = attemptFailureCount;
+ if (this.rmApp.getCurrentAppAttempt().mayBeLastAttempt()) {
+ ((RMAppAttemptImpl) this.rmApp.getCurrentAppAttempt())
+ .setMaybeLastAttemptFlag(false);
+ }
+ }
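+
+ // Worked example (illustrative): if two earlier attempts count towards the
+ // retry limit when the reset timer fires, retryCount becomes 2, so
+ // isLastAttempt() effectively sees zero live failures and the application
+ // regains its full attempt budget.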
+
+ @Override
+ public synchronized void start() {
+ this.timer = new Timer();
+ this.timer.scheduleAtFixedRate(new RetryCountResetActivator(), resetWindow,
+ resetWindow);
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (this.timer != null) {
+ this.timer.cancel();
+ }
+ }
+
+ private class RetryCountResetActivator extends TimerTask {
+ @Override
+ public void run() {
+ amRetryCountReset();
+ }
+ }
+
+ @Override
+ public synchronized void recover() {
+ if (checkAndSetupRMApp() == null) {
+ return;
+ }
+
+ int attemptFailureCount = 0;
+ // sort the AttemptIds
+ List<ApplicationAttemptId> attemptIds =
+ new ArrayList<ApplicationAttemptId>();
+ attemptIds.addAll(this.rmApp.getAppAttempts().keySet());
+ Collections.sort(attemptIds);
+
+ for (int i = 0; i < attemptIds.size(); i++) {
+ RMAppAttempt cur = this.rmApp.getAppAttempts().get(attemptIds.get(i));
+ if (cur.getEndTime() - cur.getStartTime() >= this.resetWindow) {
+ this.retryCount = attemptFailureCount;
+ }
+ if (cur.shouldCountTowardsMaxAttemptRetry()) {
+ attemptFailureCount++;
+ }
+ }
+ }
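+
+ // Recovery example (illustrative): with attempts A1 (counted failure),
+ // A2 (ran longer than resetWindow, then a counted failure) and A3, the loop
+ // sets retryCount to 1 when it reaches A2, so only failures from A2 onward
+ // still count toward maxAppAttempts after an RM restart.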
+
+ @Override
+ public synchronized boolean lastAttempt() {
+ if (checkAndSetupRMApp() == null) {
+ return true;
+ }
+ return this.maxAppAttempts ==
+ (this.rmApp.getNumFailedAppAttempts() + 1 - this.retryCount);
+ }
+
+ @Override
+ public synchronized boolean isLastAttempt() {
+ if (checkAndSetupRMApp() == null) {
+ return true;
+ }
+ return (this.rmApp.getNumFailedAppAttempts() - this.retryCount
+ >= this.maxAppAttempts);
+ }
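+
+ // Example (illustrative): with maxAppAttempts == 2, three counted failures
+ // and retryCount == 2, only one failure is still "live", so isLastAttempt()
+ // returns false and another attempt is allowed.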
+
+ @Override
+ public int getMaxAppAttempts() {
+ return this.maxAppAttempts;
+ }
+
+ @Override
+ public int getMaxAttemptsFromAppContext(
+ ApplicationSubmissionContext appContext) {
+ return appContext.getMaxAppAttempts();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6949a81..09a06c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -42,12 +42,14 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -283,6 +285,26 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, null);
+ }
+
+ public RMApp submitApp(int masterMemory,
+ ApplicationRetryPolicy resetPolicy) throws Exception {
+ return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+ false, null, resetPolicy);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, ApplicationRetryPolicy resetPolicy)
+ throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -318,6 +340,13 @@ public RMApp submitApp(int masterMemory, String name, String user,
clc.setTokens(securityTokens);
}
sub.setAMContainerSpec(clc);
+ if (resetPolicy != null) {
+ sub.setApplicationRetryPolicy(resetPolicy);
+ } else {
+ sub.setApplicationRetryPolicy(ApplicationRetryPolicy
+ .newInstance(MaxApplicationRetriesPolicyContext
+ .newInstance(maxAppAttempts)));
+ }
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index ff60fcd..765d7cc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -169,6 +169,11 @@ public YarnApplicationState createApplicationState() {
}
@Override
+ public int getNumFailedAppAttempts() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index a61f23f..a61a87f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -317,8 +317,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(oldAttemptState.getAttemptId(),
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100);
store.updateApplicationAttemptState(newAttemptState);
@@ -340,8 +340,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
new ApplicationAttemptState(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
+ oldAttemptState.getStartTime(), oldAttemptState.getEndTime(),
+ RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111);
store.updateApplicationAttemptState(dummyAttempt);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index b63d2fe..fabaa8c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -239,7 +239,12 @@ public YarnApplicationState createApplicationState() {
public Set<NodeId> getRanNodes() {
return null;
}
-
+
+ @Override
+ public int getNumFailedAppAttempts() {
+ return 0;
+ }
+
public Resource getResourcePreempted() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java
new file mode 100644
index 0000000..4d7d810
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.Collections;
+
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestApplicationRetryPolicy {
+
+ @Test (timeout = 50000)
+ public void testWindowedApplicationRetriesPolicy() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set the global max AM attempts to 2.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ WindowedApplicationRetriesPolicyContext retryContext =
+ WindowedApplicationRetriesPolicyContext.newInstance(4000);
+ ApplicationRetryPolicy resetPolicy =
+ ApplicationRetryPolicy
+ .newInstance(
+ ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
+ retryContext);
+ RMApp app1 = rm1.submitApp(200, resetPolicy);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Fail attempt1 normally
+ nm1
+ .nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ // launch the second attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app1.getAppAttempts().size());
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am2.waitForState(RMAppAttemptState.RUNNING);
+ // wait 4 seconds for the WindowedApplicationRetriesPolicy to reset the retry count
+ Thread.sleep(4000);
+ // attempt2 should no longer be the last attempt
+ Assert.assertFalse(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+ // Fail attempt2 normally
+ nm1
+ .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+
+ // launch the third attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(3, app1.getAppAttempts().size());
+ RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+ MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ am3.waitForState(RMAppAttemptState.RUNNING);
+
+ // Restart rm.
+ @SuppressWarnings("resource")
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status
+ .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt3.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Launch attempt 4; it should be the last attempt
+ MockAM am4 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+ RMAppAttempt attempt4 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+ .getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // wait 4 seconds for the WindowedApplicationRetriesPolicy to reset the retry count
+ Thread.sleep(4000);
+ // attempt4 should no longer be the last attempt
+ Assert.assertFalse(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // Fail attempt4 normally
+ nm1
+ .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am4.waitForState(RMAppAttemptState.FAILED);
+ rm1.stop();
+ rm2.stop();
+ }
+
+ @Test (timeout = 50000)
+ public void testMaxRetriesPolicy() throws Exception {
+ // set the global max AM attempts to 5
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
+
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:234", 8000, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ // prepare MaxApplicationRetriesPolicyContext
+ // and set maxAppAttempts as 2
+ MaxApplicationRetriesPolicyContext retryPolicyContext =
+ MaxApplicationRetriesPolicyContext.newInstance(2);
+ ApplicationRetryPolicy resetPolicy =
+ ApplicationRetryPolicy.newInstance(retryPolicyContext);
+ RMApp app1 = rm.submitApp(200, resetPolicy);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ // Fail attempt1 normally
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ // launch the second attempt
+ rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app1.getAppAttempts().size());
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ //It should be the last Attempt
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ am2.waitForState(RMAppAttemptState.RUNNING);
+ // Fail attempt2 normally
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+
+ // maxAppAttempts was set to 2 in MaxApplicationRetriesPolicyContext,
+ // so only two attempts are allowed and the application fails.
+ rm.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+ rm.stop();
+ }
+}