diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eeb1479..4140998 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -71,14 +71,33 @@ message GetGroupsForUserResponseProto { //////////////////////////////////////////////////////////////////////// ////// RM recovery related records ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// +enum RMAppAttemptStateProto { + ATTEMPT_NEW = 1; + ATTEMPT_SUBMITTED = 2; + ATTEMPT_SCHEDULED = 3; + ATTEMPT_ALLOCATED = 4; + ATTEMPT_LAUNCHED = 5; + ATTEMPT_FAILED = 6; + ATTEMPT_RUNNING = 7; + ATTEMPT_FINISHING = 8; + ATTEMPT_FINISHED = 9; + ATTEMPT_KILLED = 10; + ATTEMPT_ALLOCATED_SAVING = 11; + ATTEMPT_LAUNCHED_UNMANAGED_SAVING = 12; + ATTEMPT_RECOVERED = 13; + ATTEMPT_FINAL_SAVING = 14; +} + message ApplicationStateDataProto { optional int64 submit_time = 1; optional ApplicationSubmissionContextProto application_submission_context = 2; optional string user = 3; + optional YarnApplicationStateProto yarn_application_final_state = 4; } message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; optional bytes app_attempt_tokens = 3; + optional RMAppAttemptStateProto app_attempt_final_state = 4; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e661344..d8b9f38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; @@ -330,6 +331,11 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; + if (appState.getFinalState().equals(YarnApplicationState.FINISHED) + || appState.getFinalState().equals(YarnApplicationState.FAILED) + || appState.getFinalState().equals(YarnApplicationState.KILLED)) { + shouldRecover = false; + } if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 062f5cc..533b7ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -130,7 +130,8 @@ private void loadRMAppState(RMState rmState) throws Exception { ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getUser(), + appStateData.getYarnApplicationFinalState()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); rmState.appState.put(appId, appState); @@ -152,7 +153,8 @@ private void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 0852ce8..6430fa2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -79,10 +79,11 @@ protected synchronized void closeInternal() throws Exception { public void storeApplicationState(String appId, ApplicationStateDataPBImpl appStateData) throws Exception { - ApplicationState appState = new ApplicationState( - appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), appStateData.getUser()); - if (state.appState.containsKey(appState.getAppId())) { + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), appStateData.getYarnApplicationFinalState()); + if (state.appState.containsKey(appState.getAppId())) { Exception e = new IOException("App: " + appId + " is already stored."); LOG.info("Error storing info for app: " + appId, e); throw e; @@ -105,7 +106,8 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 2f4b896..9a3b391 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -50,9 +51,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @Private @@ -86,13 +88,15 @@ public RMStateStore() { final ApplicationAttemptId attemptId; final Container masterContainer; final Credentials appAttemptCredentials; + RMAppAttemptState finalState; public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, - Credentials appAttemptCredentials) { + Container masterContainer, Credentials appAttemptCredentials, + RMAppAttemptState finalState) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; + this.finalState = finalState; } public Container getMasterContainer() { @@ -104,6 +108,9 @@ public ApplicationAttemptId getAttemptId() { public Credentials getAppAttemptCredentials() { return appAttemptCredentials; } + public RMAppAttemptState getFinalState(){ + return finalState; + } } /** @@ -113,14 +120,16 @@ public Credentials getAppAttemptCredentials() { final ApplicationSubmissionContext context; final long submitTime; final String user; + YarnApplicationState finalState; Map attempts = new HashMap(); - + ApplicationState(long submitTime, ApplicationSubmissionContext context, - String user) { + String user, YarnApplicationState finalState) { this.submitTime = submitTime; this.context = context; this.user = user; + this.finalState = finalState; } public ApplicationId getAppId() { @@ -141,6 +150,9 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { public String getUser() { return user; } + public YarnApplicationState getFinalState() { + return finalState; + } } public static class RMDTSecretManagerState { @@ -253,8 +265,9 @@ public synchronized void storeApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - ApplicationState appState = new ApplicationState( - app.getSubmitTime(), context, app.getUser()); + ApplicationState appState = + new ApplicationState(app.getSubmitTime(), context, app.getUser(), + app.getYarnApplicationFinalState()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @@ -279,7 +292,8 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + appAttempt.getMasterContainer(), credentials, + appAttempt.getAppAttemptFinalState()); dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); @@ -373,12 +387,13 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext(), - app.getUser()); + app.getUser(), app.getYarnApplicationFinalState()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + appAttempt.getMasterContainer(), credentials, + appAttempt.getAppAttemptFinalState()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -430,17 +445,18 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { case STORE_APP: { - ApplicationState apptState = + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); Exception storedException = null; ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); - appStateData.setSubmitTime(apptState.getSubmitTime()); + appStateData.setSubmitTime(appState.getSubmitTime()); appStateData.setApplicationSubmissionContext( - apptState.getApplicationSubmissionContext()); - appStateData.setUser(apptState.getUser()); + appState.getApplicationSubmissionContext()); + appStateData.setUser(appState.getUser()); + appStateData.setYarnApplicationFinalState(appState.getFinalState()); ApplicationId appId = - apptState.getApplicationSubmissionContext().getApplicationId(); + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { @@ -471,7 +487,8 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { ApplicationAttemptStateDataPBImpl attemptStateData = (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens); + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getFinalState()); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 41c95d3..16e14e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -224,8 +224,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { ApplicationStateDataProto.parseFrom(childData)); ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), + appStateData.getYarnApplicationFinalState()); if (!appId.equals(appState.context.getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); @@ -249,7 +250,8 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); if (!attemptId.equals(attemptState.getAttemptId())) { throw new YarnRuntimeException("The child node name is different " + "from the application attempt id"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 2622b0e..b46e480 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; /* * Contains the state data that needs to be persisted for an ApplicationAttempt @@ -61,4 +62,12 @@ public ByteBuffer getAppAttemptTokens(); public void setAppAttemptTokens(ByteBuffer attemptTokens); + + /** + * Get the final state of the ap attempt. + * @return + */ + public RMAppAttemptState getFinalState(); + + public void setFinalState(RMAppAttemptState finalState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 35b12e5..318045d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; /** * Contains all the state data that needs to be stored persistently @@ -66,6 +67,9 @@ @Public @Unstable public void setApplicationSubmissionContext( - ApplicationSubmissionContext context); + ApplicationSubmissionContext context); + public YarnApplicationState getYarnApplicationFinalState(); + + public void setYarnApplicationFinalState(YarnApplicationState finalState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index b539bec..d4dc242 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; public class ApplicationAttemptStateDataPBImpl extends ProtoBase @@ -156,14 +158,41 @@ public void setAppAttemptTokens(ByteBuffer attemptTokens) { this.appAttemptTokens = attemptTokens; } + @Override + public RMAppAttemptState getFinalState() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppAttemptFinalState()) { + return null; + } + return convertFromProtoFormat(p.getAppAttemptFinalState()); + } + + @Override + public void setFinalState(RMAppAttemptState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearAppAttemptFinalState(); + return; + } + builder.setAppAttemptFinalState(convertToProtoFormat(finalState)); + } public static ApplicationAttemptStateData newApplicationAttemptStateData( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens) { + ByteBuffer attemptTokens, RMAppAttemptState finalState) { ApplicationAttemptStateData attemptStateData = recordFactory.newRecordInstance(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); attemptStateData.setMasterContainer(container); attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setFinalState(finalState); return attemptStateData; } + + private static String RM_APP_ATTEMPT_PREFIX = "ATTEMPT_"; + public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { + return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); + } + public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { + return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index b02e056..aa40305 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -132,4 +134,24 @@ public void setApplicationSubmissionContext( this.applicationSubmissionContext = context; } + @Override + public YarnApplicationState getYarnApplicationFinalState() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasYarnApplicationFinalState()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getYarnApplicationFinalState()); + } + + @Override + public void setYarnApplicationFinalState(YarnApplicationState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearYarnApplicationFinalState(); + return; + } + builder.setYarnApplicationFinalState(ProtoUtils + .convertToProtoFormat(finalState)); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..861e69a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -211,4 +211,11 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Get the final state of the ApplicationMaster. + * @return the final state of the ApplicationMaster, null if final state is + * not yet available + */ + YarnApplicationState getYarnApplicationFinalState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a8a0af4..e3ef718 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -111,8 +109,10 @@ private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); - private boolean isAppRemovalRequestSent = false; - private RMAppState previousStateAtRemoving; + private boolean isYarnApplicationFinalStateSavingRequestSent = false; + private RMAppState stateBeforeFinalSaving; + private RMAppEvent eventCausingFinalSaving; + private YarnApplicationState yarnApplicationFinalState; private static final StateMachineFactory { @Override - public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_REMOVED)) { - RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event; - if (removeEvent.getRemovedException() != null) { - LOG.error( - "Failed to remove application: " + removeEvent.getApplicationId(), - removeEvent.getRemovedException()); - ExitUtil.terminate(1, removeEvent.getRemovedException()); + public RMAppState transition(RMAppImpl app, RMAppEvent event) { + app.finishTime = System.currentTimeMillis(); + RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; + if (storeEvent.getStoredException() != null) { + LOG.error( + "Failed to store application final state: " + + storeEvent.getApplicationId(), storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + + RMAppEventType eventType = app.eventCausingFinalSaving.getType(); + if (eventType.equals(RMAppEventType.APP_REJECTED)) { + new AppRejectedTransition() + .transition(app, app.eventCausingFinalSaving); + return RMAppState.FAILED; + } else if (eventType.equals((RMAppEventType.ATTEMPT_FAILED))) { + String msg = null; + RMAppFailedAttemptEvent failedEvent = + (RMAppFailedAttemptEvent) app.eventCausingFinalSaving; + if (app.submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + msg = "Unmanaged application " + app.getApplicationId() + + " failed due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } else if (app.attempts.size() >= app.maxAppAttempts) { + msg = "Application " + app.getApplicationId() + " failed " + + app.maxAppAttempts + " times due to " + + failedEvent.getDiagnostics() + ". Failing the application."; } + LOG.info(msg); + app.diagnostics.append(msg); + // Inform the node for app-finish + FINAL_TRANSITION.transition(app, app.eventCausingFinalSaving); + return RMAppState.FAILED; + } else if (eventType.equals(RMAppEventType.KILL)) { + new KillAppAndAttemptTransition().transition(app, + app.eventCausingFinalSaving); + return RMAppState.KILLED; + } else { + return RMAppState.FINISHING; } - app.finishTime = System.currentTimeMillis(); } } @@ -680,12 +713,23 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class RMAppRemovingTransition extends RMAppTransition { + private static final class RMAppFinalStateSavingTransition extends + RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Removing application with id " + app.applicationId); - app.removeApplicationState(); - app.previousStateAtRemoving = app.getState(); + app.stateBeforeFinalSaving = app.getState(); + app.eventCausingFinalSaving = event; + + if (event.getType().equals(RMAppEventType.APP_REJECTED)) { + app.yarnApplicationFinalState = YarnApplicationState.FAILED; + } else if (event.getType().equals((RMAppEventType.ATTEMPT_FAILED))) { + app.yarnApplicationFinalState = YarnApplicationState.FAILED; + } else if (event.getType().equals(RMAppEventType.KILL)) { + app.yarnApplicationFinalState = YarnApplicationState.KILLED; + } else { + app.yarnApplicationFinalState = YarnApplicationState.FINISHED; + } + app.storeYarnApplicationFinalState(); } } @@ -698,23 +742,19 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private static class AppKilledTransition extends FinalTransition { - @Override - public void transition(RMAppImpl app, RMAppEvent event) { - app.diagnostics.append("Application killed by user."); - super.transition(app, event); - }; - } - - private static class KillAppAndAttemptTransition extends AppKilledTransition { + private static class KillAppAndAttemptTransition extends FinalTransition { @SuppressWarnings("unchecked") @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), - RMAppAttemptEventType.KILL)); + app.diagnostics.append("Application killed by user."); + if (app.currentAttempt != null) { + app.handler.handle(new RMAppAttemptEvent(app.currentAttempt + .getAppAttemptId(), RMAppAttemptEventType.KILL)); + } super.transition(app, event); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -744,9 +784,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.getState() != RMAppState.FINISHING) { app.finishTime = System.currentTimeMillis(); } - // application completely done and remove from state store. - app.removeApplicationState(); - app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); @@ -764,32 +801,13 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - - RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); - boolean retryApp = true; - String msg = null; - if (app.submissionContext.getUnmanagedAM()) { - // RM does not manage the AM. Do not retry - retryApp = false; - msg = "Unmanaged application " + app.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } else if (app.attempts.size() >= app.maxAppAttempts) { - retryApp = false; - msg = "Application " + app.getApplicationId() + " failed " - + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } - - if (retryApp) { + if (!app.submissionContext.getUnmanagedAM() + && app.attempts.size() < app.maxAppAttempts) { app.createNewAttempt(true); return initialState; } else { - LOG.info(msg); - app.diagnostics.append(msg); - // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); - return RMAppState.FAILED; + new RMAppFinalStateSavingTransition().transition(app, event); + return RMAppState.FINAL_SAVING; } } @@ -814,9 +832,9 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { RMAppState rmAppState = getState(); - // If App is in REMOVING state, return its previous state. - if (rmAppState.equals(RMAppState.REMOVING)) { - rmAppState = previousStateAtRemoving; + // If App is in FINAL_SAVING state, return its previous state. + if (rmAppState.equals(RMAppState.FINAL_SAVING)) { + rmAppState = stateBeforeFinalSaving; } switch (rmAppState) { case NEW: @@ -841,10 +859,15 @@ public YarnApplicationState createApplicationState() { } } - private void removeApplicationState(){ - if (!isAppRemovalRequestSent) { - rmContext.getStateStore().removeApplication(this); - isAppRemovalRequestSent = true; + public YarnApplicationState getYarnApplicationFinalState() { + return this.yarnApplicationFinalState; + } + + private void storeYarnApplicationFinalState() { + LOG.info("Storing application final state with id " + this.applicationId); + if (!isYarnApplicationFinalStateSavingRequestSent) { + rmContext.getStateStore().storeApplication(this); + isYarnApplicationFinalStateSavingRequestSent = true; } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index e9ce5b4..ececdae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -24,7 +24,7 @@ SUBMITTED, ACCEPTED, RUNNING, - REMOVING, + FINAL_SAVING, FINISHING, FINISHED, FAILED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda..3247b5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -178,4 +178,11 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * Get the final {@link RMAppAttempt} state. + * @return the final {@link RMAppAttempt} state, null if final state is not + * yet available + */ + RMAppAttemptState getAppAttemptFinalState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f68a4a5..a2d2b3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -157,6 +156,10 @@ private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); + private RMAppAttemptState stateBeforeFinalSaving; + private RMAppAttemptEvent eventCausingFinalSaving; + private RMAppAttemptState appAttemptFinalState; + private static final StateMachineFactory { + @Override + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; + RMAppAttemptState beforeState = appAttempt.stateBeforeFinalSaving; + + switch (causeEvent.getType()) { + case APP_REJECTED: + new AppRejectedTransition().transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case REGISTERED: + new UnexpectedAMRegisteredTransition().transition(appAttempt, + causeEvent); + return RMAppAttemptState.FAILED; + case LAUNCH_FAILED: + new LaunchFailedTransition().transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case CONTAINER_FINISHED: + if (beforeState.equals(RMAppAttemptState.RUNNING)) { + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) causeEvent; + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); + // container associated with AM. must not be unmanaged + assert appAttempt.submissionContext.getUnmanagedAM() == false; + // Setup diagnostic message + appAttempt.diagnostics.append("AM Container for " + + appAttempt.getAppAttemptId() + " exited with " + " exitCode: " + + containerStatus.getExitStatus() + " due to: " + + containerStatus.getDiagnostics() + "." + + "Failing this attempt."); + new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, + causeEvent); + } else { + new AMContainerCrashedTransition().transition(appAttempt, causeEvent); + } + return RMAppAttemptState.FAILED; + case EXPIRE: + EXPIRED_TRANSITION.transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case KILL: + if (beforeState.equals(RMAppAttemptState.ALLOCATED)) { + new KillAllocatedAMTransition().transition(appAttempt, causeEvent); + } else if (beforeState.equals(RMAppAttemptState.LAUNCHED) + || beforeState.equals(RMAppAttemptState.RUNNING)) { + new FinalTransition(RMAppAttemptState.KILLED).transition(appAttempt, + causeEvent); + } else { + new BaseFinalTransition(RMAppAttemptState.KILLED).transition( + appAttempt, causeEvent); + } + return RMAppAttemptState.KILLED; + case UNREGISTERED: + appAttempt.unregisterApplicationMaster(appAttempt, causeEvent); + return RMAppAttemptState.FINISHING; + default: + // To do + return RMAppAttemptState.FINISHING; + } + } + } private static class BaseFinalTransition extends BaseTransition { @@ -1125,38 +1232,39 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); - - appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId); - - appAttempt.progress = 1.0f; - - RMAppAttemptUnregistrationEvent unregisterEvent - = (RMAppAttemptUnregistrationEvent) event; - appAttempt.diagnostics.append(unregisterEvent.getDiagnostics()); - appAttempt.origTrackingUrl = - sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); - appAttempt.proxiedTrackingUrl = - appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl); - appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus(); - // Tell the app if (appAttempt.getSubmissionContext().getUnmanagedAM()) { // Unmanaged AMs have no container to wait for, so they skip // the FINISHING state and go straight to FINISHED. + appAttempt.unregisterApplicationMaster(appAttempt, event); new FinalTransition(RMAppAttemptState.FINISHED).transition( appAttempt, event); return RMAppAttemptState.FINISHED; } - appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId); + // Saving the attempt final state + new AttemptFinalStateSavingTransition().transition(appAttempt, event); ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); - appAttempt.eventHandler.handle( - new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); - return RMAppAttemptState.FINISHING; + appAttempt.eventHandler.handle(new RMAppEvent(applicationId, + RMAppEventType.ATTEMPT_UNREGISTERED)); + return RMAppAttemptState.FINAL_SAVING; } } + private void unregisterApplicationMaster(RMAppAttempt appAttempt, + RMAppAttemptEvent event) { + ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); + rmContext.getAMLivelinessMonitor().unregister(appAttemptId); + progress = 1.0f; + + RMAppAttemptUnregistrationEvent unregisterEvent = + (RMAppAttemptUnregistrationEvent) event; + diagnostics.append(unregisterEvent.getDiagnostics()); + origTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); + proxiedTrackingUrl = generateProxyUriWithScheme(origTrackingUrl); + finalStatus = unregisterEvent.getFinalApplicationStatus(); + } + private static final class ContainerAcquiredTransition extends BaseTransition { @Override @@ -1186,18 +1294,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { - // container associated with AM. must not be unmanaged - assert appAttempt.submissionContext.getUnmanagedAM() == false; - // Setup diagnostic message - appAttempt.diagnostics.append("AM Container for " + - appAttempt.getAppAttemptId() + " exited with " + - " exitCode: " + containerStatus.getExitStatus() + - " due to: " + containerStatus.getDiagnostics() + "." + - "Failing this attempt."); - - new FinalTransition(RMAppAttemptState.FAILED).transition( - appAttempt, containerFinishedEvent); - return RMAppAttemptState.FAILED; + // Saving the final attempt state. + new AttemptFinalStateSavingTransition().transition(appAttempt, event); + return RMAppAttemptState.FINAL_SAVING; } // Normal container. @@ -1267,7 +1366,8 @@ private void checkAttemptStoreError(RMAppAttemptEvent event) { } } - private void storeAttempt(RMStateStore store) { + private void storeAttempt() { + RMStateStore store = rmContext.getStateStore(); // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved LOG.info("Storing attempt: AppId: " + @@ -1278,6 +1378,10 @@ private void storeAttempt(RMStateStore store) { store.storeApplicationAttempt(this); } + public RMAppAttemptState getAppAttemptFinalState() { + return this.appAttemptFinalState; + } + private void removeCredentials(RMAppAttemptImpl appAttempt) { // Unregister from the ClientToAMTokenSecretManager if (UserGroupInformation.isSecurityEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java index 3eb13ed..2551ed1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java @@ -20,5 +20,6 @@ public enum RMAppAttemptState { NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, - FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED, + FINAL_SAVING } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 73b9cf7..68de68c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -153,6 +153,11 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public YarnApplicationState getYarnApplicationFinalState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index d75fc7d..32c6f5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -66,13 +67,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; - import org.apache.zookeeper.ZooKeeper; - import org.junit.Test; public class TestRMStateStore extends ClientBaseWithFixes{ @@ -231,8 +231,8 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - void storeApp( - RMStateStore store, ApplicationId appId, long time) throws Exception { + void storeApp(RMStateStore store, ApplicationId appId, long time, + YarnApplicationState finalState) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); @@ -242,13 +242,14 @@ void storeApp( when(mockApp.getSubmitTime()).thenReturn(time); when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); + when(mockApp.getYarnApplicationFinalState()).thenReturn(finalState); store.storeApplication(mockApp); } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, String containerIdStr, Token appToken, - SecretKey clientTokenMasterKey, TestDispatcher dispatcher) - throws Exception { + SecretKey clientTokenMasterKey, TestDispatcher dispatcher, + RMAppAttemptState finalState) throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); @@ -258,6 +259,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, when(mockAttempt.getAMRMToken()).thenReturn(appToken); when(mockAttempt.getClientTokenMasterKey()) .thenReturn(clientTokenMasterKey); + when(mockAttempt.getAppAttemptFinalState()).thenReturn(finalState); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -281,7 +283,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) ApplicationAttemptId attemptId1 = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); - storeApp(store, appId1, submitTime); + storeApp(store, appId1, submitTime, YarnApplicationState.FINISHED); // create application token and client token key for attempt1 Token appAttemptToken1 = @@ -292,8 +294,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) clientToAMTokenMgr.createMasterKey(attemptId1); ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", - appAttemptToken1, clientTokenKey1, dispatcher); + "container_1352994193343_0001_01_000001", appAttemptToken1, + clientTokenKey1, dispatcher, RMAppAttemptState.FINISHED); String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = @@ -308,15 +310,15 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) clientToAMTokenMgr.createMasterKey(attemptId2); ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", - appAttemptToken2, clientTokenKey2, dispatcher); + "container_1352994193343_0001_02_000001", appAttemptToken2, + clientTokenKey2, dispatcher, RMAppAttemptState.FAILED); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); - storeApp(store, appIdRemoved, submitTime); + storeApp(store, appIdRemoved, submitTime, YarnApplicationState.FAILED); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", null, null, dispatcher); + "container_1352994193343_0002_01_000001", null, null, dispatcher, null); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -350,6 +352,9 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) // submission context is loaded correctly assertEquals(appId1, appState.getApplicationSubmissionContext().getApplicationId()); + // app final state is loaded correctly + assertEquals(YarnApplicationState.FINISHED, appState.getFinalState()); + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); // attempt1 is loaded correctly assertNotNull(attemptState); @@ -364,6 +369,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertArrayEquals(clientTokenKey1.getEncoded(), attemptState.getAppAttemptCredentials() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertEquals(RMAppAttemptState.FINISHED, attemptState.getFinalState()); attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly @@ -379,7 +385,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertArrayEquals(clientTokenKey2.getEncoded(), attemptState.getAppAttemptCredentials() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); - + assertEquals(RMAppAttemptState.FAILED, attemptState.getFinalState()); + // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 73dc8d3..4a4a141 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -226,5 +226,10 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { return null; + } + + @Override + public YarnApplicationState getYarnApplicationFinalState() { + return null; }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2075921..c179ace 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; import java.util.Arrays; @@ -286,7 +287,8 @@ private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp ap } // test to make sure times are set when app finishes - private static void assertTimesAtFinish(RMApp application) { + private void assertTimesAtFinish(RMApp application) { + sendAppSavedEvent(application); assertStartTimeSet(application); Assert.assertTrue("application finish time is not greater then 0", (application.getFinishTime() > 0)); @@ -294,11 +296,14 @@ private static void assertTimesAtFinish(RMApp application) { (application.getFinishTime() >= application.getStartTime())); } - private void assertAppRemoved(RMApp application){ - verify(store).removeApplication(application); + private void assertAppFinalStateSaved(RMApp application){ + // storeApplication is called twice, one for the application submission and + // the other for saving the final state of the application + verify(store, times(2)).storeApplication(application); } - private static void assertKilled(RMApp application) { + private void assertKilled(RMApp application) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); assertFinalAppStatus(FinalApplicationStatus.KILLED, application); @@ -307,14 +312,19 @@ private static void assertKilled(RMApp application) { "Application killed by user.", diag.toString()); } - private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException { + private void assertAppAndAttemptKilled(RMApp application) + throws InterruptedException { assertKilled(application); - Assert.assertEquals( RMAppAttemptState.KILLED, - application.getCurrentAppAttempt().getAppAttemptState() - ); + // send attempt final state saved event. + application.getCurrentAppAttempt().handle( + new RMAppAttemptStoredEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), null)); + Assert.assertEquals(RMAppAttemptState.KILLED, application + .getCurrentAppAttempt().getAppAttemptState()); } - private static void assertFailed(RMApp application, String regex) { + private void assertFailed(RMApp application, String regex) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); assertFinalAppStatus(FinalApplicationStatus.FAILED, application); @@ -323,6 +333,13 @@ private static void assertFailed(RMApp application, String regex) { diag.toString().matches(regex)); } + private void sendAppSavedEvent(RMApp application) { + RMAppEvent event = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + rmDispatcher.await(); + } + protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); @@ -386,15 +403,15 @@ protected RMApp testCreateAppRunning( return application; } - protected RMApp testCreateAppRemoving( + protected RMApp testCreateAppFinalSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppRunning(submissionContext); RMAppEvent finishingEvent = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_UNREGISTERED); application.handle(finishingEvent); - assertAppState(RMAppState.REMOVING, application); - assertAppRemoved(application); + assertAppState(RMAppState.FINAL_SAVING, application); + assertAppFinalStateSaved(application); return application; } @@ -402,10 +419,10 @@ protected RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppRemoving(submissionContext); - // REMOVING => FINISHING event RMAppEventType.APP_REMOVED + RMApp application = testCreateAppFinalSaving(submissionContext); + // FINAL_SAVING => FINISHING event RMAppEventType.APP_SAVED RMAppEvent finishingEvent = - new RMAppRemovedEvent(application.getApplicationId(), null); + new RMAppStoredEvent(application.getApplicationId(), null); application.handle(finishingEvent); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); @@ -552,7 +569,6 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -597,7 +613,6 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -666,10 +681,10 @@ public void testAppRunningFailed() throws IOException { } @Test - public void testAppRemovingFinished() throws IOException { - LOG.info("--- START: testAppRemovingFINISHED ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + public void testAppSavingFinalStateFinished() throws IOException { + LOG.info("--- START: testAppSavingFinalStateFinished ---"); + RMApp application = testCreateAppFinalSaving(null); + // FINAL_SAVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( application.getApplicationId(), null); application.handle(finishedEvent); @@ -678,10 +693,10 @@ public void testAppRemovingFinished() throws IOException { } @Test - public void testAppRemovingKilled() throws IOException { - LOG.info("--- START: testAppRemovingKilledD ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => KILLED event RMAppEventType.KILL + public void testAppFinalSavingKilled() throws IOException { + LOG.info("--- START: testAppFinalSavingKilled ---"); + RMApp application = testCreateAppFinalSaving(null); + // FINAL_SAVING => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 1f3c506..ca51b1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -330,6 +330,7 @@ private void testAppAttemptSubmittedState() { * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED} */ private void testAppAttemptSubmittedToFailedState(String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -354,6 +355,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) { */ private void testAppAttemptKilledState(Container amContainer, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -427,6 +429,7 @@ private void testAppAttemptAllocatedState(Container amContainer) { */ private void testAppAttemptFailedState(Container container, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -483,6 +486,7 @@ private void testAppAttemptFinishingState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FINISHING, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -647,7 +651,13 @@ private void testUnmanagedAMSuccess(String url) { testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1, true); } - + + private void sendAttemptSavedEvent(RMAppAttempt applicationAttempt) { + applicationAttempt.handle( + new RMAppAttemptStoredEvent( + applicationAttempt.getAppAttemptId(), null)); + } + @Test public void testUnmanagedAMUnexpectedRegistration() { unmanagedAM = true; @@ -745,6 +755,7 @@ public void testAMCrashAtAllocated() { ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); @@ -762,6 +773,7 @@ public void testRunningToFailed() { ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( appAttemptId, cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -782,6 +794,7 @@ public void testRunningToKilled() { new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -800,6 +813,7 @@ public void testLaunchedExpire() { launchApplicationAttempt(amContainer); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing", @@ -818,6 +832,7 @@ public void testRunningExpire() { runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing",