diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eeb1479..31758f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -71,14 +71,52 @@ message GetGroupsForUserResponseProto { //////////////////////////////////////////////////////////////////////// ////// RM recovery related records ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// +enum RMAppAttemptStateProto { + RMATTEMPT_NEW = 1; + RMATTEMPT_SUBMITTED = 2; + RMATTEMPT_SCHEDULED = 3; + RMATTEMPT_ALLOCATED = 4; + RMATTEMPT_LAUNCHED = 5; + RMATTEMPT_FAILED = 6; + RMATTEMPT_RUNNING = 7; + RMATTEMPT_FINISHING = 8; + RMATTEMPT_FINISHED = 9; + RMATTEMPT_KILLED = 10; + RMATTEMPT_ALLOCATED_SAVING = 11; + RMATTEMPT_LAUNCHED_UNMANAGED_SAVING = 12; + RMATTEMPT_RECOVERED = 13; + RMATTEMPT_FINAL_SAVING = 14; +} + +enum RMAppStateProto { + RMAPP_NEW = 1; + RMAPP_NEW_SAVING = 2; + RMAPP_SUBMITTED = 3; + RMAPP_ACCEPTED = 4; + RMAPP_RUNNING = 5; + RMAPP_FINAL_SAVING = 6; + RMAPP_FINISHING = 7; + RMAPP_FINISHED = 8; + RMAPP_FAILED = 9; + RMAPP_KILLED = 10; +} + message ApplicationStateDataProto { optional int64 submit_time = 1; optional ApplicationSubmissionContextProto application_submission_context = 2; optional string user = 3; + optional RMAppStateProto application_state = 4; + optional string diagnostics = 5 [default = "N/A"]; + optional int64 finish_time = 6; } message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; optional bytes app_attempt_tokens = 3; + optional 
RMAppAttemptStateProto app_attempt_state = 4; + optional string orig_tracking_url = 5; + optional string diagnostics = 6 [default = "N/A"]; + optional int64 start_time = 7; + optional FinalApplicationStatusProto final_application_status = 8; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index e85ba92..cd089c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -134,7 +134,10 @@ private void loadRMAppState(RMState rmState) throws Exception { ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getUser(), + appStateData.getState(), + appStateData.getDiagnostics(), + appStateData.getFinishTime()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); rmState.appState.put(appId, appState); @@ -156,7 +159,12 @@ private void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), + attemptStateData.getState(), + attemptStateData.getOrigTrackingUrl(), + attemptStateData.getDiagnostics(), + 
attemptStateData.getFinalApplicationStatus()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); @@ -232,7 +240,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { } @Override - public synchronized void storeApplicationState(String appId, + public synchronized void storeApplicationStateInternal(String appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appId); fs.mkdirs(appDirPath); @@ -251,7 +259,7 @@ public synchronized void storeApplicationState(String appId, } @Override - public synchronized void storeApplicationAttemptState(String attemptId, + public synchronized void storeApplicationAttemptStateInternal(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { ApplicationAttemptId appAttemptId = ConverterUtils.toApplicationAttemptId(attemptId); @@ -370,10 +378,15 @@ private void deleteFile(Path deletePath) throws Exception { * atomic for underlying file system. */ private void writeFile(Path outputPath, byte[] data) throws Exception { + if (fs.exists(outputPath)) { + deleteFile(outputPath); + } Path tempPath = new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); FSDataOutputStream fsOut = null; - fsOut = fs.create(tempPath, false); + // This file will be overwritten when app/attempt finishes for saving the + // final status. 
+ fsOut = fs.create(tempPath, true); fsOut.write(data); fsOut.close(); fs.rename(tempPath, outputPath); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 0852ce8..611f403 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -76,22 +76,28 @@ protected synchronized void closeInternal() throws Exception { } @Override - public void storeApplicationState(String appId, + public void storeApplicationStateInternal(String appId, ApplicationStateDataPBImpl appStateData) throws Exception { - ApplicationState appState = new ApplicationState( - appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), appStateData.getUser()); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), appStateData.getState(), + appStateData.getDiagnostics(), appStateData.getFinishTime()); if (state.appState.containsKey(appState.getAppId())) { - Exception e = new IOException("App: " + appId + " is already stored."); - LOG.info("Error storing info for app: " + appId, e); - throw e; + LOG.info("Storing final state " + + appStateData.getState() + " for app: " + appId); + } + if (state.appState.get(appState.getAppId()) != null) { + // add the earlier attempts back + appState.attempts + .putAll(state.appState.get(appState.getAppId()).attempts); } 
state.appState.put(appState.getAppId(), appState); } @Override - public synchronized void storeApplicationAttemptState(String attemptIdStr, + public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptId attemptId = ConverterUtils @@ -105,20 +111,20 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getOrigTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus()); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); } - if (appState.attempts.containsKey(attemptState.getAttemptId())) { - Exception e = new IOException("Attempt: " + - attemptState.getAttemptId() + " is already stored."); - LOG.info("Error storing info for attempt: " + - attemptState.getAttemptId(), e); - throw e; + LOG.info("storing final state " + attemptState.getState() + + " for attempt: " + attemptState.getAttemptId()); } appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 003346b..dfbe21b 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -50,13 +50,13 @@ public RMState loadState() throws Exception { } @Override - protected void storeApplicationState(String appId, + protected void storeApplicationStateInternal(String appId, ApplicationStateDataPBImpl appStateData) throws Exception { // Do nothing } @Override - protected void storeApplicationAttemptState(String attemptId, + protected void storeApplicationAttemptStateInternal(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // Do nothing } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 2f4b896..4b58888 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import 
org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -50,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @Private @@ -86,13 +89,32 @@ public RMStateStore() { final ApplicationAttemptId attemptId; final Container masterContainer; final Credentials appAttemptCredentials; + long startTime = 0; + // fields set when attempt completes + RMAppAttemptState state; + String origTrackingUrl = "N/A"; + String diagnostics; + FinalApplicationStatus amUnregisteredFinalStatus; + + public ApplicationAttemptState(ApplicationAttemptId attemptId, + Container masterContainer, Credentials appAttemptCredentials, + long startTime) { + this(attemptId, masterContainer, appAttemptCredentials, startTime, null, + null, "", null); + } public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, - Credentials appAttemptCredentials) { + Container masterContainer, Credentials appAttemptCredentials, + long startTime, RMAppAttemptState state, String origTrackingUrl, + String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) { this.attemptId = attemptId; this.masterContainer = masterContainer; 
this.appAttemptCredentials = appAttemptCredentials; + this.startTime = startTime; + this.state = state; + this.origTrackingUrl = origTrackingUrl; + this.diagnostics = diagnostics == null ? "" : diagnostics; + this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; } public Container getMasterContainer() { @@ -104,6 +126,21 @@ public ApplicationAttemptId getAttemptId() { public Credentials getAppAttemptCredentials() { return appAttemptCredentials; } + public RMAppAttemptState getState(){ + return state; + } + public String getOrigTrackingUrl() { + return origTrackingUrl; + } + public String getDiagnostics() { + return diagnostics; + } + public long getStartTime() { + return startTime; + } + public FinalApplicationStatus getFinalApplicationStatus() { + return amUnregisteredFinalStatus; + } } /** @@ -115,12 +152,25 @@ public Credentials getAppAttemptCredentials() { final String user; Map attempts = new HashMap(); - - ApplicationState(long submitTime, ApplicationSubmissionContext context, - String user) { + // fields set when application completes. + RMAppState state; + String diagnostics; + long finishTime; + + public ApplicationState(long submitTime, + ApplicationSubmissionContext context, String user) { + this(submitTime, context, user, null, "", 0); + } + + public ApplicationState(long submitTime, + ApplicationSubmissionContext context, String user, + RMAppState state, String diagnostics, long finishTime) { this.submitTime = submitTime; this.context = context; this.user = user; + this.state = state; + this.diagnostics = diagnostics == null ? 
"" : diagnostics; + this.finishTime = finishTime; } public ApplicationId getAppId() { @@ -141,6 +191,15 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { public String getUser() { return user; } + public RMAppState getState() { + return state; + } + public String getDiagnostics() { + return diagnostics; + } + public long getFinishTime() { + return finishTime; + } } public static class RMDTSecretManagerState { @@ -249,23 +308,27 @@ public synchronized void serviceStop() throws Exception { * RMAppStoredEvent will be sent on completion to notify the RMApp */ @SuppressWarnings("unchecked") - public synchronized void storeApplication(RMApp app) { + public synchronized void storeNewApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - ApplicationState appState = new ApplicationState( - app.getSubmitTime(), context, app.getUser()); + ApplicationState appState = + new ApplicationState(app.getSubmitTime(), context, app.getUser()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } - + + @SuppressWarnings("unchecked") + public synchronized void updateApplicationState(ApplicationState appState) { + dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + } + /** * Blocking API * Derived classes must implement this method to store the state of an * application. 
*/ - protected abstract void storeApplicationState(String appId, - ApplicationStateDataPBImpl appStateData) - throws Exception; + protected abstract void storeApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception; @SuppressWarnings("unchecked") /** @@ -274,25 +337,32 @@ protected abstract void storeApplicationState(String appId, * This does not block the dispatcher threads * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ - public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { + public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + appAttempt.getMasterContainer(), credentials, + appAttempt.getStartTime()); dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); } - + + @SuppressWarnings("unchecked") + public synchronized void updateApplicationAttemptState( + ApplicationAttemptState attemptState) { + dispatcher.getEventHandler().handle( + new RMStateStoreAppAttemptEvent(attemptState)); + } + /** * Blocking API * Derived classes must implement this method to store the state of an * application attempt */ - protected abstract void storeApplicationAttemptState(String attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) - throws Exception; + protected abstract void storeApplicationAttemptStateInternal(String attemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; /** @@ -378,7 +448,8 @@ public synchronized void removeApplication(RMApp app) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials); + 
appAttempt.getMasterContainer(), credentials, + appAttempt.getStartTime()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -409,7 +480,7 @@ protected abstract void removeApplicationState(ApplicationState appState) public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME = new Text("YARN_CLIENT_TOKEN_MASTER_KEY"); - private Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { + public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { Credentials credentials = new Credentials(); Token appToken = appAttempt.getAMRMToken(); if(appToken != null){ @@ -430,21 +501,22 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { case STORE_APP: { - ApplicationState apptState = + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); Exception storedException = null; ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl(); - appStateData.setSubmitTime(apptState.getSubmitTime()); - appStateData.setApplicationSubmissionContext( - apptState.getApplicationSubmissionContext()); - appStateData.setUser(apptState.getUser()); + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(appState.getSubmitTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + appState.getState(), appState.getDiagnostics(), + appState.getFinishTime()); + ApplicationId appId = - apptState.getApplicationSubmissionContext().getApplicationId(); + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { - storeApplicationState(appId.toString(), appStateData); + storeApplicationStateInternal(appId.toString(), appStateData); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); storedException = e; @@ -468,14 +540,18 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } - 
ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl - .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens); - if (LOG.isDebugEnabled()) { + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newApplicationAttemptStateData(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getOrigTrackingUrl(), + attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus()); + if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } - storeApplicationAttemptState(attemptState.getAttemptId().toString(), + storeApplicationAttemptStateInternal(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { LOG.error("Error storing appAttempt: " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 41c95d3..3daf4ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -18,7 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import 
java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -27,6 +34,8 @@ import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -37,9 +46,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.ZKUtil; - import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -51,13 +57,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import com.google.common.annotations.VisibleForTesting; @Private @Unstable @@ -224,8 +224,11 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { ApplicationStateDataProto.parseFrom(childData)); ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getApplicationSubmissionContext(), + 
appStateData.getUser(), + appStateData.getState(), + appStateData.getDiagnostics(), + appStateData.getFinishTime()); if (!appId.equals(appState.context.getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); @@ -249,7 +252,12 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), + attemptStateData.getState(), + attemptStateData.getOrigTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus()); if (!attemptId.equals(attemptState.getAttemptId())) { throw new YarnRuntimeException("The child node name is different " + "from the application attempt id"); @@ -280,7 +288,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } @Override - public synchronized void storeApplicationState( + public synchronized void storeApplicationStateInternal( String appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, appId); @@ -289,12 +297,16 @@ public synchronized void storeApplicationState( LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - createWithRetries( - nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + if (zkClient.exists(nodeCreatePath, true) != null) { + setDataWithRetries(nodeCreatePath, appStateData, 0); + } else { + createWithRetries(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); + } } @Override - public synchronized void storeApplicationAttemptState( + public synchronized void storeApplicationAttemptStateInternal( String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { String 
nodeCreatePath = getNodePath(rmAppRoot, attemptId); @@ -303,8 +315,12 @@ public synchronized void storeApplicationAttemptState( + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - createWithRetries(nodeCreatePath, attemptStateData, zkAcl, + if (zkClient.exists(nodeCreatePath, true) != null) { + setDataWithRetries(nodeCreatePath, attemptStateData, 0); + } else { + createWithRetries(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 2622b0e..778c846 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; /* * Contains the state data that needs to be persisted for an ApplicationAttempt @@ -61,4 +63,44 @@ public ByteBuffer getAppAttemptTokens(); public void setAppAttemptTokens(ByteBuffer attemptTokens); + + /** + * Get the final state of the application attempt. 
+ * @return the final state of the application attempt. + */ + public RMAppAttemptState getState(); + + public void setState(RMAppAttemptState state); + + /** + * Get the original not-proxied tracking url for the application. + * This is intended to only be used by the proxy itself. + * @return the original not-proxied tracking url for the application + */ + public String getOrigTrackingUrl(); + + public void setOrigTrackingUrl(String url); + /** + * Get the diagnostic information of the attempt + * @return diagnostic information of the attempt + */ + public String getDiagnostics(); + + public void setDiagnostics(String diagnostics); + + /** + * Get the start time of the application. + * @return start time of the application + */ + public long getStartTime(); + + public void setStartTime(long startTime); + + /** + * Get the final finish status of the application. + * @return final finish status of the application + */ + public FinalApplicationStatus getFinalApplicationStatus(); + + public void setFinalApplicationStatus(FinalApplicationStatus finishState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 35b12e5..3832ba0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import
org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; /** * Contains all the state data that needs to be stored persistently @@ -66,6 +67,29 @@ @Public @Unstable public void setApplicationSubmissionContext( - ApplicationSubmissionContext context); + ApplicationSubmissionContext context); + /** + * Get the final state of the application. + * @return the final state of the application. + */ + public RMAppState getState(); + + public void setState(RMAppState state); + + /** + * Get the diagnostics information for the application master. + * @return the diagnostics information for the application master. + */ + public String getDiagnostics(); + + public void setDiagnostics(String diagnostics); + + /** + * Get the finish time of the application. + * @return the finish time of the application. + */ + public long getFinishTime(); + + public void setFinishTime(long finishTime); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index b539bec..004ab3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -22,14 +22,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; public class ApplicationAttemptStateDataPBImpl extends ProtoBase @@ -156,14 +161,125 @@ public void setAppAttemptTokens(ByteBuffer attemptTokens) { this.appAttemptTokens = attemptTokens; } + @Override + public RMAppAttemptState getState() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppAttemptState()) { + return null; + } + return convertFromProtoFormat(p.getAppAttemptState()); + } + + @Override + public void setState(RMAppAttemptState state) { + maybeInitBuilder(); + if (state == null) { + builder.clearAppAttemptState(); + return; + } + builder.setAppAttemptState(convertToProtoFormat(state)); + } + + @Override + public String getOrigTrackingUrl() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasOrigTrackingUrl()) { + return null; + } + return p.getOrigTrackingUrl(); + } + + @Override + public void setOrigTrackingUrl(String url) { + maybeInitBuilder(); + if (url == null) { + builder.clearOrigTrackingUrl(); + return; + } + builder.setOrigTrackingUrl(url); + } + + @Override + public String getDiagnostics() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getStartTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public FinalApplicationStatus getFinalApplicationStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasFinalApplicationStatus()) { + return null; + } + return convertFromProtoFormat(p.getFinalApplicationStatus()); + } + + @Override + public void setFinalApplicationStatus(FinalApplicationStatus finishState) { + maybeInitBuilder(); + if (finishState == null) { + builder.clearFinalApplicationStatus(); + return; + } + builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); + } + public static ApplicationAttemptStateData newApplicationAttemptStateData( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens) { + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String origTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus) { ApplicationAttemptStateData attemptStateData = recordFactory.newRecordInstance(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); attemptStateData.setMasterContainer(container); attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setOrigTrackingUrl(origTrackingUrl); + attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); return attemptStateData; } + + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; + public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { + return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); + } + public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { + return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); + } + + private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) { + return ProtoUtils.convertToProtoFormat(s); + } + private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { + return ProtoUtils.convertFromProtoFormat(s); + } + } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index b02e056..68fb41d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -21,14 +21,20 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; public class ApplicationStateDataPBImpl extends ProtoBase implements ApplicationStateData { - + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); 
ApplicationStateDataProto.Builder builder = null; @@ -132,4 +138,76 @@ public void setApplicationSubmissionContext( this.applicationSubmissionContext = context; } + @Override + public RMAppState getState() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationState()) { + return null; + } + return convertFromProtoFormat(p.getApplicationState()); + } + + @Override + public void setState(RMAppState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearApplicationState(); + return; + } + builder.setApplicationState(convertToProtoFormat(finalState)); + } + + @Override + public String getDiagnostics() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getFinishTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } + + public static ApplicationStateData newApplicationStateData(long submitTime, + String user, ApplicationSubmissionContext submissionContext, + RMAppState state, String diagnostics, long finishTime) { + + ApplicationStateData appState = + recordFactory.newRecordInstance(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + private static String RM_APP_PREFIX = "RMAPP_"; + public static RMAppStateProto convertToProtoFormat(RMAppState e) { + return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name()); + } + public static RMAppState convertFromProtoFormat(RMAppStateProto e) { + return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a8a0af4..b8be6ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import 
org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -103,7 +101,8 @@ // Mutable fields private long startTime; - private long finishTime; + private long finishTime = 0; + private long finishingTime = 0; private RMAppAttempt currentAttempt; private String queue; @SuppressWarnings("rawtypes") @@ -111,8 +110,12 @@ private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); - private boolean isAppRemovalRequestSent = false; - private RMAppState previousStateAtRemoving; + private RMAppState stateBeforeFinalSaving; + private boolean isFinalSavingRequestSent; + private RMAppEvent eventCausingFinalSaving; + private RMAppState targetedFinalState; + private RMAppState recoveredFinalState; + Object transitionTodo; private static final StateMachineFactory { + + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_REMOVED)) { - RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event; - if (removeEvent.getRemovedException() != null) { - LOG.error( - "Failed to remove application: " + removeEvent.getApplicationId(), - removeEvent.getRemovedException()); - ExitUtil.terminate(1, removeEvent.getRemovedException()); - } + public RMAppState transition(RMAppImpl app, RMAppEvent event) { 
+ RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; + if (storeEvent.getStoredException() != null) { + LOG.error("Failed to store application final state: " + + storeEvent.getApplicationId(), storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + + if (app.transitionTodo instanceof SingleArcTransition) { + ((SingleArcTransition) app.transitionTodo).transition(app, + app.eventCausingFinalSaving); + } else if (app.transitionTodo instanceof MultipleArcTransition) { + ((MultipleArcTransition) app.transitionTodo).transition(app, + app.eventCausingFinalSaving); } - app.finishTime = System.currentTimeMillis(); + + if (app.transitionTodo instanceof AttemptUnregisteredTransition) { + return RMAppState.FINISHING; + } else { + return app.targetedFinalState; + } + } + } + + private static class AttemptFailedFinalStateSavedTransition extends + RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + String msg = app.getAppAttemptFailedDiagnostics(event); + LOG.info(msg); + app.diagnostics.append(msg); + // Inform the node for app-finish + FINAL_TRANSITION.transition(app, event); } } + private String getAppAttemptFailedDiagnostics(RMAppEvent event) { + if (!(event instanceof RMAppFailedAttemptEvent)) { + return ""; + } + String msg = null; + RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; + if (this.submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + msg = "Unmanaged application " + this.getApplicationId() + + " failed due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } else if (this.attempts.size() >= this.maxAppAttempts) { + msg = "Application " + this.getApplicationId() + " failed " + + this.maxAppAttempts + " times due to " + + failedEvent.getDiagnostics() + ". 
Failing the application."; + } + return msg; + } + private static final class RMAppSavingTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { + // If recovery is enabled then store the application information in a // non-blocking call so make sure that RM has stored the information // needed to restart the AM after RM restart without further client // communication LOG.info("Storing application with id " + app.applicationId); - app.rmContext.getStateStore().storeApplication(app); + app.rmContext.getStateStore().storeNewApplication(app); + } + } + + private static final class RMAppFinalStateSavingTransition extends + RMAppTransition { + Object transitionToDo; + RMAppState targetedFinalState; + + public RMAppFinalStateSavingTransition(Object transitionToDo, + RMAppState targetedFinalState) { + this.transitionToDo = transitionToDo; + this.targetedFinalState = targetedFinalState; + } + + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + + app.stateBeforeFinalSaving = app.getState(); + app.transitionTodo = this.transitionToDo; + app.targetedFinalState = this.targetedFinalState; + app.eventCausingFinalSaving = event; + app.finishingTime = System.currentTimeMillis(); + // we lost attempt_finished diagnostics here, because attempt_finished + // diagnostics is sent after app final state is saved.fix me by also + // attaching the attempt diagnostics with Attempt_Unregistered event ?? 
+ String diagnostics = app.getDiagnostics() + .append(app.getAppAttemptFailedDiagnostics(event)).toString(); + + /* Send the store request only while the flag is unset; it is set right after the request so a repeated run of this transition does not store again. */ + if (!app.isFinalSavingRequestSent) { + LOG.info("Storing application " + app.applicationId + + " with final state: " + app.targetedFinalState); + ApplicationState appState = + new ApplicationState(app.getStartTime(), + app.getApplicationSubmissionContext(), app.getUser(), + app.targetedFinalState, diagnostics, app.finishingTime); + app.rmContext.getStateStore().updateApplicationState(appState); + app.isFinalSavingRequestSent = true; + } } } - private static final class RMAppRemovingTransition extends RMAppTransition { + private static class AttemptUnregisteredTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Removing application with id " + app.applicationId); - app.removeApplicationState(); - app.previousStateAtRemoving = app.getState(); + app.finishTime = app.finishingTime; } } @@ -698,6 +814,33 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + private static class AppFinishedFinalStateSavingTransition extends + RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + // pass in the earlier attempt_unregistered event, as it is needed in + // AppFinishedFinalStateSavedTransition later on + new RMAppFinalStateSavingTransition( + new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving), + RMAppState.FINISHED).transition(app, event); + }; + } + + private static class AppFinishedFinalStateSavedTransition extends + RMAppTransition { + RMAppEvent attemptUnregistered; + + public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) { + this.attemptUnregistered = attemptUnregistered; + } + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + new AttemptUnregisteredTransition().transition(app, attemptUnregistered); + FINISHED_TRANSITION.transition(app, event); + }; + } + + private static class 
AppKilledTransition extends FinalTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { @@ -741,12 +884,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } - if (app.getState() != RMAppState.FINISHING) { + app.finishTime = app.finishingTime; + if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - // application completely done and remove from state store. - app.removeApplicationState(); - app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); @@ -764,32 +905,15 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - - RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); - boolean retryApp = true; - String msg = null; - if (app.submissionContext.getUnmanagedAM()) { - // RM does not manage the AM. Do not retry - retryApp = false; - msg = "Unmanaged application " + app.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } else if (app.attempts.size() >= app.maxAppAttempts) { - retryApp = false; - msg = "Application " + app.getApplicationId() + " failed " - + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() - + ". 
Failing the application."; - } - - if (retryApp) { + if (!app.submissionContext.getUnmanagedAM() + && app.attempts.size() < app.maxAppAttempts) { app.createNewAttempt(true); return initialState; } else { - LOG.info(msg); - app.diagnostics.append(msg); - // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); - return RMAppState.FAILED; + new RMAppFinalStateSavingTransition( + new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED) + .transition(app, event); + return RMAppState.FINAL_SAVING; } } @@ -814,9 +938,9 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { RMAppState rmAppState = getState(); - // If App is in REMOVING state, return its previous state. - if (rmAppState.equals(RMAppState.REMOVING)) { - rmAppState = previousStateAtRemoving; + // If App is in FINAL_SAVING state, return its previous state. + if (rmAppState.equals(RMAppState.FINAL_SAVING)) { + rmAppState = stateBeforeFinalSaving; } switch (rmAppState) { case NEW: @@ -840,11 +964,4 @@ public YarnApplicationState createApplicationState() { throw new YarnRuntimeException("Unknown state passed!"); } } - - private void removeApplicationState(){ - if (!isAppRemovalRequestSent) { - rmContext.getStateStore().removeApplication(this); - isAppRemovalRequestSent = true; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index e9ce5b4..ececdae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -24,7 +24,7 @@ SUBMITTED, ACCEPTED, RUNNING, - REMOVING, + FINAL_SAVING, FINISHING, FINISHED, FAILED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f68a4a5..65cae8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -157,6 +156,12 @@ private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); + private boolean isFinalSavingRequestSent = false; + private RMAppAttemptEvent eventCausingFinalSaving; + private RMAppAttemptState targetedFinalState; + private RMAppAttemptState recoveredFinalState; + private Object transitionTodo; + private static final StateMachineFactory { + @Override + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + if (appAttempt.recoveredFinalState != null) { + appAttempt.progress = 1.0f; + return 
appAttempt.recoveredFinalState; + } else { + return RMAppAttemptState.RECOVERED; + } + } + } + + /** Records the follow-up transition, the targeted final state and the triggering event on the attempt, then asks the state store to persist the attempt's final state. The store request is sent only while isFinalSavingRequestSent is unset. For an UNREGISTERED event the diagnostics, tracking url and final status are taken from the unregistration event. */ + private static class AttemptFinalStateSavingTransition extends BaseTransition { + + Object transitionToDo; + RMAppAttemptState targetedFinalState; + + public AttemptFinalStateSavingTransition(Object transitionToDo, + RMAppAttemptState targetedFinalState) { + this.transitionToDo = transitionToDo; + this.targetedFinalState = targetedFinalState; + } + + @Override + public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + + appAttempt.transitionTodo = this.transitionToDo; + appAttempt.targetedFinalState = this.targetedFinalState; + appAttempt.eventCausingFinalSaving = event; + + String diagnostics = appAttempt.getDiagnostics(); + String origTrackingUrl = appAttempt.getOriginalTrackingUrl(); + long startTime = appAttempt.getStartTime(); + FinalApplicationStatus finalStatus = appAttempt.getFinalApplicationStatus(); + if (event.getType().equals(RMAppAttemptEventType.UNREGISTERED)) { + RMAppAttemptUnregistrationEvent unregisterEvent = + (RMAppAttemptUnregistrationEvent) event; + diagnostics = unregisterEvent.getDiagnostics(); + origTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); + finalStatus = unregisterEvent.getFinalApplicationStatus(); + } + + if (!appAttempt.isFinalSavingRequestSent) { + RMStateStore rmStore = appAttempt.rmContext.getStateStore(); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.applicationAttemptId, + appAttempt.getMasterContainer(), + rmStore.getCredentialsFromAppAttempt(appAttempt), startTime, + appAttempt.targetedFinalState, origTrackingUrl, diagnostics, + finalStatus); + LOG.info("Storing application attempt " + + appAttempt.applicationAttemptId + " with final state: " + + appAttempt.targetedFinalState); + rmStore.updateApplicationAttemptState(attemptState); + appAttempt.isFinalSavingRequestSent = true; + } + } + } + + private 
static class AttemptFinalStateSavedTransition implements + MultipleArcTransition { + @Override + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; + + if (appAttempt.transitionTodo instanceof SingleArcTransition) { + ((SingleArcTransition) appAttempt.transitionTodo).transition( + appAttempt, causeEvent); + } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) { + ((MultipleArcTransition) appAttempt.transitionTodo).transition( + appAttempt, causeEvent); + } + + if (appAttempt.transitionTodo instanceof AMUnregisteredFinalStateSavedTransition) { + return RMAppAttemptState.FINISHING; + } else { + return appAttempt.targetedFinalState; + } + } + } private static class BaseFinalTransition extends BaseTransition { @@ -1125,38 +1265,50 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); - - appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId); - - appAttempt.progress = 1.0f; - - RMAppAttemptUnregistrationEvent unregisterEvent - = (RMAppAttemptUnregistrationEvent) event; - appAttempt.diagnostics.append(unregisterEvent.getDiagnostics()); - appAttempt.origTrackingUrl = - sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); - appAttempt.proxiedTrackingUrl = - appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl); - appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus(); - // Tell the app if (appAttempt.getSubmissionContext().getUnmanagedAM()) { // Unmanaged AMs have no container to wait for, so they skip // the FINISHING state and go straight to FINISHED. 
+ appAttempt.unregisterApplicationMaster(appAttempt, event); new FinalTransition(RMAppAttemptState.FINISHED).transition( appAttempt, event); return RMAppAttemptState.FINISHED; } - appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId); + // Saving the attempt final state + new AttemptFinalStateSavingTransition( + new AMUnregisteredFinalStateSavedTransition(), + RMAppAttemptState.FINISHED).transition(appAttempt, event); ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); - appAttempt.eventHandler.handle( - new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); - return RMAppAttemptState.FINISHING; + appAttempt.eventHandler.handle(new RMAppEvent(applicationId, + RMAppEventType.ATTEMPT_UNREGISTERED)); + return RMAppAttemptState.FINAL_SAVING; + } + } + + private static class AMUnregisteredFinalStateSavedTransition extends + BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + appAttempt.unregisterApplicationMaster(appAttempt, event); } } + private void unregisterApplicationMaster(RMAppAttempt appAttempt, + RMAppAttemptEvent event) { + ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); + rmContext.getAMLivelinessMonitor().unregister(appAttemptId); + progress = 1.0f; + + RMAppAttemptUnregistrationEvent unregisterEvent = + (RMAppAttemptUnregistrationEvent) event; + setDiagnostics(unregisterEvent.getDiagnostics()); + origTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); + proxiedTrackingUrl = generateProxyUriWithScheme(origTrackingUrl); + finalStatus = unregisterEvent.getFinalApplicationStatus(); + } + private static final class ContainerAcquiredTransition extends BaseTransition { @Override @@ -1186,28 +1338,40 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { - // container associated 
with AM. must not be unmanaged - assert appAttempt.submissionContext.getUnmanagedAM() == false; - // Setup diagnostic message - appAttempt.diagnostics.append("AM Container for " + - appAttempt.getAppAttemptId() + " exited with " + - " exitCode: " + containerStatus.getExitStatus() + - " due to: " + containerStatus.getDiagnostics() + "." + - "Failing this attempt."); - - new FinalTransition(RMAppAttemptState.FAILED).transition( - appAttempt, containerFinishedEvent); - return RMAppAttemptState.FAILED; + // Saving the final attempt state. + new AttemptFinalStateSavingTransition( + new ContainerFinishedFinalStateSavedTransition(), + RMAppAttemptState.FAILED).transition(appAttempt, event); + return RMAppAttemptState.FINAL_SAVING; } - // Normal container. - - // Put it in completedcontainers list + // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); return RMAppAttemptState.RUNNING; } } + private static class ContainerFinishedFinalStateSavedTransition extends + BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) event; + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); + // container associated with AM. must not be unmanaged + assert appAttempt.submissionContext.getUnmanagedAM() == false; + // Setup diagnostic message + appAttempt.diagnostics.append("AM Container for " + + appAttempt.getAppAttemptId() + " exited with " + " exitCode: " + + containerStatus.getExitStatus() + " due to: " + + containerStatus.getDiagnostics() + "." 
+ "Failing this attempt."); + new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, + event); + } + } + private static final class AMFinishingContainerFinishedTransition implements MultipleArcTransition { @@ -1228,13 +1392,72 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHED; } - // Normal container. appAttempt.justFinishedContainers.add(containerStatus); return RMAppAttemptState.FINISHING; } } + private static class AMFinishedFinalStateSavingTransition extends + BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) event; + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); + + // If this is the AM container, it means the AM container is finished, + // but we are not yet acknowledged that the final state has been saved. + // Thus, we still return FINAL_SAVING state here. + if (appAttempt.masterContainer.getId().equals( + containerStatus.getContainerId())) { + new AttemptFinalStateSavingTransition( + // pass in the earlier AMUnregistered Event also, as this is needed for + // AMFinishedFinalStateSavedTransition later on + new AMFinishedFinalStateSavedTransition( + appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED) + .transition(appAttempt, event); + return; + } + // Normal container. 
+ appAttempt.justFinishedContainers.add(containerStatus); + } + } + + private static class AMFinishedFinalStateSavedTransition extends + BaseTransition { + RMAppAttemptEvent amUnregisteredEvent; + public AMFinishedFinalStateSavedTransition( + RMAppAttemptEvent amUnregisteredEvent) { + this.amUnregisteredEvent = amUnregisteredEvent; + } + + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + new AMUnregisteredFinalStateSavedTransition().transition(appAttempt, + amUnregisteredEvent); + new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt, + event); + } + } + + private static class AMExpiredFinalStateSavingTransition extends + BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + new AttemptFinalStateSavingTransition( + // pass in the earlier AMUnregistered Event also, as this is needed for + // AMFinishedFinalStateSavedTransition later on + new AMFinishedFinalStateSavedTransition( + appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED) + .transition(appAttempt, event); + } + } + @Override public long getStartTime() { this.readLock.lock(); @@ -1267,7 +1490,7 @@ private void checkAttemptStoreError(RMAppAttemptEvent event) { } } - private void storeAttempt(RMStateStore store) { + private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved LOG.info("Storing attempt: AppId: " + @@ -1275,7 +1498,7 @@ private void storeAttempt(RMStateStore store) { + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); - store.storeApplicationAttempt(this); + rmContext.getStateStore().storeNewApplicationAttempt(this); } private void removeCredentials(RMAppAttemptImpl appAttempt) { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java index 3eb13ed..2551ed1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java @@ -20,5 +20,6 @@ public enum RMAppAttemptState { NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, - FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED, + FINAL_SAVING } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 5bc4554..2e65b0c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -202,6 +202,12 @@ public void unregisterAppAttempt() throws Exception { final FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); + unregisterAppAttempt(req); + } + + public void 
unregisterAppAttempt(final FinishApplicationMasterRequest req) + throws Exception { + waitForState(RMAppAttemptState.RUNNING); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(attemptId.toString()); Token token = @@ -216,4 +222,8 @@ public Object run() throws Exception { } }); } + + public ApplicationAttemptId getApplicationAttemptId() { + return this.attemptId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 81f4bce..a39a41e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.sql.Time; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -41,15 +42,20 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -122,7 +128,7 @@ public void testRMRestart() throws Exception { nm1.registerNode(); nm2.registerNode(); // nm2 will not heartbeat with RM1 - // create app that will not be saved because it will finish + // create app that will finish and the final state should be saved. RMApp app0 = rm1.submitApp(200); RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); // spot check that app is saved @@ -130,14 +136,8 @@ public void testRMRestart() throws Exception { nm1.nodeHeartbeat(true); MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId()); am0.registerAppAttempt(); - am0.unregisterAppAttempt(); - nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE); - am0.waitForState(RMAppAttemptState.FINISHED); - rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + finishApplicationMaster(app0, rm1, nm1, am0); - // spot check that app is not saved anymore - Assert.assertEquals(0, rmAppState.size()); - // create app that gets launched and does allocate before RM restart RMApp app1 = rm1.submitApp(200); // assert app1 info is saved @@ -209,7 +209,6 @@ public void testRMRestart() throws Exception { .getApplicationId(), appUnmanaged.getApplicationSubmissionContext() .getApplicationId()); - // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state @@ -223,11 +222,17 @@ 
public void testRMRestart() throws Exception { nm2.setResourceTrackerService(rm2.getResourceTrackerService()); // verify load of old state - // only 2 apps are loaded since unmanaged app is not loaded back since it - // cannot be restarted by the RM this will change with work preserving RM - // restart in which AMs/NMs are not rebooted - Assert.assertEquals(2, rm2.getRMContext().getRMApps().size()); - + // 3 apps are loaded. + // The FINISHED app and its attempt are also loaded back. + // The unmanaged app is not loaded back since it cannot be restarted by the RM; + // this will change with work-preserving RM restart, in which AMs/NMs are not + // rebooted. + Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); + // check that the earlier finished app and attempt are also loaded back and move + // to the FINISHED state. + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHED); + // verify correct number of attempts and other data RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); Assert.assertNotNull(loadedApp1); @@ -331,26 +336,152 @@ public void testRMRestart() throws Exception { new ArrayList()).getAllocatedContainers()); Thread.sleep(500); } + // finish the AMs + finishApplicationMaster(loadedApp1, rm2, am1Node, am1); + finishApplicationMaster(loadedApp2, rm2, am2Node, am2); - // finish the AM's - am1.unregisterAppAttempt(); - rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING); - am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE); - am1.waitForState(RMAppAttemptState.FINISHED); - - am2.unregisterAppAttempt(); - rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING); - am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE); - am2.waitForState(RMAppAttemptState.FINISHED); - // stop RM's rm2.stop(); rm1.stop(); - // completed apps should be removed - Assert.assertEquals(0, 
rmAppState.size()); + // completed apps are not removed immediately after app finish + // And finished app is also loaded back. + Assert.assertEquals(3, rmAppState.size()); } - + + @Test + public void testRMRestartAppRunningAMFailed() throws Exception { + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + // fail the AM by sending CONTAINER_FINISHED event without unregistering. + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + + ApplicationState appState = rmAppState.get(app0.getApplicationId()); + // assert the AM failed state is saved. + Assert.assertEquals(RMAppAttemptState.FAILED, + appState.getAttempt(am0.getApplicationAttemptId()).getState()); + // assert app state has not been saved. + Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); + // new AM started but not registered, app stays at ACCEPTED state. + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + // assert the previous AM state is loaded back on RM recovery. 
+ RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp + .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + } + + @Test + public void testRMRestartGetApplicationReport() throws Exception { + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create an app and finish the app. + RMApp app0 = rm1.submitApp(200); + RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + final FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "diagnostics", "trackingUrl"); + + am0.unregisterAppAttempt(req); + + am0.waitForState(RMAppAttemptState.FINISHING); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FINISHED); + rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + + // check that app/attempt is saved with the final state + ApplicationState appState = rmAppState.get(app0.getApplicationId()); + Assert.assertEquals(RMAppState.FINISHED, appState.getState()); + Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime()); +// Assert.assertEquals("diagnostics", appState.getDiagnostics()); + + Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime()); + ApplicationAttemptState attemptState0 = + 
appState.getAttempt(am0.getApplicationAttemptId()); + Assert.assertEquals(RMAppAttemptState.FINISHED, + attemptState0.getState()); + Assert.assertEquals("diagnostics", attemptState0.getDiagnostics()); + Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, + attemptState0.getFinalApplicationStatus()); + Assert.assertEquals("trackingUrl", attemptState0.getOrigTrackingUrl()); + + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + GetApplicationReportRequest reportRequest = + GetApplicationReportRequest.newInstance(app0.getApplicationId()); + GetApplicationReportResponse response = rm2.getClientRMService().getApplicationReport(reportRequest); + ApplicationReport report = response.getApplicationReport(); +// Assert.assertEquals("diagnostics", report.getDiagnostics()); + Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, + report.getFinalApplicationStatus()); + Assert.assertEquals("trackingUrl", report.getOriginalTrackingUrl()); + Assert.assertEquals(app0.getFinishTime(), report.getFinishTime()); + } + + private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, + MockAM am) throws Exception { + RMState rmState = + ((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState(); + Map rmAppState = + rmState.getApplicationState(); + am.unregisterAppAttempt(); + am.waitForState(RMAppAttemptState.FINISHING); + nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + // check that app/attempt is saved with the final state + ApplicationState appState = rmAppState.get(rmApp.getApplicationId()); + Assert + .assertEquals(RMAppState.FINISHED, appState.getState()); + Assert.assertEquals(RMAppAttemptState.FINISHED, + appState.getAttempt(am.getApplicationAttemptId()).getState()); + } + @Test public void testRMRestartOnMaxAppAttempts() throws Exception { 
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 72ef37f..0648353 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -26,10 +26,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import javax.crypto.SecretKey; @@ -39,13 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.ClientBaseWithFixes; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -72,10 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.zookeeper.ZooKeeper; - -import org.junit.Test; - public class RMStateStoreTestBase extends 
ClientBaseWithFixes{ public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); @@ -145,7 +133,7 @@ void storeApp( when(mockApp.getSubmitTime()).thenReturn(time); when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); - store.storeApplication(mockApp); + store.storeNewApplication(mockApp); } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, @@ -163,7 +151,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, .thenReturn(clientTokenMasterKey); dispatcher.attemptId = attemptId; dispatcher.storedException = null; - store.storeApplicationAttempt(mockAttempt); + store.storeNewApplicationAttempt(mockAttempt); waitNotify(dispatcher); return container.getId(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 73dc8d3..bcb2f6f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -19,19 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; - import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -226,5 +225,5 @@ public boolean isAppSafeToUnregister() { @Override public YarnApplicationState createApplicationState() { return null; - }; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2075921..39d5248 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; import java.util.Arrays; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -286,7 +288,8 @@ private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp ap } // test to make sure times are set when app finishes - private static void assertTimesAtFinish(RMApp application) { + private void assertTimesAtFinish(RMApp application) { + sendAppSavedEvent(application); assertStartTimeSet(application); Assert.assertTrue("application finish time is not greater then 0", (application.getFinishTime() > 0)); @@ -294,11 +297,12 @@ private static void assertTimesAtFinish(RMApp application) { (application.getFinishTime() >= application.getStartTime())); } - private void assertAppRemoved(RMApp application){ - verify(store).removeApplication(application); + private void assertAppFinalStateSaved(RMApp application){ + verify(store, times(1)).updateApplicationState(any(ApplicationState.class)); } - private static void assertKilled(RMApp application) { + private void assertKilled(RMApp application) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); assertFinalAppStatus(FinalApplicationStatus.KILLED, application); @@ -307,20 +311,34 @@ private static void assertKilled(RMApp application) { "Application killed by user.", diag.toString()); } - private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException { + private void assertAppAndAttemptKilled(RMApp application) + throws InterruptedException { assertKilled(application); - Assert.assertEquals( RMAppAttemptState.KILLED, - application.getCurrentAppAttempt().getAppAttemptState() - ); + // send attempt final state saved event. 
+ application.getCurrentAppAttempt().handle( + new RMAppAttemptStoredEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), null)); + Assert.assertEquals(RMAppAttemptState.KILLED, application + .getCurrentAppAttempt().getAppAttemptState()); + assertAppFinalStateSaved(application); } - private static void assertFailed(RMApp application, String regex) { + private void assertFailed(RMApp application, String regex) { + sendAppSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); assertFinalAppStatus(FinalApplicationStatus.FAILED, application); StringBuilder diag = application.getDiagnostics(); Assert.assertTrue("application diagnostics is not correct", diag.toString().matches(regex)); + assertAppFinalStateSaved(application); + } + + private void sendAppSavedEvent(RMApp application) { + RMAppEvent event = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + rmDispatcher.await(); } protected RMApp testCreateAppNewSaving( @@ -386,15 +404,15 @@ protected RMApp testCreateAppRunning( return application; } - protected RMApp testCreateAppRemoving( + protected RMApp testCreateAppFinalSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppRunning(submissionContext); RMAppEvent finishingEvent = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_UNREGISTERED); application.handle(finishingEvent); - assertAppState(RMAppState.REMOVING, application); - assertAppRemoved(application); + assertAppState(RMAppState.FINAL_SAVING, application); + assertAppFinalStateSaved(application); return application; } @@ -402,11 +420,11 @@ protected RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppRemoving(submissionContext); - // 
REMOVING => FINISHING event RMAppEventType.APP_REMOVED - RMAppEvent finishingEvent = - new RMAppRemovedEvent(application.getApplicationId(), null); - application.handle(finishingEvent); + RMApp application = testCreateAppFinalSaving(submissionContext); + // FINAL_SAVING => FINISHING event RMAppEventType.APP_SAVED + RMAppEvent appUpdated = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(appUpdated); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); return application; @@ -552,7 +570,6 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -597,7 +614,6 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertKilled(application); assertAppAndAttemptKilled(application); } @@ -666,40 +682,37 @@ public void testAppRunningFailed() throws IOException { } @Test - public void testAppRemovingFinished() throws IOException { - LOG.info("--- START: testAppRemovingFINISHED ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED - RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( - application.getApplicationId(), null); - application.handle(finishedEvent); - rmDispatcher.await(); - assertAppState(RMAppState.FINISHED, application); - } + public void testAppFinishingKill() throws IOException { + LOG.info("--- START: testAppFinishedFinished ---"); - @Test - public void testAppRemovingKilled() throws IOException { - LOG.info("--- START: testAppRemovingKilledD ---"); - RMApp application = testCreateAppRemoving(null); - // APP_REMOVING => KILLED event RMAppEventType.KILL + RMApp application = testCreateAppFinishing(null); + // FINISHING => FINISHED event 
RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertAppState(RMAppState.KILLED, application); + assertAppState(RMAppState.FINISHED, application); } @Test - public void testAppFinishingKill() throws IOException { - LOG.info("--- START: testAppFinishedFinished ---"); - - RMApp application = testCreateAppFinishing(null); - // FINISHING => FINISHED event RMAppEventType.KILL + public void testAppFinalSavingToFinished() throws IOException { + RMApp application = testCreateAppFinalSaving(null); + final String diagMsg = "some diagnostics"; + // attempt_finished event comes before attempt_saved event RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppFinishedAttemptEvent(application.getApplicationId(), diagMsg); application.handle(event); - rmDispatcher.await(); + assertAppState(RMAppState.FINAL_SAVING, application); + RMAppEvent appUpdated = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(appUpdated); assertAppState(RMAppState.FINISHED, application); + + assertTimesAtFinish(application); + // finished without a proper unregister implies failed + assertFinalAppStatus(FinalApplicationStatus.FAILED, application); + Assert.assertTrue("Finished app missing diagnostics", application + .getDiagnostics().indexOf(diagMsg) != -1); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 1f3c506..f176379 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -114,7 +115,8 @@ private ApplicationMasterLauncher applicationMasterLauncher; private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; - + private RMStateStore store; + private RMApp application; private RMAppAttempt applicationAttempt; @@ -209,7 +211,7 @@ public void setUp() throws Exception { new NMTokenSecretManagerInRM(conf), clientToAMTokenManager); - RMStateStore store = mock(RMStateStore.class); + store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); scheduler = mock(YarnScheduler.class); @@ -330,6 +332,7 @@ private void testAppAttemptSubmittedState() { * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED} */ private void testAppAttemptSubmittedToFailedState(String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -354,6 +357,7 @@ private void 
testAppAttemptSubmittedToFailedState(String diagnostics) { */ private void testAppAttemptKilledState(Container amContainer, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -363,6 +367,7 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyAttemptFinalStateSaved(); } /** @@ -427,6 +432,7 @@ private void testAppAttemptAllocatedState(Container amContainer) { */ private void testAppAttemptFailedState(Container container, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -437,8 +443,8 @@ private void testAppAttemptFailedState(Container container, // Check events verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); - verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyAttemptFinalStateSaved(); } /** @@ -483,6 +489,7 @@ private void testAppAttemptFinishingState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, String diagnostics) { + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FINISHING, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); @@ -492,6 +499,7 @@ private void testAppAttemptFinishingState(Container container, assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 0); + verifyAttemptFinalStateSaved(); } /** @@ -507,11 +515,11 @@ private void 
testAppAttemptFinishedState(Container container, assertEquals(diagnostics, applicationAttempt.getDiagnostics()); verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); if (unmanagedAM) { - verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl()); - + verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl()); } else { assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); + verifyAttemptFinalStateSaved(); } assertEquals(finishedContainerCount, applicationAttempt .getJustFinishedContainers().size()); @@ -647,7 +655,13 @@ private void testUnmanagedAMSuccess(String url) { testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1, true); } - + + private void sendAttemptSavedEvent(RMAppAttempt applicationAttempt) { + applicationAttempt.handle( + new RMAppAttemptStoredEvent( + applicationAttempt.getAppAttemptId(), null)); + } + @Test public void testUnmanagedAMUnexpectedRegistration() { unmanagedAM = true; @@ -745,6 +759,7 @@ public void testAMCrashAtAllocated() { ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); @@ -762,6 +777,7 @@ public void testRunningToFailed() { ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( appAttemptId, cs)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -782,6 +798,7 @@ public void testRunningToKilled() { new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); + sendAttemptSavedEvent(applicationAttempt); 
assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -800,6 +817,7 @@ public void testLaunchedExpire() { launchApplicationAttempt(amContainer); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing", @@ -818,6 +836,7 @@ public void testRunningExpire() { runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + sendAttemptSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertTrue("expire diagnostics missing", @@ -962,7 +981,57 @@ public void testSuccessfulFinishingToFinished() { testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, diagnostics, 0, false); } - + + @Test + public void + testFinalSavingToFinishedOnFirstContainerFinishedThenAttemptSaved() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successful"; + applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( + applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, + diagnostics)); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + // Container_finished event comes before Attempt_Saved event. 
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( + amContainer.getId(), ContainerState.COMPLETE, "", 0))); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + // send attempt_saved + sendAttemptSavedEvent(applicationAttempt); + testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, + diagnostics, 0, false); + } + + @Test + public void testFinalSavingToFinishedOnFirstExpireThenAttemptSaved() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successssseeeful"; + applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( + applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, + diagnostics)); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + // Expire event comes before Attempt_saved event. 
+ applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt + .getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + // send attempt_saved + sendAttemptSavedEvent(applicationAttempt); + testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, + diagnostics, 0, false); + } + private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId); if (UserGroupInformation.isSecurityEnabled()) { @@ -980,4 +1049,9 @@ private void verifyUrl(String url1, String url2) { assertEquals(url1, url2); } } + + private void verifyAttemptFinalStateSaved() { + verify(store, times(1)).updateApplicationAttemptState( + any(ApplicationAttemptState.class)); + } }