diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index fc4537c..2db6f96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -61,6 +59,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; @Private @Unstable @@ -83,8 +85,167 @@ public static final Log LOG = LogFactory.getLog(RMStateStore.class); + private enum RMStateStoreState { + DEFAULT + }; + + private static final StateMachineFactory + stateMachineFactory = new StateMachineFactory( + RMStateStoreState.DEFAULT) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP, new StoreAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()); + + private final StateMachine stateMachine; + + private static class StoreAppTransition + implements SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateDataPBImpl appStateData = + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newInstance(appState); + LOG.info("Storing info for app: " + appId); + try { + operand.storeApplicationStateInternal(appId, appStateData); + operand.notifyDoneStoringApplication(appId, null); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateDataPBImpl appStateData = + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newInstance(appState); + LOG.info("Updating info for app: " + appId); + try { + operand.updateApplicationStateInternal(appId, appStateData); + operand.notifyDoneUpdatingApplication(appId, null); + } catch (Exception e) { + LOG.error("Error updating app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class RemoveAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) + .getAppState(); + ApplicationId appId = appState.getAppId(); + LOG.info("Removing info for app: " + appId); + try { + operand.removeApplicationStateInternal(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class StoreAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } + operand.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + operand.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + operand.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore operand, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); + } + operand.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + operand.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + operand.notifyStoreOperationFailed(e); + } + }; + } + public RMStateStore() { super(RMStateStore.class.getName()); + stateMachine = stateMachineFactory.make(this); } /** @@ -596,105 +757,10 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - if (event.getType().equals(RMStateStoreEventType.STORE_APP) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { - ApplicationState appState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - appState = ((RMStateStoreAppEvent) event).getAppState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - appState = ((RMStateUpdateAppEvent) event).getAppState(); - } - - Exception storedException = null; - ApplicationStateDataPBImpl appStateData = - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), - appState.getApplicationSubmissionContext(), appState.getState(), - appState.getDiagnostics(), appState.getFinishTime()); - - ApplicationId appId = - appState.getApplicationSubmissionContext().getApplicationId(); - - LOG.info("Storing info for app: " + appId); - try { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId, appStateData); - notifyDoneStoringApplication(appId, storedException); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId, appStateData); - notifyDoneUpdatingApplication(appId, storedException); - } - } catch (Exception e) { - LOG.error("Error storing/updating app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { - - ApplicationAttemptState attemptState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - attemptState = - ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT); - attemptState = - ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); - } - - Exception storedException = null; - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - try { - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl - .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), - attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus()); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); - } - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - storedException); - } else { - assert event.getType().equals( - RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - storedException); - } - } catch (Exception e) { - LOG.error( - "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { - ApplicationState appState = - ((RMStateStoreRemoveAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - LOG.info("Removing info for app: " + appId); - try { - removeApplicationStateInternal(appState); - } catch (Exception e) { - LOG.error("Error removing app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else { - LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + try { + this.stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 255800e..2b216ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,31 +18,70 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; /* * Contains the state data that needs to be persisted for an ApplicationAttempt */ @Public @Unstable -public interface ApplicationAttemptStateData { - +public abstract class ApplicationAttemptStateData { + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus) { + ApplicationAttemptStateData attemptStateData = + Records.newRecord(ApplicationAttemptStateData.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setFinalTrackingUrl(finalTrackingUrl); + attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); + return attemptStateData; + } + + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptState attemptState) throws IOException { + Credentials credentials = attemptState.getAppAttemptCredentials(); + ByteBuffer appAttemptTokens = null; + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return newInstance(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getFinalTrackingUrl(), + attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus()); + } + /** * The ApplicationAttemptId for the application attempt * @return ApplicationAttemptId for the application attempt */ @Public @Unstable - public ApplicationAttemptId getAttemptId(); + public abstract ApplicationAttemptId getAttemptId(); - public void setAttemptId(ApplicationAttemptId attemptId); + public abstract void setAttemptId(ApplicationAttemptId attemptId); /* * The master container running the application attempt @@ -50,9 +89,9 @@ */ @Public @Unstable - public Container getMasterContainer(); + public abstract Container getMasterContainer(); - public void setMasterContainer(Container container); + public abstract void setMasterContainer(Container container); /** * The application attempt tokens that belong to this attempt @@ -60,17 +99,17 @@ */ @Public @Unstable - public ByteBuffer getAppAttemptTokens(); + public abstract ByteBuffer getAppAttemptTokens(); - public void setAppAttemptTokens(ByteBuffer attemptTokens); + public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); /** * Get the final state of the application attempt. * @return the final state of the application attempt. */ - public RMAppAttemptState getState(); + public abstract RMAppAttemptState getState(); - public void setState(RMAppAttemptState state); + public abstract void setState(RMAppAttemptState state); /** * Get the original not-proxied final tracking url for the @@ -79,34 +118,34 @@ * @return the original not-proxied final tracking url for the * application */ - public String getFinalTrackingUrl(); + public abstract String getFinalTrackingUrl(); /** * Set the final tracking Url of the AM. * @param url */ - public void setFinalTrackingUrl(String url); + public abstract void setFinalTrackingUrl(String url); /** * Get the diagnositic information of the attempt * @return diagnositic information of the attempt */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * Get the start time of the application. * @return start time of the application */ - public long getStartTime(); + public abstract long getStartTime(); - public void setStartTime(long startTime); + public abstract void setStartTime(long startTime); /** * Get the final finish status of the application. * @return final finish status of the application */ - public FinalApplicationStatus getFinalApplicationStatus(); + public abstract FinalApplicationStatus getFinalApplicationStatus(); - public void setFinalApplicationStatus(FinalApplicationStatus finishState); + public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 9fce6cf..e0d8b48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -24,7 +24,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.Records; /** * Contains all the state data that needs to be stored persistently @@ -32,19 +34,41 @@ */ @Public @Unstable -public interface ApplicationStateData { - +public abstract class ApplicationStateData { + public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, + RMAppState state, String diagnostics, long finishTime) { + ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + public static ApplicationStateData newInstance( + ApplicationState appState) { + return newInstance(appState.getSubmitTime(), appState.getStartTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + appState.getState(), appState.getDiagnostics(), + appState.getFinishTime()); + } + /** * The time at which the application was received by the Resource Manager * @return submitTime */ @Public @Unstable - public long getSubmitTime(); + public abstract long getSubmitTime(); @Public @Unstable - public void setSubmitTime(long submitTime); + public abstract void setSubmitTime(long submitTime); /** * Get the start time of the application. @@ -63,11 +87,11 @@ */ @Public @Unstable - public void setUser(String user); + public abstract void setUser(String user); @Public @Unstable - public String getUser(); + public abstract String getUser(); /** * The {@link ApplicationSubmissionContext} for the application @@ -76,34 +100,34 @@ */ @Public @Unstable - public ApplicationSubmissionContext getApplicationSubmissionContext(); + public abstract ApplicationSubmissionContext getApplicationSubmissionContext(); @Public @Unstable - public void setApplicationSubmissionContext( + public abstract void setApplicationSubmissionContext( ApplicationSubmissionContext context); /** * Get the final state of the application. * @return the final state of the application. */ - public RMAppState getState(); + public abstract RMAppState getState(); - public void setState(RMAppState state); + public abstract void setState(RMAppState state); /** * Get the diagnostics information for the application master. * @return the diagnostics information for the application master. */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * The finish time of the application. * @return the finish time of the application., */ - public long getFinishTime(); + public abstract long getFinishTime(); - public void setFinishTime(long finishTime); + public abstract void setFinishTime(long finishTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 75ac2ee..1a7ac94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; @@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -public class ApplicationAttemptStateDataPBImpl -extends ProtoBase -implements ApplicationAttemptStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationAttemptStateDataPBImpl extends + ApplicationAttemptStateData { ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.Builder builder = null; @@ -76,7 +71,8 @@ private void mergeLocalToBuilder() { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } if(this.appAttemptTokens != null) { - builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat( + this.appAttemptTokens)); } } @@ -148,7 +144,8 @@ public ByteBuffer getAppAttemptTokens() { if(!p.hasAppAttemptTokens()) { return null; } - this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( + p.getAppAttemptTokens()); return appAttemptTokens; } @@ -249,24 +246,26 @@ public void setFinalApplicationStatus(FinalApplicationStatus finishState) { builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); } - public static ApplicationAttemptStateData newApplicationAttemptStateData( - ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus) { - ApplicationAttemptStateData attemptStateData = - recordFactory.newRecordInstance(ApplicationAttemptStateData.class); - attemptStateData.setAttemptId(attemptId); - attemptStateData.setMasterContainer(container); - attemptStateData.setAppAttemptTokens(attemptTokens); - attemptStateData.setState(finalState); - attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); - attemptStateData.setStartTime(startTime); - attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); - return attemptStateData; + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; } + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index ede8ca7..9bf33fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -20,21 +20,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -public class ApplicationStateDataPBImpl -extends ProtoBase -implements ApplicationStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationStateDataPBImpl extends ApplicationStateData { ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.Builder builder = null; @@ -136,7 +130,7 @@ public ApplicationSubmissionContext getApplicationSubmissionContext() { } applicationSubmissionContext = new ApplicationSubmissionContextPBImpl( - p.getApplicationSubmissionContext()); + p.getApplicationSubmissionContext()); return applicationSubmissionContext; } @@ -200,21 +194,24 @@ public void setFinishTime(long finishTime) { builder.setFinishTime(finishTime); } - public static ApplicationStateData newApplicationStateData(long submitTime, - long startTime, String user, - ApplicationSubmissionContext submissionContext, RMAppState state, - String diagnostics, long finishTime) { - - ApplicationStateData appState = - recordFactory.newRecordInstance(ApplicationStateData.class); - appState.setSubmitTime(submitTime); - appState.setStartTime(startTime); - appState.setUser(user); - appState.setApplicationSubmissionContext(submissionContext); - appState.setState(state); - appState.setDiagnostics(diagnostics); - appState.setFinishTime(finishTime); - return appState; + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); } private static String RM_APP_PREFIX = "RMAPP_";