diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index fc4537c..d85d61e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -61,6 +59,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; @Private @Unstable @@ -83,8 +85,142 @@ public static final Log LOG = LogFactory.getLog(RMStateStore.class); + private enum RMStateStoreState { + DEFAULT + }; + + private static final StateMachineFactory + stateMachineFactory = new StateMachineFactory( + RMStateStoreState.DEFAULT) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP, new StoreAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()); + + private final StateMachine stateMachine; + + private static class StoreAppTransition + implements SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateDataPBImpl appStateData = + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(appState); + LOG.info("Storing info for app: " + appId); + try { + operand.storeApplicationStateInternal(appId, appStateData); + operand.notifyDoneStoringApplication(appId, null); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateDataPBImpl appStateData = + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(appState); + LOG.info("Updating info for app: " + appId); + try { + operand.updateApplicationStateInternal(appId, appStateData); + operand.notifyDoneUpdatingApplication(appId, null); + } catch (Exception e) { + LOG.error("Error updating app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class RemoveAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) + .getAppState(); + ApplicationId appId = appState.getAppId(); + LOG.info("Removing info for app: " + appId); + try { + operand.removeApplicationStateInternal(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class StoreAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newApplicationAttemptStateData(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } + operand.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + operand.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + ApplicationAttemptState attemptState = + ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newApplicationAttemptStateData(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); + } + operand.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + operand.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + operand.notifyStoreOperationFailed(e); + } + }; + } + public RMStateStore() { super(RMStateStore.class.getName()); + stateMachine = stateMachineFactory.make(this); } /** @@ -596,105 +732,10 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - if (event.getType().equals(RMStateStoreEventType.STORE_APP) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { - ApplicationState appState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - appState = ((RMStateStoreAppEvent) event).getAppState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - appState = ((RMStateUpdateAppEvent) event).getAppState(); - } - - Exception storedException = null; - ApplicationStateDataPBImpl appStateData = - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), - appState.getApplicationSubmissionContext(), appState.getState(), - appState.getDiagnostics(), appState.getFinishTime()); - - ApplicationId appId = - appState.getApplicationSubmissionContext().getApplicationId(); - - LOG.info("Storing info for app: " + appId); - try { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId, appStateData); - notifyDoneStoringApplication(appId, storedException); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId, appStateData); - notifyDoneUpdatingApplication(appId, storedException); - } - } catch (Exception e) { - LOG.error("Error storing/updating app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { - - ApplicationAttemptState attemptState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - attemptState = - ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT); - attemptState = - ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); - } - - Exception storedException = null; - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - try { - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl - .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), - attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus()); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); - } - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - storedException); - } else { - assert event.getType().equals( - RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - storedException); - } - } catch (Exception e) { - LOG.error( - "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { - ApplicationState appState = - ((RMStateStoreRemoveAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - LOG.info("Removing info for app: " + appId); - try { - removeApplicationStateInternal(appState); - } catch (Exception e) { - LOG.error("Error removing app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else { - LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + try { + this.stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 75ac2ee..dc270ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -33,6 +36,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -267,6 +271,23 @@ public static ApplicationAttemptStateData newApplicationAttemptStateData( return attemptStateData; } + public static ApplicationAttemptStateData newApplicationAttemptStateData( + ApplicationAttemptState attemptState) throws IOException { + Credentials credentials = attemptState.getAppAttemptCredentials(); + ByteBuffer appAttemptTokens = null; + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return newApplicationAttemptStateData(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getFinalTrackingUrl(), + attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus()); + } + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index ede8ca7..df50917 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -217,6 +218,14 @@ public static ApplicationStateData newApplicationStateData(long submitTime, return appState; } + public static ApplicationStateData newApplicationStateData( + ApplicationState appState) { + return newApplicationStateData(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), appState.getState(), + appState.getDiagnostics(), appState.getFinishTime()); + } + private static String RM_APP_PREFIX = "RMAPP_"; public static RMAppStateProto convertToProtoFormat(RMAppState e) { return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name());