diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eeb1479..1aa3469 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -71,14 +71,46 @@ message GetGroupsForUserResponseProto { //////////////////////////////////////////////////////////////////////// ////// RM recovery related records ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// +enum RMAppAttemptStateProto { + RMATTEMPT_NEW = 1; + RMATTEMPT_SUBMITTED = 2; + RMATTEMPT_SCHEDULED = 3; + RMATTEMPT_ALLOCATED = 4; + RMATTEMPT_LAUNCHED = 5; + RMATTEMPT_FAILED = 6; + RMATTEMPT_RUNNING = 7; + RMATTEMPT_FINISHING = 8; + RMATTEMPT_FINISHED = 9; + RMATTEMPT_KILLED = 10; + RMATTEMPT_ALLOCATED_SAVING = 11; + RMATTEMPT_LAUNCHED_UNMANAGED_SAVING = 12; + RMATTEMPT_RECOVERED = 13; + RMATTEMPT_FINAL_SAVING = 14; +} + +enum RMAppStateProto { + RMAPP_NEW = 1; + RMAPP_NEW_SAVING = 2; + RMAPP_SUBMITTED = 3; + RMAPP_ACCEPTED = 4; + RMAPP_RUNNING = 5; + RMAPP_FINAL_SAVING = 6; + RMAPP_FINISHING = 7; + RMAPP_FINISHED = 8; + RMAPP_FAILED = 9; + RMAPP_KILLED = 10; +} + message ApplicationStateDataProto { optional int64 submit_time = 1; optional ApplicationSubmissionContextProto application_submission_context = 2; optional string user = 3; + optional RMAppStateProto application_final_state = 4; } message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; optional bytes app_attempt_tokens = 3; + optional RMAppAttemptStateProto app_attempt_final_state = 4; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 90c88c2..d259965 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -317,7 +317,7 @@ public SubmitApplicationResponse submitApplication( try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, - System.currentTimeMillis(), false, user); + System.currentTimeMillis(), false, user, true); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e661344..4f79c9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; @@ -236,7 +237,7 @@ protected synchronized void checkAppNumCompletedLimit() { @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, - boolean isRecovered, String user) throws YarnException { + boolean isRecovered, String user, boolean startApp) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); // Validation of the ApplicationSubmissionContext needs to be completed @@ -282,31 +283,31 @@ protected void submitApplication( this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); - try { - // Setup tokens for renewal - if (UserGroupInformation.isSecurityEnabled()) { - this.rmContext.getDelegationTokenRenewer().addApplication( - applicationId,parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete() - ); + if (startApp) { + try { + // Setup tokens for renewal + if (UserGroupInformation.isSecurityEnabled()) { + this.rmContext.getDelegationTokenRenewer().addApplication( + applicationId, parseCredentials(submissionContext), + submissionContext.getCancelTokensWhenComplete()); + } + } catch (IOException ie) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", ie); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, ie.getMessage())); + throw RPCUtil.getRemoteException(ie); } - } catch (IOException ie) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", - ie); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we havne't yet informed the - // Scheduler about the existence of the application + + // All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, ie.getMessage())); - throw RPCUtil.getRemoteException(ie); + new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER + : RMAppEventType.START)); } - - // All done, start the RMApp - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: - RMAppEventType.START)); - } +} private Credentials parseCredentials(ApplicationSubmissionContext application) throws IOException { @@ -365,11 +366,12 @@ public void recover(RMState state) throws Exception { // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); - submitApplication(appState.getApplicationSubmissionContext(), - appState.getSubmitTime(), true, appState.getUser()); + boolean startApp = appState.getFinalState() == null; + submitApplication(appState.getApplicationSubmissionContext(), + appState.getSubmitTime(), true, appState.getUser(), startApp); // re-populate attempt information in application - RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( - appState.getAppId()); + RMAppImpl appImpl = + (RMAppImpl) rmContext.getRMApps().get(appState.getAppId()); appImpl.recover(state); } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 062f5cc..170078b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -130,7 +130,8 @@ private void loadRMAppState(RMState rmState) throws Exception { ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getUser(), + appStateData.getApplicationFinalState()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); rmState.appState.put(appId, appState); @@ -152,7 +153,8 @@ private void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); @@ -345,7 +347,7 @@ private void deleteFile(Path deletePath) throws Exception { } private void writeFile(Path outputPath, byte[] data) throws Exception { - FSDataOutputStream fsOut = fs.create(outputPath, false); + FSDataOutputStream fsOut = fs.create(outputPath, true); fsOut.write(data); fsOut.close(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 0852ce8..5a24b5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -79,15 +79,18 @@ protected synchronized void closeInternal() throws Exception { public void storeApplicationState(String appId, ApplicationStateDataPBImpl appStateData) throws Exception { - ApplicationState appState = new ApplicationState( - appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), appStateData.getUser()); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), appStateData.getApplicationFinalState()); if (state.appState.containsKey(appState.getAppId())) { - Exception e = new IOException("App: " + appId + " is already stored."); - LOG.info("Error storing info for app: " + appId, e); - throw e; + LOG.info("Storing final state " + + appStateData.getApplicationFinalState() + " for app: " + appId); + state.appState.get(appState.getAppId()).finalState = + appStateData.getApplicationFinalState(); + } else { + state.appState.put(appState.getAppId(), appState); } - state.appState.put(appState.getAppId(), appState); } @Override @@ -105,20 +108,17 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); } - if (appState.attempts.containsKey(attemptState.getAttemptId())) { - Exception e = new IOException("Attempt: " + - attemptState.getAttemptId() + " is already stored."); - LOG.info("Error storing info for attempt: " + - attemptState.getAttemptId(), e); - throw e; + LOG.info("storing final state " + attemptState.getFinalState() + + " for attempt: " + attemptState.getAttemptId()); } appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 2f4b896..1dc20dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @Private @@ -86,13 +88,15 @@ public RMStateStore() { final ApplicationAttemptId attemptId; final Container masterContainer; final Credentials appAttemptCredentials; + RMAppAttemptState finalState; public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, - Credentials appAttemptCredentials) { + Container masterContainer, Credentials appAttemptCredentials, + RMAppAttemptState finalState) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; + this.finalState = finalState; } public Container getMasterContainer() { @@ -104,6 +108,9 @@ public ApplicationAttemptId getAttemptId() { public Credentials getAppAttemptCredentials() { return appAttemptCredentials; } + public RMAppAttemptState getFinalState(){ + return finalState; + } } /** @@ -113,14 +120,16 @@ public Credentials getAppAttemptCredentials() { final ApplicationSubmissionContext context; final long submitTime; final String user; + RMAppState finalState; Map attempts = new HashMap(); - + ApplicationState(long submitTime, ApplicationSubmissionContext context, - String user) { + String user, RMAppState finalState) { this.submitTime = submitTime; this.context = context; this.user = user; + this.finalState = finalState; } public ApplicationId getAppId() { @@ -141,6 +150,9 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { public String getUser() { return user; } + public RMAppState getFinalState() { + return finalState; + } } public static class RMDTSecretManagerState { @@ -253,8 +265,9 @@ public synchronized void storeApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - ApplicationState appState = new ApplicationState( - app.getSubmitTime(), context, app.getUser()); + ApplicationState appState = + new ApplicationState(app.getSubmitTime(), context, app.getUser(), + app.getApplicationFinalState()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @@ -264,8 +277,7 @@ public synchronized void storeApplication(RMApp app) { * application. */ protected abstract void storeApplicationState(String appId, - ApplicationStateDataPBImpl appStateData) - throws Exception; + ApplicationStateDataPBImpl appStateData) throws Exception; @SuppressWarnings("unchecked") /** @@ -279,7 +291,8 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + appAttempt.getMasterContainer(), credentials, + appAttempt.getAppAttemptFinalState()); dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); @@ -291,8 +304,7 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { * application attempt */ protected abstract void storeApplicationAttemptState(String attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) - throws Exception; + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; /** @@ -373,12 +385,13 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext(), - app.getUser()); + app.getUser(), app.getApplicationFinalState()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + appAttempt.getMasterContainer(), credentials, + appAttempt.getAppAttemptFinalState()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -430,17 +443,17 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { case STORE_APP: { - ApplicationState apptState = + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); Exception storedException = null; ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl(); - appStateData.setSubmitTime(apptState.getSubmitTime()); - appStateData.setApplicationSubmissionContext( - apptState.getApplicationSubmissionContext()); - appStateData.setUser(apptState.getUser()); + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(appState.getSubmitTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + appState.getFinalState()); + ApplicationId appId = - apptState.getApplicationSubmissionContext().getApplicationId(); + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { @@ -469,9 +482,10 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens); + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getFinalState()); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 41c95d3..4cebac9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -224,8 +224,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { ApplicationStateDataProto.parseFrom(childData)); ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), + appStateData.getApplicationFinalState()); if (!appId.equals(appState.context.getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); @@ -249,7 +250,8 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getFinalState()); if (!attemptId.equals(attemptState.getAttemptId())) { throw new YarnRuntimeException("The child node name is different " + "from the application attempt id"); @@ -289,8 +291,12 @@ public synchronized void storeApplicationState( LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - createWithRetries( - nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + if (zkClient.exists(nodeCreatePath, true) != null) { + setDataWithRetries(nodeCreatePath, appStateData, 0); + } else { + createWithRetries(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); + } } @Override @@ -303,8 +309,12 @@ public synchronized void storeApplicationAttemptState( + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - createWithRetries(nodeCreatePath, attemptStateData, zkAcl, + if (zkClient.exists(nodeCreatePath, true) != null) { + setDataWithRetries(nodeCreatePath, attemptStateData, 0); + } else { + createWithRetries(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 2622b0e..c4fe378 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; /* * Contains the state data that needs to be persisted for an ApplicationAttempt @@ -61,4 +62,12 @@ public ByteBuffer getAppAttemptTokens(); public void setAppAttemptTokens(ByteBuffer attemptTokens); + + /** + * Get the final state of the application attempt. + * @return + */ + public RMAppAttemptState getFinalState(); + + public void setFinalState(RMAppAttemptState finalState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 35b12e5..3c4bb9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; /** * Contains all the state data that needs to be stored persistently @@ -66,6 +67,13 @@ @Public @Unstable public void setApplicationSubmissionContext( - ApplicationSubmissionContext context); + ApplicationSubmissionContext context); + /** + * Get the final state of the application. + * @return the final state of the application. + */ + public RMAppState getApplicationFinalState(); + + public void setApplicationFinalState(RMAppState finalState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index b539bec..2b84c87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; public class ApplicationAttemptStateDataPBImpl extends ProtoBase @@ -156,14 +158,41 @@ public void setAppAttemptTokens(ByteBuffer attemptTokens) { this.appAttemptTokens = attemptTokens; } + @Override + public RMAppAttemptState getFinalState() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppAttemptFinalState()) { + return null; + } + return convertFromProtoFormat(p.getAppAttemptFinalState()); + } + + @Override + public void setFinalState(RMAppAttemptState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearAppAttemptFinalState(); + return; + } + builder.setAppAttemptFinalState(convertToProtoFormat(finalState)); + } public static ApplicationAttemptStateData newApplicationAttemptStateData( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens) { + ByteBuffer attemptTokens, RMAppAttemptState finalState) { ApplicationAttemptStateData attemptStateData = recordFactory.newRecordInstance(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); attemptStateData.setMasterContainer(container); attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setFinalState(finalState); return attemptStateData; } + + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; + public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { + return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); + } + public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { + return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index b02e056..00be4ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -21,14 +21,20 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; public class ApplicationStateDataPBImpl extends ProtoBase implements ApplicationStateData { - + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.Builder builder = null; @@ -132,4 +138,43 @@ public void setApplicationSubmissionContext( this.applicationSubmissionContext = context; } + @Override + public RMAppState getApplicationFinalState() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationFinalState()) { + return null; + } + return convertFromProtoFormat(p.getApplicationFinalState()); + } + + @Override + public void setApplicationFinalState(RMAppState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearApplicationFinalState(); + return; + } + builder.setApplicationFinalState(convertToProtoFormat(finalState)); + } + + public static ApplicationStateData newApplicationStateData(long submitTime, + String user, ApplicationSubmissionContext submissionContext, + RMAppState finalState) { + + ApplicationStateData appState = + recordFactory.newRecordInstance(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setApplicationFinalState(finalState); + return appState; + } + + private static String RM_APP_PREFIX = "RMAPP_"; + public static RMAppStateProto convertToProtoFormat(RMAppState e) { + return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name()); + } + public static RMAppState convertFromProtoFormat(RMAppStateProto e) { + return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..0056f18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -211,4 +211,11 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Get the final state of the RMApp. + * @return the final state of the RMApp, null if final state is + * not yet available + */ + RMAppState getApplicationFinalState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a8a0af4..45f08e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -111,8 +109,11 @@ private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); - private boolean isAppRemovalRequestSent = false; - private RMAppState previousStateAtRemoving; + private boolean isFinalStateSavingRequestSent = false; + private RMAppState stateBeforeFinalSaving; + private RMAppEvent eventCausingFinalSaving; + private RMAppState appFinalState; + private boolean isRecoveredAndCompleted = false; private static final StateMachineFactory { @Override - public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_REMOVED)) { - RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event; - if (removeEvent.getRemovedException() != null) { - LOG.error( - "Failed to remove application: " + removeEvent.getApplicationId(), - removeEvent.getRemovedException()); - ExitUtil.terminate(1, removeEvent.getRemovedException()); + public RMAppState transition(RMAppImpl app, RMAppEvent event) { + app.finishTime = System.currentTimeMillis(); + RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; + if (storeEvent.getStoredException() != null) { + LOG.error( + "Failed to store application final state: " + + storeEvent.getApplicationId(), storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + + RMAppEventType eventType = app.eventCausingFinalSaving.getType(); + switch (eventType) { + case APP_REJECTED: + new AppRejectedTransition() + .transition(app, app.eventCausingFinalSaving); + return RMAppState.FAILED; + case ATTEMPT_FAILED: + String msg = null; + RMAppFailedAttemptEvent failedEvent = + (RMAppFailedAttemptEvent) app.eventCausingFinalSaving; + if (app.submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + msg = "Unmanaged application " + app.getApplicationId() + + " failed due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } else if (app.attempts.size() >= app.maxAppAttempts) { + msg = "Application " + app.getApplicationId() + " failed " + + app.maxAppAttempts + " times due to " + + failedEvent.getDiagnostics() + ". Failing the application."; } + LOG.info(msg); + app.diagnostics.append(msg); + // Inform the node for app-finish + FINAL_TRANSITION.transition(app, app.eventCausingFinalSaving); + return RMAppState.FAILED; + case KILL: + new KillAppAndAttemptTransition().transition(app, + app.eventCausingFinalSaving); + return RMAppState.KILLED; + default: + return RMAppState.FINISHING; } - app.finishTime = System.currentTimeMillis(); } } @@ -680,12 +722,30 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class RMAppRemovingTransition extends RMAppTransition { + private static final class RMAppFinalStateSavingTransition extends + RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Removing application with id " + app.applicationId); - app.removeApplicationState(); - app.previousStateAtRemoving = app.getState(); + app.stateBeforeFinalSaving = app.getState(); + app.eventCausingFinalSaving = event; + + switch (event.getType()) { + case APP_REJECTED: + case ATTEMPT_FAILED: + app.appFinalState = RMAppState.FAILED; + break; + case KILL: + app.appFinalState = RMAppState.KILLED; + break; + default: + app.appFinalState = RMAppState.FINISHED; + } + LOG.info("Storing application final state " + app.appFinalState + + " with id " + app.applicationId); + if (!app.isFinalStateSavingRequestSent) { + app.rmContext.getStateStore().storeApplication(app); + app.isFinalStateSavingRequestSent = true; + } } } @@ -698,23 +758,19 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private static class AppKilledTransition extends FinalTransition { - @Override - public void transition(RMAppImpl app, RMAppEvent event) { - app.diagnostics.append("Application killed by user."); - super.transition(app, event); - }; - } - - private static class KillAppAndAttemptTransition extends AppKilledTransition { + private static class KillAppAndAttemptTransition extends FinalTransition { @SuppressWarnings("unchecked") @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), - RMAppAttemptEventType.KILL)); + app.diagnostics.append("Application killed by user."); + if (app.currentAttempt != null) { + app.handler.handle(new RMAppAttemptEvent(app.currentAttempt + .getAppAttemptId(), RMAppAttemptEventType.KILL)); + } super.transition(app, event); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -744,9 +800,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.getState() != RMAppState.FINISHING) { app.finishTime = System.currentTimeMillis(); } - // application completely done and remove from state store. - app.removeApplicationState(); - app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); @@ -764,32 +817,13 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - - RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); - boolean retryApp = true; - String msg = null; - if (app.submissionContext.getUnmanagedAM()) { - // RM does not manage the AM. Do not retry - retryApp = false; - msg = "Unmanaged application " + app.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } else if (app.attempts.size() >= app.maxAppAttempts) { - retryApp = false; - msg = "Application " + app.getApplicationId() + " failed " - + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } - - if (retryApp) { + if (!app.submissionContext.getUnmanagedAM() + && app.attempts.size() < app.maxAppAttempts) { app.createNewAttempt(true); return initialState; } else { - LOG.info(msg); - app.diagnostics.append(msg); - // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); - return RMAppState.FAILED; + new RMAppFinalStateSavingTransition().transition(app, event); + return RMAppState.FINAL_SAVING; } } @@ -814,9 +848,9 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { RMAppState rmAppState = getState(); - // If App is in REMOVING state, return its previous state. - if (rmAppState.equals(RMAppState.REMOVING)) { - rmAppState = previousStateAtRemoving; + // If App is in FINAL_SAVING state, return its previous state. + if (rmAppState.equals(RMAppState.FINAL_SAVING)) { + rmAppState = stateBeforeFinalSaving; } switch (rmAppState) { case NEW: @@ -841,10 +875,7 @@ public YarnApplicationState createApplicationState() { } } - private void removeApplicationState(){ - if (!isAppRemovalRequestSent) { - rmContext.getStateStore().removeApplication(this); - isAppRemovalRequestSent = true; - } + public RMAppState getApplicationFinalState() { + return this.appFinalState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index e9ce5b4..ececdae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -24,7 +24,7 @@ SUBMITTED, ACCEPTED, RUNNING, - REMOVING, + FINAL_SAVING, FINISHING, FINISHED, FAILED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda..3247b5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -178,4 +178,11 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * Get the final {@link RMAppAttempt} state. + * @return the final {@link RMAppAttempt} state, null if final state is not + * yet available + */ + RMAppAttemptState getAppAttemptFinalState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f68a4a5..1456d22 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -157,6 +156,12 @@ private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); + private RMAppAttemptState stateBeforeFinalSaving; + private RMAppAttemptEvent eventCausingFinalSaving; + private RMAppAttemptState attemptFinalState; + private boolean isFinalSavingRequestSent = false; + private boolean isRecoveredAndCompleted = false; + private static final StateMachineFactory { + @Override + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; + RMAppAttemptState beforeState = appAttempt.stateBeforeFinalSaving; + + switch (causeEvent.getType()) { + case APP_REJECTED: + new AppRejectedTransition().transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case REGISTERED: + new UnexpectedAMRegisteredTransition().transition(appAttempt, + causeEvent); + return RMAppAttemptState.FAILED; + case LAUNCH_FAILED: + new LaunchFailedTransition().transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case CONTAINER_FINISHED: + if (beforeState.equals(RMAppAttemptState.RUNNING)) { + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) causeEvent; + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); + // container associated with AM. must not be unmanaged + assert appAttempt.submissionContext.getUnmanagedAM() == false; + // Setup diagnostic message + appAttempt.diagnostics.append("AM Container for " + + appAttempt.getAppAttemptId() + " exited with " + " exitCode: " + + containerStatus.getExitStatus() + " due to: " + + containerStatus.getDiagnostics() + "." + + "Failing this attempt."); + new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, + causeEvent); + } else { + new AMContainerCrashedTransition().transition(appAttempt, causeEvent); + } + return RMAppAttemptState.FAILED; + case EXPIRE: + EXPIRED_TRANSITION.transition(appAttempt, causeEvent); + return RMAppAttemptState.FAILED; + case KILL: + if (beforeState.equals(RMAppAttemptState.ALLOCATED)) { + new KillAllocatedAMTransition().transition(appAttempt, causeEvent); + } else if (beforeState.equals(RMAppAttemptState.LAUNCHED) + || beforeState.equals(RMAppAttemptState.RUNNING)) { + new FinalTransition(RMAppAttemptState.KILLED).transition(appAttempt, + causeEvent); + } else { + new BaseFinalTransition(RMAppAttemptState.KILLED).transition( + appAttempt, causeEvent); + } + return RMAppAttemptState.KILLED; + case UNREGISTERED: + appAttempt.unregisterApplicationMaster(appAttempt, causeEvent); + return RMAppAttemptState.FINISHING; + default: + // To do + return RMAppAttemptState.FINISHING; + } + } + } private static class BaseFinalTransition extends BaseTransition { @@ -1125,38 +1254,39 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); - - appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId); - - appAttempt.progress = 1.0f; - - RMAppAttemptUnregistrationEvent unregisterEvent - = (RMAppAttemptUnregistrationEvent) event; - appAttempt.diagnostics.append(unregisterEvent.getDiagnostics()); - appAttempt.origTrackingUrl = - sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); - appAttempt.proxiedTrackingUrl = - appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl); - appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus(); - // Tell the app if (appAttempt.getSubmissionContext().getUnmanagedAM()) { // Unmanaged AMs have no container to wait for, so they skip // the FINISHING state and go straight to FINISHED. + appAttempt.unregisterApplicationMaster(appAttempt, event); new FinalTransition(RMAppAttemptState.FINISHED).transition( appAttempt, event); return RMAppAttemptState.FINISHED; } - appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId); + // Saving the attempt final state + new AttemptFinalStateSavingTransition().transition(appAttempt, event); ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); - appAttempt.eventHandler.handle( - new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); - return RMAppAttemptState.FINISHING; + appAttempt.eventHandler.handle(new RMAppEvent(applicationId, + RMAppEventType.ATTEMPT_UNREGISTERED)); + return RMAppAttemptState.FINAL_SAVING; } } + private void unregisterApplicationMaster(RMAppAttempt appAttempt, + RMAppAttemptEvent event) { + ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); + rmContext.getAMLivelinessMonitor().unregister(appAttemptId); + progress = 1.0f; + + RMAppAttemptUnregistrationEvent unregisterEvent = + (RMAppAttemptUnregistrationEvent) event; + diagnostics.append(unregisterEvent.getDiagnostics()); + origTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); + proxiedTrackingUrl = generateProxyUriWithScheme(origTrackingUrl); + finalStatus = unregisterEvent.getFinalApplicationStatus(); + } + private static final class ContainerAcquiredTransition extends BaseTransition { @Override @@ -1186,18 +1316,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { - // container associated with AM. must not be unmanaged - assert appAttempt.submissionContext.getUnmanagedAM() == false; - // Setup diagnostic message - appAttempt.diagnostics.append("AM Container for " + - appAttempt.getAppAttemptId() + " exited with " + - " exitCode: " + containerStatus.getExitStatus() + - " due to: " + containerStatus.getDiagnostics() + "." + - "Failing this attempt."); - - new FinalTransition(RMAppAttemptState.FAILED).transition( - appAttempt, containerFinishedEvent); - return RMAppAttemptState.FAILED; + // Saving the final attempt state. + new AttemptFinalStateSavingTransition().transition(appAttempt, event); + return RMAppAttemptState.FINAL_SAVING; } // Normal container. @@ -1267,7 +1388,7 @@ private void checkAttemptStoreError(RMAppAttemptEvent event) { } } - private void storeAttempt(RMStateStore store) { + private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved LOG.info("Storing attempt: AppId: " + @@ -1275,7 +1396,11 @@ private void storeAttempt(RMStateStore store) { + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); - store.storeApplicationAttempt(this); + rmContext.getStateStore().storeApplicationAttempt(this); + } + + public RMAppAttemptState getAppAttemptFinalState() { + return this.attemptFinalState; } private void removeCredentials(RMAppAttemptImpl appAttempt) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java index 3eb13ed..2551ed1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java @@ -20,5 +20,6 @@ public enum RMAppAttemptState { NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, - FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED, + FINAL_SAVING } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 5bc4554..119ca9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -216,4 +216,8 @@ public Object run() throws Exception { } }); } + + public ApplicationAttemptId getApplicationAttemptId() { + return this.attemptId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6698412..c3a1ad1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -172,7 +172,7 @@ public void submitApplication( ApplicationSubmissionContext submissionContext, String user) throws YarnException { super.submitApplication(submissionContext, System.currentTimeMillis(), - false, user); + false, user, true); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 81f4bce..aca8e6b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -135,9 +135,10 @@ public void testRMRestart() throws Exception { am0.waitForState(RMAppAttemptState.FINISHED); rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED); - // spot check that app is not saved anymore - Assert.assertEquals(0, rmAppState.size()); - + // check that app is saved with the final state + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getFinalState()); + // create app that gets launched and does allocate before RM restart RMApp app1 = rm1.submitApp(200); // assert app1 info is saved @@ -209,7 +210,6 @@ public void testRMRestart() throws Exception { .getApplicationId(), appUnmanaged.getApplicationSubmissionContext() .getApplicationId()); - // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state @@ -223,11 +223,16 @@ public void testRMRestart() throws Exception { nm2.setResourceTrackerService(rm2.getResourceTrackerService()); // verify load of old state - // only 2 apps are loaded since unmanaged app is not loaded back since it - // cannot be restarted by the RM this will change with work preserving RM - // restart in which AMs/NMs are not rebooted - Assert.assertEquals(2, rm2.getRMContext().getRMApps().size()); - + // 3 apps are loaded. + // FINISHED app is also loaded back. + // Unmanaged app is not loaded back since it cannot be restarted by the RM + // this will change with work preserving RM restart in which AMs/NMs are not + // rebooted. + Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); + // check that earlier finished app is alsoo loaded back. + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getFinalState()); + // verify correct number of attempts and other data RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); Assert.assertNotNull(loadedApp1); @@ -331,26 +336,37 @@ public void testRMRestart() throws Exception { new ArrayList()).getAllocatedContainers()); Thread.sleep(500); } + // finish the AMs + finishApplicationMaster(loadedApp1, rm2, am1Node, am1); + finishApplicationMaster(loadedApp2, rm2, am2Node, am2); - // finish the AM's - am1.unregisterAppAttempt(); - rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING); - am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE); - am1.waitForState(RMAppAttemptState.FINISHED); - - am2.unregisterAppAttempt(); - rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING); - am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE); - am2.waitForState(RMAppAttemptState.FINISHED); - // stop RM's rm2.stop(); rm1.stop(); - // completed apps should be removed - Assert.assertEquals(0, rmAppState.size()); + // completed apps are not removed immediately after app finish + // And finished app is also loaded back. + Assert.assertEquals(3, rmAppState.size()); } - + + private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, + MockAM am) throws Exception { + RMState rmState = + ((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState(); + Map rmAppState = + rmState.getApplicationState(); + am.unregisterAppAttempt(); + rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHING); + nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + // check that app/attempt is saved with the final state + ApplicationState appState = rmAppState.get(rmApp.getApplicationId()); + Assert + .assertEquals(RMAppState.FINISHED, appState.getFinalState()); + Assert.assertEquals(RMAppAttemptState.FINISHED, + appState.getAttempt(am.getApplicationAttemptId()).getFinalState()); + } + @Test public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 73b9cf7..f4386e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -153,6 +153,11 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public RMAppState getApplicationFinalState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index d75fc7d..7cbcbcb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -65,14 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; - import org.apache.zookeeper.ZooKeeper; - import org.junit.Test; public class TestRMStateStore extends ClientBaseWithFixes{ @@ -231,8 +231,8 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - void storeApp( - RMStateStore store, ApplicationId appId, long time) throws Exception { + void storeApp(RMStateStore store, ApplicationId appId, long time, + RMAppState finalState) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); @@ -242,13 +242,14 @@ void storeApp( when(mockApp.getSubmitTime()).thenReturn(time); when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); + when(mockApp.getApplicationFinalState()).thenReturn(finalState); store.storeApplication(mockApp); } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, String containerIdStr, Token appToken, - SecretKey clientTokenMasterKey, TestDispatcher dispatcher) - throws Exception { + SecretKey clientTokenMasterKey, TestDispatcher dispatcher, + RMAppAttemptState finalState) throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); @@ -258,6 +259,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, when(mockAttempt.getAMRMToken()).thenReturn(appToken); when(mockAttempt.getClientTokenMasterKey()) .thenReturn(clientTokenMasterKey); + when(mockAttempt.getAppAttemptFinalState()).thenReturn(finalState); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -281,7 +283,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) ApplicationAttemptId attemptId1 = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); - storeApp(store, appId1, submitTime); + storeApp(store, appId1, submitTime, RMAppState.FINISHED); // create application token and client token key for attempt1 Token appAttemptToken1 = @@ -292,8 +294,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) clientToAMTokenMgr.createMasterKey(attemptId1); ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", - appAttemptToken1, clientTokenKey1, dispatcher); + "container_1352994193343_0001_01_000001", appAttemptToken1, + clientTokenKey1, dispatcher, RMAppAttemptState.FINISHED); String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = @@ -308,15 +310,15 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) clientToAMTokenMgr.createMasterKey(attemptId2); ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", - appAttemptToken2, clientTokenKey2, dispatcher); + "container_1352994193343_0001_02_000001", appAttemptToken2, + clientTokenKey2, dispatcher, RMAppAttemptState.FAILED); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); - storeApp(store, appIdRemoved, submitTime); + storeApp(store, appIdRemoved, submitTime, RMAppState.FAILED); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", null, null, dispatcher); + "container_1352994193343_0002_01_000001", null, null, dispatcher, null); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -350,6 +352,9 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) // submission context is loaded correctly assertEquals(appId1, appState.getApplicationSubmissionContext().getApplicationId()); + // app final state is loaded correctly + assertEquals(RMAppState.FINISHED, appState.getFinalState()); + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); // attempt1 is loaded correctly assertNotNull(attemptState); @@ -364,6 +369,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertArrayEquals(clientTokenKey1.getEncoded(), attemptState.getAppAttemptCredentials() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertEquals(RMAppAttemptState.FINISHED, attemptState.getFinalState()); attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly @@ -379,6 +385,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertArrayEquals(clientTokenKey2.getEncoded(), attemptState.getAppAttemptCredentials() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertEquals(RMAppAttemptState.FAILED, attemptState.getFinalState()); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 73dc8d3..9542910 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -19,19 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; - import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -226,5 +225,10 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { return null; + } + + @Override + public RMAppState getApplicationFinalState() { + return null; }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2075921..c179ace 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; import java.util.Arrays; @@ -286,7 +287,8 @@ private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp ap } // test to make sure times are set when app finishes - private static void assertTimesAtFinish(RMApp application) { + private void assertTimesAtFinish(RMApp application) { + sendAppSavedEvent(application); assertStartTimeSet(application); Assert.assertTrue("application finish time is not greater then 0", (application.getFinishTime() > 0)); @@ -294,11 +296,14 @@ private static void assertTimesAtFinish(RMApp application) { (application.getFinishTime() >= application.getStartTime())); } - private void assertAppRemoved(RMApp application){ - verify(store).removeApplication(application); + private void assertAppFinalStateSaved(RMApp application){ + // storeApplication is called twice, one for the application submission and + // the other for saving the final state of the application + verify(store, times(2)).storeApplication(application); } - private static void assertKilled(RMApp application) { + private void assertKilled(RMApp application) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); assertFinalAppStatus(FinalApplicationStatus.KILLED, application); @@ -307,14 +312,19 @@ private static void assertKilled(RMApp application) { "Application killed by user.", diag.toString()); } - private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException { + private void assertAppAndAttemptKilled(RMApp application) + throws InterruptedException { assertKilled(application); - Assert.assertEquals( RMAppAttemptState.KILLED, - application.getCurrentAppAttempt().getAppAttemptState() - ); + // send attempt final state saved event. + application.getCurrentAppAttempt().handle( + new RMAppAttemptStoredEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), null)); + Assert.assertEquals(RMAppAttemptState.KILLED, application + .getCurrentAppAttempt().getAppAttemptState()); } - private static void assertFailed(RMApp application, String regex) { + private void assertFailed(RMApp application, String regex) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); assertFinalAppStatus(FinalApplicationStatus.FAILED, application); @@ -323,6 +333,13 @@ private static void assertFailed(RMApp application, String regex) { diag.toString().matches(regex)); } + private void sendAppSavedEvent(RMApp application) { + RMAppEvent event = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + rmDispatcher.await(); + } + protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); @@ -386,15 +403,15 @@ protected RMApp testCreateAppRunning( return application; } - protected RMApp testCreateAppRemoving( + protected RMApp testCreateAppFinalSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppRunning(submissionContext); RMAppEvent finishingEvent = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_UNREGISTERED); application.handle(finishingEvent); - assertAppState(RMAppState.REMOVING, application); - assertAppRemoved(application); + assertAppState(RMAppState.FINAL_SAVING, application); + assertAppFinalStateSaved(application); return application; } @@ -402,10 +419,10 @@ protected RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppRemoving(submissionContext); - // REMOVING => FINISHING event RMAppEventType.APP_REMOVED + RMApp application = testCreateAppFinalSaving(submissionContext); + // FINAL_SAVING => FINISHING event RMAppEventType.APP_SAVED RMAppEvent finishingEvent = - new RMAppRemovedEvent(application.getApplicationId(), null); + new RMAppStoredEvent(application.getApplicationId(), null); application.handle(finishingEvent); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); @@ -552,7 +569,6 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -597,7 +613,6 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -666,10 +681,10 @@ public void testAppRunningFailed() throws IOException { } @Test - public void testAppRemovingFinished() throws IOException { - LOG.info("--- START: testAppRemovingFINISHED ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + public void testAppSavingFinalStateFinished() throws IOException { + LOG.info("--- START: testAppSavingFinalStateFinished ---"); + RMApp application = testCreateAppFinalSaving(null); + // FINAL_SAVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( application.getApplicationId(), null); application.handle(finishedEvent); @@ -678,10 +693,10 @@ public void testAppRemovingFinished() throws IOException { } @Test - public void testAppRemovingKilled() throws IOException { - LOG.info("--- START: testAppRemovingKilledD ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => KILLED event RMAppEventType.KILL + public void testAppFinalSavingKilled() throws IOException { + LOG.info("--- START: testAppFinalSavingKilled ---"); + RMApp application = testCreateAppFinalSaving(null); + // FINAL_SAVING => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 1f3c506..ca51b1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -330,6 +330,7 @@ private void testAppAttemptSubmittedState() { * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED} */ private void testAppAttemptSubmittedToFailedState(String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -354,6 +355,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) { */ private void testAppAttemptKilledState(Container amContainer, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -427,6 +429,7 @@ private void testAppAttemptAllocatedState(Container amContainer) { */ private void testAppAttemptFailedState(Container container, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -483,6 +486,7 @@ private void testAppAttemptFinishingState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FINISHING, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -647,7 +651,13 @@ private void testUnmanagedAMSuccess(String url) { testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1, true); } - + + private void sendAttemptSavedEvent(RMAppAttempt applicationAttempt) { + applicationAttempt.handle( + new RMAppAttemptStoredEvent( + applicationAttempt.getAppAttemptId(), null)); + } + @Test public void testUnmanagedAMUnexpectedRegistration() { unmanagedAM = true; @@ -745,6 +755,7 @@ public void testAMCrashAtAllocated() { ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); @@ -762,6 +773,7 @@ public void testRunningToFailed() { ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( appAttemptId, cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -782,6 +794,7 @@ public void testRunningToKilled() { new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -800,6 +813,7 @@ public void testLaunchedExpire() { launchApplicationAttempt(amContainer); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing", @@ -818,6 +832,7 @@ public void testRunningExpire() { runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing",