diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 529df11..543339d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; import org.apache.hadoop.yarn.util.Records; import java.util.Set; @@ -35,17 +36,19 @@ * information needed by the ResourceManager to launch * the ApplicationMaster for an application.

* - *

It includes details such as: - *

+ *

+ * It includes details such as: + *

*

* * @see ContainerLaunchContext @@ -62,7 +65,7 @@ public static ApplicationSubmissionContext newInstance( Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType, - boolean keepContainers) { + boolean keepContainers, AMRetryCountResetPolicy resetPolicy) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -76,6 +79,7 @@ public static ApplicationSubmissionContext newInstance( context.setResource(resource); context.setApplicationType(applicationType); context.setKeepContainersAcrossApplicationAttempts(keepContainers); + context.setAMRetryCountResetPolicy(resetPolicy); return context; } @@ -85,6 +89,19 @@ public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers) { + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, null); + } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType) { return newInstance(applicationId, applicationName, queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, @@ -338,4 +355,13 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Public @Stable public abstract void setApplicationTags(Set tags); + + @Public + @Unstable + public abstract AMRetryCountResetPolicy getAMRetryCountResetPolicy(); + + @Public + @Unstable + public abstract void setAMRetryCountResetPolicy( + AMRetryCountResetPolicy policyObject); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java new file mode 100644 index 0000000..c094992 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AMRetryCountResetPolicy.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import java.io.Serializable; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * We have WindowsSlideAMRetryCountResetPolicy available to use. + * To use this policy, you need to initiate it by + * passing a parameter which is used to define period of time in milliseconds + * that AM retry count will be reset. + */ +@Private +@Unstable +public interface AMRetryCountResetPolicy extends Serializable{ + + public void start(); + + public void stop(); + + public void amRetryCountReset(); + + public void recover(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 2eb6148..18aa108 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -128,6 +128,7 @@ message ApplicationAttemptStateDataProto { optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; optional int32 am_container_exit_status = 9 [default = -1000]; + optional int64 end_time = 10; } message RMStateVersionProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..d1bdcd2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -288,6 +288,7 @@ message ApplicationSubmissionContextProto { optional string applicationType = 10 [default = "YARN"]; optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; + optional bytes am_retrycount_reset_policy = 13; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index c4a3a72..a47b5f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import com.google.common.base.CharMatcher; + +import org.apache.commons.lang.SerializationUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -33,9 +35,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; +import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; @@ -53,6 +58,7 @@ private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; + private AMRetryCountResetPolicy resetPolicy = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -109,6 +115,10 @@ private void mergeLocalToBuilder() { if (this.applicationTags != null && !this.applicationTags.isEmpty()) { builder.addAllApplicationTags(this.applicationTags); } + if (this.resetPolicy != null) { + builder.setAmRetrycountResetPolicy(convertToProtoFormat(ByteBuffer + .wrap(SerializationUtils.serialize(this.resetPolicy)))); + } } private void mergeLocalToProto() { @@ -401,4 +411,41 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } + + @Override + public AMRetryCountResetPolicy getAMRetryCountResetPolicy() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.resetPolicy != null) { + return this.resetPolicy; + } + if (!p.hasAmRetrycountResetPolicy()) { + return null; + } + try { + this.resetPolicy = + (AMRetryCountResetPolicy) SerializationUtils + .deserialize(convertFromProtoFormat(p.getAmRetrycountResetPolicy()) + .array()); + } catch (Exception ex) { + return null; + } + return this.resetPolicy; + } + + @Override + public void setAMRetryCountResetPolicy(AMRetryCountResetPolicy policyObject) { + maybeInitBuilder(); + if (resetPolicy == null) { + builder.clearAmRetrycountResetPolicy(); + } + this.resetPolicy = policyObject; + } + + protected final ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + protected final ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index b315a84..160b9e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -240,6 +240,7 @@ private void loadRMAppState(RMState rmState) throws Exception { new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), + attemptStateData.getEndTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 6b5b602..ec49014 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -158,8 +158,8 @@ public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptState updatedAttemptState = new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), + attemptStateData.getStartTime(), attemptStateData.getEndTime(), + attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), attemptStateData.getAMContainerExitStatus()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 9b05ea1..d98d8e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -53,8 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -257,6 +255,7 @@ public RMStateStore() { final Container masterContainer; final Credentials appAttemptCredentials; long startTime = 0; + long endTime = 0; // fields set when attempt completes RMAppAttemptState state; String finalTrackingUrl = "N/A"; @@ -267,19 +266,20 @@ public RMStateStore() { public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime) { - this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID); + this(attemptId, masterContainer, appAttemptCredentials, startTime, + -1, null, null, "", null, ContainerExitStatus.INVALID); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, - long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus) { + long startTime, long endTime, RMAppAttemptState state, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; this.startTime = startTime; + this.endTime = endTime; this.state = state; this.finalTrackingUrl = finalTrackingUrl; this.diagnostics = diagnostics == null ? "" : diagnostics; @@ -308,6 +308,9 @@ public String getDiagnostics() { public long getStartTime() { return startTime; } + public long getEndTime() { + return endTime; + } public FinalApplicationStatus getFinalApplicationStatus() { return amUnregisteredFinalStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 01bca39..5bc8fc5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -567,7 +567,8 @@ private void loadApplicationAttemptState(ApplicationState appState, ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getStartTime(), attemptStateData.getEndTime(), + attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 90fb3ec..011016a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -41,9 +41,10 @@ public abstract class ApplicationAttemptStateData { public static ApplicationAttemptStateData newInstance( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { + ByteBuffer attemptTokens, long startTime, long endTime, + RMAppAttemptState finalState, String finalTrackingUrl, + String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, + int exitStatus) { ApplicationAttemptStateData attemptStateData = Records.newRecord(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); @@ -53,6 +54,7 @@ public static ApplicationAttemptStateData newInstance( attemptStateData.setFinalTrackingUrl(finalTrackingUrl); attemptStateData.setDiagnostics(diagnostics); attemptStateData.setStartTime(startTime); + attemptStateData.setEndTime(endTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); return attemptStateData; @@ -69,9 +71,9 @@ public static ApplicationAttemptStateData newInstance( } return newInstance(attemptState.getAttemptId(), attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus(), + attemptState.getStartTime(), attemptState.getEndTime(), + attemptState.getState(), attemptState.getFinalTrackingUrl(), + attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(), attemptState.getAMContainerExitStatus()); } @@ -146,6 +148,14 @@ public static ApplicationAttemptStateData newInstance( public abstract void setStartTime(long startTime); /** + * Get the end time of the application. + * @return end time of the application + */ + public abstract long getEndTime(); + + public abstract void setEndTime(long endTime); + + /** * Get the final finish status of the application. * @return final finish status of the application */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index a90bda4..2db8491 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -229,6 +229,18 @@ public void setStartTime(long startTime) { } @Override + public long getEndTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getEndTime(); + } + + @Override + public void setEndTime(long endTime) { + maybeInitBuilder(); + builder.setEndTime(endTime); + } + + @Override public FinalApplicationStatus getFinalApplicationStatus() { ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasFinalApplicationStatus()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 2b590a0..eb61a98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -223,4 +224,11 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Used by {@link AMRetryCountResetPolicy} to reset the retryCount + * @param retryCount + */ + void setRetryCount(int retryCount); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bff41cf..be276bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -124,6 +124,8 @@ private RMAppState targetedFinalState; private RMAppState recoveredFinalState; + private int attemptResetCount = 0; + Object transitionTodo; private static final StateMachineFactory= this.maxAppAttempts) { + } else if (getNumFailedAppAttempts() - this.attemptResetCount + >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1131,7 +1136,8 @@ public AttemptFailedTransition(RMAppState initialState) { public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (!app.submissionContext.getUnmanagedAM() - && app.getNumFailedAppAttempts() < app.maxAppAttempts) { + && app.getNumFailedAppAttempts() - app.attemptResetCount + < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1193,7 +1199,7 @@ public static boolean isAppInFinalState(RMApp rmApp) { return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } - + private RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } @@ -1202,4 +1208,14 @@ private RMAppState getRecoveredFinalState() { public Set getRanNodes() { return ranNodes; } + + @Override + public void setRetryCount(int retryCount) { + try { + this.writeLock.lock(); + this.attemptResetCount = retryCount; + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 68a068b..08c4f79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -174,6 +174,12 @@ long getStartTime(); /** + * the end time of the application. + * @return the end time of the application. + */ + long getEndTime(); + + /** * The current state of the {@link RMAppAttempt}. * * @return the current state {@link RMAppAttemptState} for this application @@ -207,4 +213,8 @@ * */ boolean shouldCountTowardsMaxAttemptRetry(); + + boolean mayBeLastAttempt(); + + void setMaybeLastAttemptFlag(boolean maybeLastAttempt); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ef572d4..08996b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; @@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.WindowsSlideAMRetryCountResetPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -141,6 +144,7 @@ private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private long endTime = -1; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -154,7 +158,8 @@ // re-created if this attempt is eventually failed because of preemption, // hardware error or NM resync. So this flag indicates that this may be // last attempt. - private final boolean maybeLastAttempt; + private boolean maybeLastAttempt; + private AMRetryCountResetPolicy resetPolicy = null; private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); @@ -411,6 +416,14 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.proxiedTrackingUrl = generateProxyUriWithScheme(null); this.maybeLastAttempt = maybeLastAttempt; this.stateMachine = stateMachineFactory.make(this); + this.resetPolicy = submissionContext.getAMRetryCountResetPolicy(); + if (this.resetPolicy != null) { + if (this.resetPolicy instanceof WindowsSlideAMRetryCountResetPolicy) { + ((WindowsSlideAMRetryCountResetPolicy) this.resetPolicy) + .setup((RMAppImpl) this.rmContext.getRMApps().get( + appAttemptId.getApplicationId())); + } + } } @Override @@ -694,6 +707,10 @@ public void recover(RMState state) throws Exception { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); + this.endTime = attemptState.getEndTime(); + if (this.resetPolicy != null) { + this.resetPolicy.recover(); + } } public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { @@ -984,10 +1001,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, } RMStateStore rmStore = rmContext.getStateStore(); + this.setEndTime(System.currentTimeMillis()); ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + this.getEndTime(), stateToBeStored, finalTrackingUrl, diags, + finalStatus, exitStatus); LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); @@ -1036,6 +1055,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ((MultipleArcTransition) appAttempt.transitionTodo).transition( appAttempt, causeEvent); } + + // Stop AMRetryCountResetPolicy + if (appAttempt.resetPolicy != null) { + appAttempt.resetPolicy.stop(); + } + return appAttempt.targetedFinalState; } } @@ -1233,6 +1258,11 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getRMApplicationHistoryWriter() .applicationAttemptStarted(appAttempt); + + // Start AMRetryCountResetPolicy + if (appAttempt.resetPolicy != null) { + appAttempt.resetPolicy.start(); + } } } @@ -1609,6 +1639,25 @@ public long getStartTime() { } @Override + public long getEndTime() { + try { + this.readLock.lock(); + return this.endTime; + } finally { + this.readLock.unlock(); + } + } + + private void setEndTime(long endTime) { + try { + this.writeLock.lock(); + this.endTime = endTime; + } finally { + this.writeLock.unlock(); + } + } + + @Override public RMAppAttemptState getState() { this.readLock.lock(); @@ -1698,7 +1747,23 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } // for testing + @Override public boolean mayBeLastAttempt() { - return maybeLastAttempt; + try { + this.readLock.lock(); + return maybeLastAttempt; + } finally { + this.readLock.unlock(); + } + } + + @Private + public void setMaybeLastAttemptFlag(boolean maybeLastAttempt) { + try { + this.writeLock.lock(); + this.maybeLastAttempt = maybeLastAttempt; + } finally { + this.writeLock.unlock(); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java new file mode 100644 index 0000000..7d1d719 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/policy/WindowsSlideAMRetryCountResetPolicy.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy; + +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; + +/** + * The policy is used to reset AM retry count if current AppMaster successfully + * runs for a period of time. To use this policy, you need to initiate it by + * passing a parameter which is used to define period of time in milliseconds + * that AM retry count will be reset. + */ +public class WindowsSlideAMRetryCountResetPolicy implements + AMRetryCountResetPolicy { + + private static final long serialVersionUID = 1529666453440395104L; + + private final int resetWindow; + + private Timer timer; + + private RMAppImpl rmApp; + + public WindowsSlideAMRetryCountResetPolicy(int resetWindow) { + super(); + this.resetWindow = resetWindow; + if (this.resetWindow <= 0) { + throw new IllegalArgumentException("The parameter should larger than 0"); + } + this.timer = null; + } + + @Override + public synchronized void amRetryCountReset() { + // The formula we are using to calculate the lastAttempt: + // maxAppAttempts == getNumFailedAppAttempts() - attemptResetCount. + // To do the AM retry count reset, just set it + // as current number of FailedAppAttempts. + int attemptFailureCount = 0; + for (Entry entry : this.rmApp + .getAppAttempts().entrySet()) { + if (entry.getKey() != this.rmApp.getCurrentAppAttempt().getAppAttemptId() + && entry.getValue().shouldCountTowardsMaxAttemptRetry()) { + attemptFailureCount++; + } + } + rmApp.setRetryCount(attemptFailureCount); + if (this.rmApp.getCurrentAppAttempt().mayBeLastAttempt()) { + this.rmApp.getCurrentAppAttempt().setMaybeLastAttemptFlag(false); + } + } + + public int getResetWindow() { + return resetWindow; + } + + @Override + public synchronized void start() { + this.timer = new Timer(); + this.timer.scheduleAtFixedRate(new retryCountResetActivator(), resetWindow, + resetWindow); + } + + @Override + public synchronized void stop() { + if (this.timer != null) { + this.timer.cancel(); + } + } + + private class retryCountResetActivator extends TimerTask { + @Override + public void run() { + amRetryCountReset(); + } + } + + public synchronized void setup(RMAppImpl rmApp) { + this.rmApp = rmApp; + } + + @Override + public synchronized void recover() { + int attemptFailureCount = 0; + if (this.rmApp.getCurrentAppAttempt().getEndTime() + - this.rmApp.getCurrentAppAttempt().getStartTime() >= resetWindow) { + for (Entry entry : this.rmApp + .getAppAttempts().entrySet()) { + if (entry.getKey() != this.rmApp.getCurrentAppAttempt() + .getAppAttemptId() + && entry.getValue().shouldCountTowardsMaxAttemptRetry()) { + attemptFailureCount++; + } + } + this.rmApp.setRetryCount(attemptFailureCount); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 6949a81..484c7c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.AMRetryCountResetPolicy; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -283,6 +284,25 @@ public RMApp submitApp(int masterMemory, String name, String user, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, null); + } + + public RMApp submitApp(int masterMemory, AMRetryCountResetPolicy resetPolicy) + throws Exception { + return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, resetPolicy); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, AMRetryCountResetPolicy resetPolicy) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -318,6 +338,9 @@ public RMApp submitApp(int masterMemory, String name, String user, clc.setTokens(securityTokens); } sub.setAMContainerSpec(clc); + if (resetPolicy != null) { + sub.setAMRetryCountResetPolicy(resetPolicy); + } req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 4349a23..5656a5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -165,6 +165,11 @@ public YarnApplicationState createApplicationState() { public Set getRanNodes() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public void setRetryCount(int retryCount) { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 49d7135..bf4b256 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -307,8 +307,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) new ApplicationAttemptState(oldAttemptState.getAttemptId(), oldAttemptState.getMasterContainer(), oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", + oldAttemptState.getStartTime(), oldAttemptState.getEndTime(), + RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); @@ -330,8 +330,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) new ApplicationAttemptState(dummyAttemptId, oldAttemptState.getMasterContainer(), oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", + oldAttemptState.getStartTime(), oldAttemptState.getEndTime(), + RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 8f26d10..94d8096 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -238,4 +238,9 @@ public YarnApplicationState createApplicationState() { public Set getRanNodes() { return null; } + + @Override + public void setRetryCount(int retryCount) { + // Do nothing + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java new file mode 100644 index 0000000..3f0d9af --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppRetryCountResetPolicy.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import java.util.Collections; + +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.policy.WindowsSlideAMRetryCountResetPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestRMAppRetryCountResetPolicy { + + @Test (timeout = 50000) + public void testWindowsSlideAMRetryCountResetPolicy() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + WindowsSlideAMRetryCountResetPolicy resetPolicy = + new WindowsSlideAMRetryCountResetPolicy(4000); + RMApp app1 = rm1.submitApp(200, resetPolicy); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Fail attempt1 normally + nm1 + .nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am2.waitForState(RMAppAttemptState.RUNNING); + // wait for 4 second for WindowsSlideAMRetryCountResetPolicy to reset + Thread.sleep(4000); + // Right now Attempt2 is not the last Attempt + Assert.assertTrue(!((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + + // Fail attempt2 normally + nm1 + .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + + // launch the third attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(3, app1.getAppAttempts().size()); + RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); + MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am3.waitForState(RMAppAttemptState.RUNNING); + + // Restart rm. + @SuppressWarnings("resource") + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt3.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); + + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // Lauch Attempt 4 and it should be the last Attempt + MockAM am4 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); + RMAppAttempt attempt4 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + + // wait for 2 second for WindowsSlideAMRetryCountResetPolicy to reset + Thread.sleep(4000); + // Right now Attempt4 is not the last Attempt + Assert.assertTrue(!((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + + // Fail attempt4 normally + nm1 + .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am4.waitForState(RMAppAttemptState.FAILED); + rm1.stop(); + rm2.stop(); + } +}