diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java new file mode 100644 index 0000000..d50ffde --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicy.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * ApplicationRetryPolicy includes: + *

+ * By default {@link ApplicationRetryPolicyType#MAX_RETRIES_POLICY} will be used. + * + *

+ */ +@Public +@Unstable +public abstract class ApplicationRetryPolicy { + /** Message prefix used when the policy type does not match the supplied context. */ + public static final String errorMessage = "ApplicationRetryPolicyType" + + " and ApplicationRetriesPolicyContext are mis-matching."; + /** Creates a MAX_RETRIES_POLICY policy that carries the given context. */ + public static ApplicationRetryPolicy newInstance( + MaxApplicationRetriesPolicyContext context) { + ApplicationRetryPolicy retryPolicy = + Records.newRecord(ApplicationRetryPolicy.class); + retryPolicy.setApplicationRetryPolicyType( + ApplicationRetryPolicyType.MAX_RETRIES_POLICY); + retryPolicy.setMaxApplicationRetriesPolicyContext(context); + return retryPolicy; + } + /** Creates a policy of the given type, which must be MAX_RETRIES_POLICY, else IllegalArgumentException is thrown. */ + public static ApplicationRetryPolicy newInstance( + ApplicationRetryPolicyType policyType, + MaxApplicationRetriesPolicyContext maxApplicationRetriesPolicyContext) { + if (policyType != ApplicationRetryPolicyType.MAX_RETRIES_POLICY) { + throw new IllegalArgumentException(errorMessage + + " The ApplicationRetryPolicyType should be specified as " + + ApplicationRetryPolicyType.MAX_RETRIES_POLICY + + " to use MaxApplicationRetriesPolicy"); + } + ApplicationRetryPolicy retryPolicy = + Records.newRecord(ApplicationRetryPolicy.class); + retryPolicy.setApplicationRetryPolicyType(policyType); + retryPolicy.setMaxApplicationRetriesPolicyContext( + maxApplicationRetriesPolicyContext); + return retryPolicy; + } + /** Creates a policy of the given type, which must be WINDOWED_RETRIES_POLICY, else IllegalArgumentException is thrown. */ + public static ApplicationRetryPolicy newInstance( + ApplicationRetryPolicyType policyType, + WindowedApplicationRetriesPolicyContext + windowedApplicationRetriesPolicyContext) { + if (policyType != ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY) { + throw new IllegalArgumentException(errorMessage + + " The ApplicationRetryPolicyType should be specified as " + + ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY + + " to use WindowedApplicationRetriesPolicy"); + } + ApplicationRetryPolicy retryPolicy = + Records.newRecord(ApplicationRetryPolicy.class); + retryPolicy.setApplicationRetryPolicyType(policyType); + retryPolicy.setWindowedApplicationRetriesPolicyContext( + 
windowedApplicationRetriesPolicyContext); + return retryPolicy; + } + /** + * Get {@link ApplicationRetryPolicyType} + */ + @Public + @Stable + public abstract ApplicationRetryPolicyType getApplicationRetryPolicyType(); + + @Private + @Stable + public abstract void setApplicationRetryPolicyType(ApplicationRetryPolicyType type); + + /** + * Get {@link MaxApplicationRetriesPolicyContext} + */ + @Public + @Stable + public abstract MaxApplicationRetriesPolicyContext + getMaxApplicationRetriesPolicyContext(); + + @Private + @Stable + public abstract void setMaxApplicationRetriesPolicyContext( + MaxApplicationRetriesPolicyContext context); + + /** + * Get {@link WindowedApplicationRetriesPolicyContext} + */ + @Public + @Stable + public abstract WindowedApplicationRetriesPolicyContext + getWindowedApplicationRetriesPolicyContext(); + + @Private + @Stable + public abstract void setWindowedApplicationRetriesPolicyContext( + WindowedApplicationRetriesPolicyContext context); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java new file mode 100644 index 0000000..f9da93a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationRetryPolicyType.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Application retry policy types. + */ +@Public +@Unstable +public enum ApplicationRetryPolicyType { + + /** + * The application can be retried up to a certain number of times. + */ + MAX_RETRIES_POLICY, + + /** + * This policy resets the AM retry count if the current AppMaster has run + * successfully for a period of time. + */ + WINDOWED_RETRIES_POLICY +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 1ee04f0..face42b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -35,17 +35,20 @@ * information needed by the ResourceManager to launch * the ApplicationMaster for an application.

* - *

It includes details such as: - *

+ *

+ * It includes details such as: + *

*

* * @see ContainerLaunchContext @@ -56,13 +59,14 @@ public abstract class ApplicationSubmissionContext { @Public - @Stable + @Unstable public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType, - boolean keepContainers) { + boolean keepContainers, + ApplicationRetryPolicy resetPolicy) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -76,6 +80,7 @@ public static ApplicationSubmissionContext newInstance( context.setResource(resource); context.setApplicationType(applicationType); context.setKeepContainersAcrossApplicationAttempts(keepContainers); + context.setApplicationRetryPolicy(resetPolicy); return context; } @@ -85,6 +90,22 @@ public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers) { + ApplicationRetryPolicy retryPolicy = + ApplicationRetryPolicy.newInstance(MaxApplicationRetriesPolicyContext + .newInstance(maxAppAttempts)); + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, retryPolicy); + } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType) { return 
newInstance(applicationId, applicationName, queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, @@ -338,4 +359,20 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Public @Stable public abstract void setApplicationTags(Set tags); -} \ No newline at end of file + + /** + * Get {@link ApplicationRetryPolicy} + */ + @Public + @Unstable + public abstract ApplicationRetryPolicy + getApplicationRetryPolicy(); + + /** + * Set {@link ApplicationRetryPolicy} + */ + @Public + @Unstable + public abstract void setApplicationRetryPolicy( + ApplicationRetryPolicy resetPolicy); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java new file mode 100644 index 0000000..8491691 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MaxApplicationRetriesPolicyContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.util.Records; + +/** + * The MaxApplicationRetriesPolicyContext should be specified + * when ApplicationRetryPolicyType.MAX_RETRIES_POLICY is specified + * in {@link ApplicationRetryPolicy} + * + *

+ *

+ *

+ */ +@Public +@Stable +public abstract class MaxApplicationRetriesPolicyContext { + public static final String exceptionMessage = + "The maxAppAttempts must be larger than 0"; + + public static MaxApplicationRetriesPolicyContext newInstance() { + MaxApplicationRetriesPolicyContext context = + Records.newRecord(MaxApplicationRetriesPolicyContext.class); + return context; + } + + public static MaxApplicationRetriesPolicyContext newInstance( + int maxAppAttempts) { + if (maxAppAttempts <= 0) { + throw new IllegalArgumentException(exceptionMessage); + } + MaxApplicationRetriesPolicyContext context = + Records.newRecord(MaxApplicationRetriesPolicyContext.class); + context.setMaxAppAttempts(maxAppAttempts); + return context; + } + + @Public + @Stable + public abstract int getMaxAppAttempts(); + + @Public + @Stable + public abstract void setMaxAppAttempts(int maxAppAttempts); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java new file mode 100644 index 0000000..633e51c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/WindowedApplicationRetriesPolicyContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.util.Records; + +/** + * The WindowedApplicationRetriesPolicyContext should be specified + * when ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY is specified + * in {@link ApplicationRetryPolicy} + * + *

+ *

+ *

+ */ +@Public +@Stable +public abstract class WindowedApplicationRetriesPolicyContext { + public static final String exceptionMessage = + "The resetTimeWindow must be larger than 0"; + + public static WindowedApplicationRetriesPolicyContext newInstance() { + WindowedApplicationRetriesPolicyContext context = + Records.newRecord(WindowedApplicationRetriesPolicyContext.class); + return context; + } + + public static WindowedApplicationRetriesPolicyContext newInstance( + long resetTimeWindow) { + if (resetTimeWindow <= 0) { + throw new IllegalArgumentException(exceptionMessage); + } + WindowedApplicationRetriesPolicyContext context = + Records.newRecord(WindowedApplicationRetriesPolicyContext.class); + context.setResetTimeWindow(resetTimeWindow); + return context; + } + + @Public + @Stable + public abstract long getResetTimeWindow(); + + @Public + @Stable + public abstract void setResetTimeWindow(long resetTimeWindow); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 08c937f..20e8abe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -128,6 +128,7 @@ message ApplicationAttemptStateDataProto { optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; optional int32 am_container_exit_status = 9 [default = -1000]; + optional int64 end_time = 10; } message EpochProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..e70cbfb 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -288,6 +288,28 @@ message ApplicationSubmissionContextProto { optional string applicationType = 10 [default = "YARN"]; optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; + optional ApplicationRetryPolicyProto app_retry_policy = 13; +} + +message ApplicationRetryPolicyProto { + optional ApplicationRetryPolicyTypeProto appRetryPolicyType = 1 [default = MAX_RETRIES_POLICY]; + // Only one of the following are accepted based on the type + // Each is a context object specific to the policy type + optional MaxApplicationRetriesPolicyContextProto max_app_retry_policy_context = 2; + optional WindowedApplicationRetriesPolicyContextProto windowed_app_retry_policy_context = 3; +} + +enum ApplicationRetryPolicyTypeProto { + MAX_RETRIES_POLICY = 1; + WINDOWED_RETRIES_POLICY = 2; +} + +message MaxApplicationRetriesPolicyContextProto { + optional int32 maxAppAttempts = 1 [default = 2]; +} + +message WindowedApplicationRetriesPolicyContextProto { + optional int64 reset_time_window = 1 [default = 86400]; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java new file mode 100644 index 0000000..cd315b0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationRetryPolicyPBImpl.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType; +import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext; +import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProto; +import com.google.protobuf.TextFormat; + +public class ApplicationRetryPolicyPBImpl extends ApplicationRetryPolicy { + + ApplicationRetryPolicyProto proto = ApplicationRetryPolicyProto + .getDefaultInstance(); + ApplicationRetryPolicyProto.Builder builder = null; + boolean viaProto = false; + + private WindowedApplicationRetriesPolicyContext windowd_app_retry_policy = + null; + private MaxApplicationRetriesPolicyContext max_app_retry_policy = null; + + public ApplicationRetryPolicyPBImpl() { + builder = ApplicationRetryPolicyProto.newBuilder(); + } + + public 
ApplicationRetryPolicyPBImpl(ApplicationRetryPolicyProto proto) { + this.proto = proto; + viaProto = true; + } + + public ApplicationRetryPolicyProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.max_app_retry_policy != null) { + builder + .setMaxAppRetryPolicyContext(convertToProtoFormat(this.max_app_retry_policy)); + } + if (this.windowd_app_retry_policy != null) { + builder + .setWindowedAppRetryPolicyContext(convertToProtoFormat(this.windowd_app_retry_policy)); + } + } + + private WindowedApplicationRetriesPolicyContextProto convertToProtoFormat( + WindowedApplicationRetriesPolicyContext r) { + return ((WindowedApplicationRetriesPolicyContextPBImpl) r).getProto(); + } + + private MaxApplicationRetriesPolicyContextProto convertToProtoFormat( + MaxApplicationRetriesPolicyContext r) { + return ((MaxApplicationRetriesPolicyContextPBImpl) r).getProto(); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationRetryPolicyProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationRetryPolicyType getApplicationRetryPolicyType() { + ApplicationRetryPolicyProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasAppRetryPolicyType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getAppRetryPolicyType()); + } + + @Override + public void setApplicationRetryPolicyType(ApplicationRetryPolicyType type) { + maybeInitBuilder(); + if (type == null) { + builder.clearAppRetryPolicyType(); + return; + } + builder.setAppRetryPolicyType(ProtoUtils.convertToProtoFormat(type)); + } + + @Override + public MaxApplicationRetriesPolicyContext + getMaxApplicationRetriesPolicyContext() { + ApplicationRetryPolicyProtoOrBuilder p = viaProto ? proto : builder; + if (this.max_app_retry_policy != null) { + return this.max_app_retry_policy; + } + if (!p.hasMaxAppRetryPolicyContext()) { + return null; + } + this.max_app_retry_policy = + convertFromProtoFormat(p.getMaxAppRetryPolicyContext()); + return this.max_app_retry_policy; + } + + private MaxApplicationRetriesPolicyContextPBImpl convertFromProtoFormat( + MaxApplicationRetriesPolicyContextProto maxAppRetryPolicyContext) { + return new MaxApplicationRetriesPolicyContextPBImpl( + maxAppRetryPolicyContext); + } + + @Override + public void setMaxApplicationRetriesPolicyContext( + MaxApplicationRetriesPolicyContext context) { + maybeInitBuilder(); + if (null == context) { + builder.clearMaxAppRetryPolicyContext(); + } + this.max_app_retry_policy = context; + } + + @Override + public WindowedApplicationRetriesPolicyContext + getWindowedApplicationRetriesPolicyContext() { + ApplicationRetryPolicyProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.windowd_app_retry_policy != null) { + return this.windowd_app_retry_policy; + } + if (!p.hasWindowedAppRetryPolicyContext()) { + return null; + } + this.windowd_app_retry_policy = + convertFromProtoFormat(p.getWindowedAppRetryPolicyContext()); + return this.windowd_app_retry_policy; + } + + private WindowedApplicationRetriesPolicyContextPBImpl + convertFromProtoFormat(WindowedApplicationRetriesPolicyContextProto + windowedAppRetryPolicyContext) { + return new WindowedApplicationRetriesPolicyContextPBImpl( + windowedAppRetryPolicyContext); + } + /** Stores the windowed-retries context locally; a null context also clears that field in the builder. */ + @Override + public void setWindowedApplicationRetriesPolicyContext( + WindowedApplicationRetriesPolicyContext context) { + maybeInitBuilder(); + if (null == context) { + builder.clearWindowedAppRetryPolicyContext(); /* clear the windowed field, not the max-retries one */ + } + this.windowd_app_retry_policy = context; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index c2f3268..6b13eeb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -19,15 +19,18 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import com.google.common.base.CharMatcher; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; @@ -53,6 +56,7 @@ private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; + private ApplicationRetryPolicy resetPolicy = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -110,6 +114,9 @@ private void mergeLocalToBuilder() { builder.clearApplicationTags(); builder.addAllApplicationTags(this.applicationTags); } + if (this.resetPolicy != null) { + builder.setAppRetryPolicy(convertToProtoFormat(this.resetPolicy)); + } } private void mergeLocalToProto() { @@ -402,4 +409,37 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } + + @Override + public ApplicationRetryPolicy getApplicationRetryPolicy() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.resetPolicy != null) { + return this.resetPolicy; + } + if (!p.hasAppRetryPolicy()) { + return null; + } + this.resetPolicy = convertFromProtoFormat(p.getAppRetryPolicy()); + return this.resetPolicy; + } + + @Override + public void setApplicationRetryPolicy( + ApplicationRetryPolicy resetPolicy) { + maybeInitBuilder(); + if (resetPolicy == null) + builder.clearAppRetryPolicy(); + this.resetPolicy = resetPolicy; + } + + private ApplicationRetryPolicyPBImpl convertFromProtoFormat( + ApplicationRetryPolicyProto p) { + return new ApplicationRetryPolicyPBImpl(p); + } + + private ApplicationRetryPolicyProto convertToProtoFormat( + ApplicationRetryPolicy t) { + return ((ApplicationRetryPolicyPBImpl) t).getProto(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java new file mode 100644 index 0000000..05c997f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MaxApplicationRetriesPolicyContextPBImpl.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext; +import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.MaxApplicationRetriesPolicyContextProtoOrBuilder; + +public class MaxApplicationRetriesPolicyContextPBImpl extends + MaxApplicationRetriesPolicyContext { + + MaxApplicationRetriesPolicyContextProto proto = + MaxApplicationRetriesPolicyContextProto.getDefaultInstance(); + MaxApplicationRetriesPolicyContextProto.Builder builder = null; + boolean viaProto = false; + + public MaxApplicationRetriesPolicyContextPBImpl() { + builder = MaxApplicationRetriesPolicyContextProto.newBuilder(); + } + + public MaxApplicationRetriesPolicyContextPBImpl( + MaxApplicationRetriesPolicyContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public MaxApplicationRetriesPolicyContextProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = MaxApplicationRetriesPolicyContextProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxAppAttempts() { + MaxApplicationRetriesPolicyContextProtoOrBuilder p = + viaProto ? 
proto : builder; + return p.getMaxAppAttempts(); + } + + @Override + public void setMaxAppAttempts(int maxAppAttempts) { + maybeInitBuilder(); + builder.setMaxAppAttempts(maxAppAttempts); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4a3c137..5a567fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationRetryPolicyTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; @@ -51,6 +53,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.ByteString; + import org.apache.hadoop.yarn.proto.YarnServiceProtos; @Private @@ -180,6 +183,19 @@ public static AMCommand 
convertFromProtoFormat(AMCommandProto e) { } /* + * ApplicationRetryPolicyType + */ + public static ApplicationRetryPolicyTypeProto convertToProtoFormat( + ApplicationRetryPolicyType e) { + return ApplicationRetryPolicyTypeProto.valueOf(e.name()); + } + + public static ApplicationRetryPolicyType convertFromProtoFormat( + ApplicationRetryPolicyTypeProto e) { + return ApplicationRetryPolicyType.valueOf(e.name()); + } + + /* * ByteBuffer */ public static ByteBuffer convertFromProtoFormat(ByteString byteString) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java new file mode 100644 index 0000000..e24a6bd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/WindowedApplicationRetriesPolicyContextPBImpl.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext; +import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.WindowedApplicationRetriesPolicyContextProtoOrBuilder; + +public class WindowedApplicationRetriesPolicyContextPBImpl extends + WindowedApplicationRetriesPolicyContext { + + WindowedApplicationRetriesPolicyContextProto proto = + WindowedApplicationRetriesPolicyContextProto.getDefaultInstance(); + WindowedApplicationRetriesPolicyContextProto.Builder builder = null; + boolean viaProto = false; + + public WindowedApplicationRetriesPolicyContextPBImpl() { + builder = WindowedApplicationRetriesPolicyContextProto.newBuilder(); + } + + public WindowedApplicationRetriesPolicyContextPBImpl( + WindowedApplicationRetriesPolicyContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public WindowedApplicationRetriesPolicyContextProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = WindowedApplicationRetriesPolicyContextProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getResetTimeWindow() { + WindowedApplicationRetriesPolicyContextProtoOrBuilder p = + viaProto ? 
proto : builder; + return p.getResetTimeWindow(); + } + + @Override + public void setResetTimeWindow(long resetTimeWindow) { + maybeInitBuilder(); + builder.setResetTimeWindow(resetTimeWindow); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c6572e9..fe8b79f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -137,6 +137,7 @@ private static Object genTypeValue(Type type) { */ @SuppressWarnings("rawtypes") private static Object generateByNewInstance(Class clazz) throws Exception { + System.out.println("The class name is " + clazz.getName()); Object ret = typeValueCache.get(clazz); if (ret != null) { return ret; @@ -178,6 +179,9 @@ public static void setup() throws Exception { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + typeValueCache.put(ApplicationRetryPolicy.class, ApplicationRetryPolicy + .newInstance(ApplicationRetryPolicyType.MAX_RETRIES_POLICY, + MaxApplicationRetriesPolicyContext.newInstance())); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); generateByNewInstance(ContainerId.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index d57669c..0bbc3db 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -276,6 +276,7 @@ private void loadRMAppState(RMState rmState) throws Exception { new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), + attemptStateData.getEndTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f56517c..ecc8645 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -163,8 +163,8 @@ public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptState updatedAttemptState = new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), + attemptStateData.getStartTime(), attemptStateData.getEndTime(), + attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), 
attemptStateData.getFinalApplicationStatus(), attemptStateData.getAMContainerExitStatus()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 714a108..b29ecf2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -259,6 +259,7 @@ public RMStateStore() { final Container masterContainer; final Credentials appAttemptCredentials; long startTime = 0; + long endTime = 0; // fields set when attempt completes RMAppAttemptState state; String finalTrackingUrl = "N/A"; @@ -269,19 +270,20 @@ public RMStateStore() { public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime) { - this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID); + this(attemptId, masterContainer, appAttemptCredentials, startTime, + -1, null, null, "", null, ContainerExitStatus.INVALID); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, - long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus) { + long startTime, long endTime, RMAppAttemptState state, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { 
this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; this.startTime = startTime; + this.endTime = endTime; this.state = state; this.finalTrackingUrl = finalTrackingUrl; this.diagnostics = diagnostics == null ? "" : diagnostics; @@ -310,6 +312,9 @@ public String getDiagnostics() { public long getStartTime() { return startTime; } + public long getEndTime() { + return endTime; + } public FinalApplicationStatus getFinalApplicationStatus() { return amUnregisteredFinalStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1544dcc..6d6c3f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -599,7 +599,8 @@ private void loadApplicationAttemptState(ApplicationState appState, ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getStartTime(), attemptStateData.getEndTime(), + attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 90fb3ec..011016a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -41,9 +41,10 @@ public abstract class ApplicationAttemptStateData { public static ApplicationAttemptStateData newInstance( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { + ByteBuffer attemptTokens, long startTime, long endTime, + RMAppAttemptState finalState, String finalTrackingUrl, + String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, + int exitStatus) { ApplicationAttemptStateData attemptStateData = Records.newRecord(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); @@ -53,6 +54,7 @@ public static ApplicationAttemptStateData newInstance( attemptStateData.setFinalTrackingUrl(finalTrackingUrl); attemptStateData.setDiagnostics(diagnostics); attemptStateData.setStartTime(startTime); + attemptStateData.setEndTime(endTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); return attemptStateData; @@ -69,9 +71,9 @@ public static ApplicationAttemptStateData newInstance( } return newInstance(attemptState.getAttemptId(), attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - 
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus(), + attemptState.getStartTime(), attemptState.getEndTime(), + attemptState.getState(), attemptState.getFinalTrackingUrl(), + attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(), attemptState.getAMContainerExitStatus()); } @@ -146,6 +148,14 @@ public static ApplicationAttemptStateData newInstance( public abstract void setStartTime(long startTime); /** + * Get the end time of the application. + * @return end time of the application + */ + public abstract long getEndTime(); + + public abstract void setEndTime(long endTime); + + /** * Get the final finish status of the application. * @return final finish status of the application */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index a90bda4..2db8491 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -229,6 +229,18 @@ public void setStartTime(long startTime) { } @Override + public long getEndTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getEndTime(); + } + + @Override + public void setEndTime(long endTime) { + maybeInitBuilder(); + builder.setEndTime(endTime); + } + + @Override public FinalApplicationStatus getFinalApplicationStatus() { ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasFinalApplicationStatus()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index bf374b4..c903cd8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -223,7 +223,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); - + + /** + * Get number of failed attempts which exclude AM preemption, + * hardware failures or NM resync as attempt failure. + */ + int getNumFailedAppAttempts(); + /** * Get RMAppMetrics of the {@link RMApp}. 
* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 48cf460..761277e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,10 +32,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -82,6 +83,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@ -99,7 +102,6 @@ private final YarnScheduler scheduler; private final ApplicationMasterService masterService; private final StringBuilder diagnostics = new StringBuilder(); - private final int maxAppAttempts; private final ReadLock readLock; private final WriteLock writeLock; private final Map attempts @@ -129,6 +131,8 @@ Object transitionTodo; + private ApplicationRetriesPolicy retryPolicy; + private static final StateMachineFactory globalMaxAppAttempts) { - this.maxAppAttempts = globalMaxAppAttempts; - LOG.warn("The specific max attempts: " + individualMaxAppAttempts - + " for application: " + applicationId.getId() - + " is invalid, because it is out of the range [1, " - + globalMaxAppAttempts + "]. 
Use the global max attempts instead."); - } else { - this.maxAppAttempts = individualMaxAppAttempts; - } - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -362,6 +352,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.stateMachine = stateMachineFactory.make(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this); + + this.retryPolicy = + ApplicationRetriesPolicyFactory.getAMRetryCountResetPolicy(rmContext, + submissionContext, conf); } @Override @@ -633,7 +627,7 @@ public StringBuilder getDiagnostics() { @Override public int getMaxAppAttempts() { - return this.maxAppAttempts; + return this.retryPolicy.getMaxAppAttempts(); } @Override @@ -678,6 +672,8 @@ public void recover(RMState state) throws Exception{ createNewAttempt(); ((RMAppAttemptImpl)this.currentAttempt).recover(state); } + + this.retryPolicy.recover(); } private void createNewAttempt() { @@ -685,12 +681,7 @@ private void createNewAttempt() { ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, - // The newly created attempt maybe last attempt if (number of - // previously failed attempts(which should not include Preempted, - // hardware error and NM resync) + 1) equal to the max-attempt - // limit. 
- maxAppAttempts == (getNumFailedAppAttempts() + 1)); + submissionContext, conf, this.retryPolicy.lastAttempt()); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -808,7 +799,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.getNumFailedAppAttempts() == app.maxAppAttempts))) { + && app.retryPolicy.isLastAttempt()))) { return RMAppState.ACCEPTED; } @@ -875,9 +866,9 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { + } else if (this.retryPolicy.isLastAttempt()) { msg = "Application " + this.getApplicationId() + " failed " - + this.maxAppAttempts + " times due to " + + getMaxAppAttempts() + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; } return msg; @@ -965,6 +956,7 @@ public FinalSavingTransition(Object transitionToDo, public void transition(RMAppImpl app, RMAppEvent event) { app.rememberTargetTransitionsAndStoreState(event, transitionToDo, targetedFinalState, stateToBeStored); + app.retryPolicy.stop(); } } @@ -1092,7 +1084,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private int getNumFailedAppAttempts() { + @Override + public int getNumFailedAppAttempts() { int completedAttempts = 0; // Do not count AM preemption, hardware failures or NM resync // as attempt failure. @@ -1117,7 +1110,7 @@ public AttemptFailedTransition(RMAppState initialState) { public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (!app.submissionContext.getUnmanagedAM() - && app.getNumFailedAppAttempts() < app.maxAppAttempts) { + && ! 
app.retryPolicy.isLastAttempt()) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1143,6 +1136,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } + private static final class StartApplicationRetryPolicy extends + RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + app.retryPolicy.start(); + }; + } + @Override public String getApplicationType() { return this.applicationType; @@ -1182,7 +1183,7 @@ public static boolean isAppInFinalState(RMApp rmApp) { return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } - + private RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } @@ -1212,4 +1213,10 @@ public RMAppMetrics getRMAppMetrics() { return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted, numAMContainerPreempted); } + + @Private + @VisibleForTesting + public ApplicationRetriesPolicy getApplicationRetryPolicy() { + return this.retryPolicy; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..9f27757 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -23,6 +23,7 @@ import javax.crypto.SecretKey; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; 
+import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -174,6 +175,12 @@ long getStartTime(); /** + * the end time of the application. + * @return the end time of the application. + */ + long getEndTime(); + + /** * The current state of the {@link RMAppAttempt}. * * @return the current state {@link RMAppAttemptState} for this application @@ -207,6 +214,9 @@ * */ boolean shouldCountTowardsMaxAttemptRetry(); + + @Private + boolean mayBeLastAttempt(); /** * Get metrics from the {@link RMAppAttempt} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..e48d8f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -42,7 +42,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -83,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -141,6 +141,7 @@ private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private long endTime = -1; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -154,7 +155,8 @@ // re-created if this attempt is eventually failed because of preemption, // hardware error or NM resync. So this flag indicates that this may be // last attempt. - private final boolean maybeLastAttempt; + private boolean maybeLastAttempt; + private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); @@ -733,6 +735,7 @@ public void recover(RMState state) throws Exception { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); + this.endTime = attemptState.getEndTime(); } public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { @@ -1019,10 +1022,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, } RMStateStore rmStore = rmContext.getStateStore(); + this.setEndTime(System.currentTimeMillis()); ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + this.getEndTime(), stateToBeStored, 
finalTrackingUrl, diags, + finalStatus, exitStatus); LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); @@ -1645,6 +1650,25 @@ public long getStartTime() { } @Override + public long getEndTime() { + try { + this.readLock.lock(); + return this.endTime; + } finally { + this.readLock.unlock(); + } + } + + private void setEndTime(long endTime) { + try { + this.writeLock.lock(); + this.endTime = endTime; + } finally { + this.writeLock.unlock(); + } + } + + @Override public RMAppAttemptState getState() { this.readLock.lock(); @@ -1722,8 +1746,30 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } // for testing + @Override public boolean mayBeLastAttempt() { - return maybeLastAttempt; + try { + this.readLock.lock(); + return maybeLastAttempt; + } finally { + this.readLock.unlock(); + } + } + + /** + * Could be used by {@link ApplicationRetriesPolicy} to reset + * the maybeLastAttempt flag. It can happen when current attempt is + * the last attempt, but the {@link ApplicationRetriesPolicy} has been + * triggered. 
+ */ + @Private + public void setMaybeLastAttemptFlag(boolean maybeLastAttempt) { + try { + this.writeLock.lock(); + this.maybeLastAttempt = maybeLastAttempt; + } finally { + this.writeLock.unlock(); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java new file mode 100644 index 0000000..20d30be --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicy.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; + +/** + * We have following resetPolicy available to use: + *
+ * <ul>
+ * <li>{@link MaxRetriesPolicy}. By default</li>
+ * <li>{@link WindowedApplicationRetriesPolicy}</li>
+ * </ul>
+ */
+@Private
+@Unstable
+public abstract class ApplicationRetriesPolicy {
+
+  private static final Log LOG = LogFactory
+      .getLog(ApplicationRetriesPolicy.class);
+
+  // Looked up lazily from rmContext by checkAndSetupRMApp(); null until then.
+  protected RMApp rmApp;
+  protected final RMContext rmContext;
+  protected final ApplicationSubmissionContext appContext;
+  // Effective attempt limit, fixed at construction by calculateMaxAppAttempts().
+  protected final int maxAppAttempts;
+
+
+  /**
+   * Stores the contexts and computes the effective attempt limit.
+   * <p>
+   * Note: this constructor indirectly calls the overridable
+   * {@link #getMaxAttemptsFromAppContext(ApplicationSubmissionContext)}
+   * before any subclass constructor body has run, so implementations of
+   * that method must only rely on their argument, not on subclass fields.
+   *
+   * @param rmContext RM context used later to look up the application
+   * @param appContext submission context of the application
+   * @param conf configuration supplying the global attempt limit
+   */
+  public ApplicationRetriesPolicy(RMContext rmContext,
+      ApplicationSubmissionContext appContext, Configuration conf) {
+    this.rmContext = rmContext;
+    this.appContext = appContext;
+    this.maxAppAttempts = calculateMaxAppAttempts(appContext, conf);
+  }
+
+  /**
+   * Starts any background work the policy needs. Of the implementations in
+   * this package, {@link MaxRetriesPolicy} does nothing and
+   * {@link WindowedApplicationRetriesPolicy} starts its reset thread.
+   */
+  public abstract void start();
+
+  /** Stops whatever {@link #start()} started. */
+  public abstract void stop();
+
+  /**
+   * Resets the AM retry count. A no-op in {@link MaxRetriesPolicy};
+   * {@link WindowedApplicationRetriesPolicy} recomputes its retry count.
+   */
+  public abstract void amRetryCountReset();
+
+  /**
+   * Rebuilds the policy's state from the application's existing attempts.
+   * NOTE(review): presumably invoked when the RM recovers the application;
+   * confirm against the caller.
+   */
+  public abstract void recover();
+
+  /** @return the attempt limit used by this policy */
+  public abstract int getMaxAppAttempts();
+
+  /**
+   * In the implementations in this package: true when the application
+   * cannot be found, or when its failed-attempt count (less any retry
+   * count the policy has accumulated) has reached the attempt limit.
+   */
+  public abstract boolean isLastAttempt();
+
+  /**
+   * In the implementations in this package: true when the application
+   * cannot be found, or when one more failure (less any retry count the
+   * policy has accumulated) would equal the attempt limit.
+   */
+  public abstract boolean lastAttempt();
+
+  /**
+   * Extracts the application-specific attempt limit from the submission
+   * context. Called from the constructor; see the constructor note.
+   *
+   * @param appContext submission context of the application
+   * @return the limit requested by the application
+   */
+  public abstract int getMaxAttemptsFromAppContext(
+      ApplicationSubmissionContext appContext);
+
+  /**
+   * Returns the cached {@link RMApp}, first looking it up in the RM context
+   * by application id if it has not been resolved yet.
+   *
+   * @return the application, or null if the RM context does not have it
+   */
+  protected synchronized RMApp checkAndSetupRMApp() {
+    if (this.rmApp == null) {
+      this.rmApp =
+          this.rmContext.getRMApps().get(this.appContext.getApplicationId());
+    }
+    return this.rmApp;
+  }
+
+  /**
+   * Picks the effective attempt limit: the application-specific value when
+   * it lies in [1, global limit], otherwise the global limit (with a warning).
+   */
+  private int calculateMaxAppAttempts(
+      ApplicationSubmissionContext appContext, Configuration conf) {
+    int globalMaxAppAttempts =
+        conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    int individualMaxAppAttempts = getMaxAttemptsFromAppContext(appContext);
+    if (individualMaxAppAttempts <= 0
+        || individualMaxAppAttempts > globalMaxAppAttempts) {
+      LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+          + " for application: " + appContext.getApplicationId()
+          + " is invalid, because it is out of the range [1, "
+          + globalMaxAppAttempts + "]. Use the global max attempts instead.");
+      return globalMaxAppAttempts;
+    } else {
+      return individualMaxAppAttempts;
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java
new file mode 100644
index 0000000..c908d46
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/ApplicationRetriesPolicyFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Factory for {@link ApplicationRetriesPolicy} implementations.
+ */
+@Private
+@Unstable
+public class ApplicationRetriesPolicyFactory {
+  /**
+   * Creates the {@link ApplicationRetriesPolicy} for a submitted application.
+   * A {@link WindowedApplicationRetriesPolicy} is returned only when the
+   * submission context carries a retry policy whose type is
+   * {@link ApplicationRetryPolicyType#WINDOWED_RETRIES_POLICY}; in every
+   * other case (no retry policy, no type, or any other type) a
+   * {@link MaxRetriesPolicy} is returned.
+   *
+   * @param rmContext RM context handed to the created policy
+   * @param context submission context of the application
+   * @param conf configuration handed to the created policy
+   * @return the retry policy to use for the application, never null
+   */
+  public static ApplicationRetriesPolicy getAMRetryCountResetPolicy(
+      RMContext rmContext, ApplicationSubmissionContext context,
+      Configuration conf) {
+    if (context.getApplicationRetryPolicy() != null) {
+      ApplicationRetryPolicyType policyType =
+          context.getApplicationRetryPolicy().getApplicationRetryPolicyType();
+      // An enum == comparison is already false for null: no null check needed.
+      if (ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY == policyType) {
+        return new WindowedApplicationRetriesPolicy(rmContext, context, conf);
+      }
+    }
+    return new MaxRetriesPolicy(rmContext, context, conf);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java
new file mode 100644
index 0000000..ddf56a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/MaxRetriesPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Retry policy with a fixed attempt limit: the lifecycle hooks are no-ops
+ * and the last-attempt checks compare the application's failed-attempt
+ * count directly against the limit computed by the superclass.
+ */
+public class MaxRetriesPolicy extends ApplicationRetriesPolicy {
+
+  public MaxRetriesPolicy(RMContext rmContext,
+      ApplicationSubmissionContext appContext, Configuration conf) {
+    super(rmContext, appContext, conf);
+  }
+
+  /** Nothing to start for a fixed limit. */
+  @Override
+  public void start() {
+  }
+
+  /** Nothing to stop for a fixed limit. */
+  @Override
+  public void stop() {
+  }
+
+  /** The count is never reset by this policy. */
+  @Override
+  public void amRetryCountReset() {
+  }
+
+  /** No state to rebuild. */
+  @Override
+  public void recover() {
+  }
+
+  @Override
+  public int getMaxAppAttempts() {
+    return this.maxAppAttempts;
+  }
+
+
+  /**
+   * @return true if the application is unknown, or if its failed attempts
+   *         have reached the limit
+   */
+  @Override
+  public synchronized boolean isLastAttempt() {
+    return checkAndSetupRMApp() == null
+        || this.rmApp.getNumFailedAppAttempts() >= this.maxAppAttempts;
+  }
+
+  /**
+   * @return true if the application is unknown, or if one more failure
+   *         would equal the limit
+   */
+  @Override
+  public synchronized boolean lastAttempt() {
+    return checkAndSetupRMApp() == null
+        || (this.rmApp.getNumFailedAppAttempts() + 1) == this.maxAppAttempts;
+  }
+
+  /**
+   * Prefers the limit carried by the retry policy's max-retries context and
+   * falls back to the submission context's own value when the retry policy
+   * or its max-retries context is absent.
+   */
+  @Override
+  public int getMaxAttemptsFromAppContext(
+      ApplicationSubmissionContext appContext) {
+    boolean hasMaxRetriesContext =
+        appContext.getApplicationRetryPolicy() != null
+            && appContext.getApplicationRetryPolicy()
+                .getMaxApplicationRetriesPolicyContext() != null;
+    if (!hasMaxRetriesContext) {
+      return appContext.getMaxAppAttempts();
+    }
+    return appContext.getApplicationRetryPolicy()
+        .getMaxApplicationRetriesPolicyContext().getMaxAppAttempts();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java
new file mode 100644
index 0000000..fcff9e5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/retry/WindowedApplicationRetriesPolicy.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType; +import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The policy is used to reset AM retry count if current AppMaster successfully + * runs for a period of time. 
To use this policy, you need to pass
+ * {@link ApplicationRetryPolicyType#WINDOWED_RETRIES_POLICY}
+ * into {@link ApplicationRetryPolicy}, and specify
+ * {@link WindowedApplicationRetriesPolicyContext}
+ * <p>
+ * The retry count is guarded by this object's monitor: it is written by
+ * the reset thread and by {@link #recover()}, and read by the last-attempt
+ * checks.
+ *
+ * @see ApplicationSubmissionContext
+ */
+public class WindowedApplicationRetriesPolicy extends
+    ApplicationRetriesPolicy {
+
+  private static final Log LOG = LogFactory
+      .getLog(WindowedApplicationRetriesPolicy.class);
+
+  // Period between resets; passed to Thread.sleep(), i.e. in milliseconds.
+  private final long resetWindow;
+
+  // Number of failed attempts forgiven so far; guarded by "this".
+  private int retryCount = 0;
+
+  private volatile boolean keepRunning;
+
+  // Created by start(); null until then.
+  private Thread resetThread;
+
+  /**
+   * Reads the reset window from the submission context's windowed retry
+   * policy context.
+   * NOTE(review): assumes the context carries both an ApplicationRetryPolicy
+   * and a WindowedApplicationRetriesPolicyContext; otherwise the dereference
+   * below throws NullPointerException.
+   */
+  public WindowedApplicationRetriesPolicy(RMContext rmContext,
+      ApplicationSubmissionContext appContext, Configuration conf) {
+    super(rmContext, appContext, conf);
+    this.resetWindow = appContext.getApplicationRetryPolicy()
+        .getWindowedApplicationRetriesPolicyContext().getResetTimeWindow();
+    this.keepRunning = true;
+  }
+
+  /**
+   * Forgives every failure seen so far and clears the current attempt's
+   * maybeLastAttempt flag. Does nothing if the application is unknown.
+   */
+  @Override
+  public synchronized void amRetryCountReset() {
+    if (checkAndSetupRMApp() == null) {
+      return;
+    }
+    // lastAttempt() evaluates
+    //   maxAppAttempts == getNumFailedAppAttempts() + 1 - retryCount,
+    // so to reset, set retryCount to the number of attempts other than the
+    // current one that count towards the limit.
+    // The current attempt may not exist yet; guard against null so that a
+    // NullPointerException cannot kill the reset thread.
+    RMAppAttempt currentAttempt = this.rmApp.getCurrentAppAttempt();
+    ApplicationAttemptId currentId =
+        currentAttempt == null ? null : currentAttempt.getAppAttemptId();
+    int attemptFailureCount = 0;
+    for (Entry<ApplicationAttemptId, RMAppAttempt> entry : this.rmApp
+        .getAppAttempts().entrySet()) {
+      // Compare ids by value: equal ids need not be the same object.
+      if (!entry.getKey().equals(currentId)
+          && entry.getValue().shouldCountTowardsMaxAttemptRetry()) {
+        attemptFailureCount++;
+      }
+    }
+    this.retryCount = attemptFailureCount;
+    if (currentAttempt != null && currentAttempt.mayBeLastAttempt()) {
+      ((RMAppAttemptImpl) currentAttempt).setMaybeLastAttemptFlag(false);
+    }
+  }
+
+  /**
+   * Starts a daemon thread that calls {@link #amRetryCountReset()} once per
+   * reset window until stopped or interrupted.
+   */
+  @Override
+  public void start() {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        while (keepRunning) {
+          try {
+            Thread.sleep(resetWindow);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted. " + e.getMessage());
+            keepRunning = false;
+          }
+          if (!keepRunning) {
+            return;
+          }
+          amRetryCountReset();
+        }
+      }
+    };
+    this.resetThread =
+        new Thread(runnable, "AMCountReset Thread");
+    resetThread.setDaemon(true);
+    resetThread.start();
+  }
+
+  /** Stops the reset thread, if one was started, and waits for it to exit. */
+  @Override
+  public void stop() {
+    this.keepRunning = false;
+    if (this.resetThread == null) {
+      // start() was never called, so there is nothing to interrupt or join.
+      return;
+    }
+    this.resetThread.interrupt();
+    try {
+      this.resetThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Error joining with AMCountResetThread", ex);
+      // Preserve the interrupt for the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Recomputes the retry count from the application's attempts, visited in
+   * attempt-id order: each attempt that lasted at least one reset window
+   * forgives the countable failures that preceded it.
+   */
+  @Override
+  public synchronized void recover() {
+    if (checkAndSetupRMApp() == null) {
+      return;
+    }
+
+    int attemptFailureCount = 0;
+    // sort the AttemptIds
+    List<ApplicationAttemptId> attemptIds =
+        new ArrayList<ApplicationAttemptId>(
+            this.rmApp.getAppAttempts().keySet());
+    Collections.sort(attemptIds);
+
+    for (ApplicationAttemptId attemptId : attemptIds) {
+      RMAppAttempt cur = this.rmApp.getAppAttempts().get(attemptId);
+      if (cur.getEndTime() - cur.getStartTime() >= this.resetWindow) {
+        this.retryCount = attemptFailureCount;
+      }
+      if (cur.shouldCountTowardsMaxAttemptRetry()) {
+        attemptFailureCount++;
+      }
+    }
+  }
+
+  /**
+   * @return true if the application is unknown, or if one more failure,
+   *         after subtracting the forgiven ones, would equal the limit
+   */
+  @Override
+  public synchronized boolean lastAttempt() {
+    if (checkAndSetupRMApp() == null) {
+      return true;
+    }
+    return this.maxAppAttempts ==
+        (this.rmApp.getNumFailedAppAttempts() + 1 - this.retryCount);
+  }
+
+  /**
+   * @return true if the application is unknown, or if its failed attempts,
+   *         after subtracting the forgiven ones, have reached the limit
+   */
+  @Override
+  public synchronized boolean isLastAttempt() {
+    if (checkAndSetupRMApp() == null) {
+      return true;
+    }
+    return (this.rmApp.getNumFailedAppAttempts() - this.retryCount
+        >= this.maxAppAttempts);
+  }
+
+  @Override
+  public int getMaxAppAttempts() {
+    return this.maxAppAttempts;
+  }
+
+  /** Uses the submission context's own max-attempts value. */
+  @Override
+  public int getMaxAttemptsFromAppContext(
+      ApplicationSubmissionContext appContext) {
+    return appContext.getMaxAppAttempts();
+  }
+
+  /**
+   * Asks the reset thread to exit without waiting for it; a no-op on the
+   * thread if {@link #start()} was never called.
+   */
+  @Private
+  @VisibleForTesting
+  public void abort() {
+    this.keepRunning = false;
+    if (this.resetThread != null) {
+      this.resetThread.interrupt();
+    }
+  }
+}
diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a7f6240..b801fda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -43,12 +43,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -286,6 +288,26 @@ public RMApp submitApp(int masterMemory, String name, String user, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, null); + } + + public 
RMApp submitApp(int masterMemory, + ApplicationRetryPolicy resetPolicy) throws Exception { + return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, resetPolicy); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, ApplicationRetryPolicy resetPolicy) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -321,6 +343,13 @@ public RMApp submitApp(int masterMemory, String name, String user, clc.setTokens(securityTokens); } sub.setAMContainerSpec(clc); + if (resetPolicy != null) { + sub.setApplicationRetryPolicy(resetPolicy); + } else if (maxAppAttempts > 0){ + sub.setApplicationRetryPolicy(ApplicationRetryPolicy + .newInstance(MaxApplicationRetriesPolicyContext + .newInstance(maxAppAttempts))); + } req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index ff60fcd..765d7cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -169,6 +169,11 @@ public YarnApplicationState createApplicationState() { } @Override + public int getNumFailedAppAttempts() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override public RMAppMetrics getRMAppMetrics() { return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 620ba9f..246ff97 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -302,8 +302,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) new ApplicationAttemptState(oldAttemptState.getAttemptId(), oldAttemptState.getMasterContainer(), oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", + oldAttemptState.getStartTime(), oldAttemptState.getEndTime(), + RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); @@ -325,8 +325,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) new ApplicationAttemptState(dummyAttemptId, oldAttemptState.getMasterContainer(), 
oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", + oldAttemptState.getStartTime(), oldAttemptState.getEndTime(), + RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b63d2fe..fabaa8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -239,7 +239,12 @@ public YarnApplicationState createApplicationState() { public Set getRanNodes() { return null; } - + + @Override + public int getNumFailedAppAttempts() { + return 0; + } + public Resource getResourcePreempted() { throw new UnsupportedOperationException("Not supported yet."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java new file mode 100644 index 0000000..9a6de79 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationRetryPolicy.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.Collections;
+
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicy;
+import org.apache.hadoop.yarn.api.records.ApplicationRetryPolicyType;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.MaxApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.api.records.WindowedApplicationRetriesPolicyContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.ApplicationRetriesPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.retry.WindowedApplicationRetriesPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the client-facing {@link ApplicationRetryPolicy} record and the
+ * RM-side {@link ApplicationRetriesPolicy} implementations.
+ */
+public class TestApplicationRetryPolicy {
+
+  /**
+   * Invalid context values and mismatched type/context pairs must be
+   * rejected with IllegalArgumentException; the single-argument factory
+   * must yield a MAX_RETRIES_POLICY.
+   */
+  @Test (timeout = 10000)
+  public void testApplicationRetryPolicyIllegalArguments() throws Exception {
+    try {
+      MaxApplicationRetriesPolicyContext.newInstance(0);
+      Assert.fail("Exception expected");
+    } catch (IllegalArgumentException ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          MaxApplicationRetriesPolicyContext.exceptionMessage));
+    }
+
+    try {
+      WindowedApplicationRetriesPolicyContext.newInstance(0);
+      Assert.fail("Exception expected");
+    } catch (IllegalArgumentException ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          WindowedApplicationRetriesPolicyContext.exceptionMessage));
+    }
+
+    try {
+      ApplicationRetryPolicy.newInstance(
+          ApplicationRetryPolicyType.MAX_RETRIES_POLICY,
+          WindowedApplicationRetriesPolicyContext.newInstance(2));
+      Assert.fail("Exception expected");
+    } catch (IllegalArgumentException ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          ApplicationRetryPolicy.errorMessage));
+    }
+
+    try {
+      ApplicationRetryPolicy.newInstance(
+          ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
+          MaxApplicationRetriesPolicyContext.newInstance(2));
+      Assert.fail("Exception expected");
+    } catch (IllegalArgumentException ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          ApplicationRetryPolicy.errorMessage));
+    }
+
+    ApplicationRetryPolicy context = ApplicationRetryPolicy.newInstance(
+        MaxApplicationRetriesPolicyContext.newInstance(2));
+    // assertEquals reports both values on failure, unlike assertTrue(a == b).
+    Assert.assertEquals(ApplicationRetryPolicyType.MAX_RETRIES_POLICY,
+        context.getApplicationRetryPolicyType());
+  }
+
+  /**
+   * With the windowed policy aborted, no reset happens, so the second
+   * attempt stays the last one and the application fails after it.
+   */
+  @Test (timeout = 50000)
+  public void testWindowedApplicationRetriesPolicyFail() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    WindowedApplicationRetriesPolicyContext retryContext =
+        WindowedApplicationRetriesPolicyContext.newInstance(4000);
+    ApplicationRetryPolicy resetPolicy =
+        ApplicationRetryPolicy
+            .newInstance(
+                ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
+                retryContext);
+    RMApp app1 = rm1.submitApp(200, resetPolicy);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Abort the WindowedApplicationRetriesPolicy
+    ApplicationRetriesPolicy policy = ((RMAppImpl)app1).getApplicationRetryPolicy();
+    Assert.assertTrue(policy instanceof WindowedApplicationRetriesPolicy);
+    ((WindowedApplicationRetriesPolicy)policy).abort();
+
+    // Fail attempt1 normally
+    nm1
+        .nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    // launch the second attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app1.getAppAttempts().size());
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am2.waitForState(RMAppAttemptState.RUNNING);
+    // wait 4 seconds (one reset window) to give a reset the chance to happen
+    Thread.sleep(4000);
+
+    // WindowedApplicationRetriesPolicy has already been aborted. It will
+    // not reset the retry count.
+    // Right now Attempt2 is the last Attempt
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+    // Fail attempt2 normally
+    nm1
+        .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    rm1.stop();
+  }
+
+  /**
+   * With the windowed policy running, an attempt that survives a reset
+   * window is no longer the last attempt, both before and after an RM
+   * restart.
+   */
+  @Test (timeout = 50000)
+  public void testWindowedApplicationRetriesPolicy() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 2.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    WindowedApplicationRetriesPolicyContext retryContext =
+        WindowedApplicationRetriesPolicyContext.newInstance(4000);
+    ApplicationRetryPolicy resetPolicy =
+        ApplicationRetryPolicy
+            .newInstance(
+                ApplicationRetryPolicyType.WINDOWED_RETRIES_POLICY,
+                retryContext);
+    RMApp app1 = rm1.submitApp(200, resetPolicy);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Fail attempt1 normally
+    nm1
+        .nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    // launch the second attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app1.getAppAttempts().size());
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am2.waitForState(RMAppAttemptState.RUNNING);
+    // wait 4 seconds (one reset window) for WindowedApplicationRetriesPolicy
+    // to reset
+    Thread.sleep(4000);
+    // Right now Attempt2 is not the last Attempt
+    Assert.assertFalse(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+    // Fail attempt2 normally
+    nm1
+        .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+
+    // launch the third attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(3, app1.getAppAttempts().size());
+    RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+    MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am3.waitForState(RMAppAttemptState.RUNNING);
+
+    // Restart rm.
+    @SuppressWarnings("resource")
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // re-register the NM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+    status
+        .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    status.setContainerId(attempt3.getMasterContainer().getId());
+    status.setContainerState(ContainerState.COMPLETE);
+    status.setDiagnostics("");
+    nm1.registerNode(Collections.singletonList(status), null);
+
+    rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Launch Attempt 4 and it should be the last Attempt
+    MockAM am4 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+    RMAppAttempt attempt4 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+            .getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // wait 4 seconds (one reset window) for WindowedApplicationRetriesPolicy
+    // to reset
+    Thread.sleep(4000);
+    // Right now Attempt4 is not the last Attempt
+    Assert.assertFalse(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // Fail attempt4 normally
+    nm1
+        .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am4.waitForState(RMAppAttemptState.FAILED);
+    rm1.stop();
+    rm2.stop();
+  }
+
+  /**
+   * A per-application limit of 2 supplied through
+   * MaxApplicationRetriesPolicyContext takes effect even though the global
+   * limit is 5: the application fails after its second attempt.
+   */
+  @Test (timeout = 50000)
+  public void testMaxRetriesPolicy() throws Exception {
+    // set Global max-am-retry count as 5
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:234", 8000, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // prepare MaxApplicationRetriesPolicyContext
+    // and set maxAppAttempts as 2
+    MaxApplicationRetriesPolicyContext retryPolicyContext =
+        MaxApplicationRetriesPolicyContext.newInstance(2);
+    ApplicationRetryPolicy resetPolicy =
+        ApplicationRetryPolicy.newInstance(retryPolicyContext);
+    RMApp app1 = rm.submitApp(200, resetPolicy);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    // Fail attempt1 normally
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
+        1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    // launch the second attempt
+    rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app1.getAppAttempts().size());
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    //It should be the last Attempt
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    am2.waitForState(RMAppAttemptState.RUNNING);
+    // Fail attempt2 normally
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
+        1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+
+    // We set maxAppAttempts as 2 in MaxApplicationRetriesPolicyContext
+    // So, we can only try two attempts.
+    rm.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    rm.stop();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 61def87..1a5dd21 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -215,7 +215,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
     RMContext rmContext = resourceManager.getRMContext();
     RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
         null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, amResource, null), null, null,
+        null, null, null, false, false, 1, amResource, null), null, null,
         0, null, null);
     rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(