diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 51024cf..89b8738 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -303,7 +303,7 @@ protected void submitApplication( throws Exception { ApplicationSubmissionContext appContext = appState.getApplicationSubmissionContext(); - ApplicationId appId = appState.getAppId(); + ApplicationId appId = appContext.getApplicationId(); // create and recover app. RMAppImpl application = @@ -419,7 +419,8 @@ public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); assert store != null; // recover applications - Map appStates = state.getApplicationState(); + Map appStates = + state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { recoverApplication(appState, state); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 0a3b269..5897b8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -38,8 +38,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -53,12 +51,12 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStatePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -239,19 +237,14 @@ private void loadRMAppState(RMState rmState) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Loading application from node: " + childNodeName); } - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl( + ApplicationStatePBImpl appState = + new ApplicationStatePBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); // assert child node name is same as actual applicationId - assert appId.equals(appState.context.getApplicationId()); + assert appId.equals( + appState.getApplicationSubmissionContext().getApplicationId()); rmState.appState.put(appId, appState); } else if (childNodeName .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { @@ -262,25 +255,9 @@ private void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStatePBImpl attemptState = + new ApplicationAttemptStatePBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); @@ -380,7 +357,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateDataPB) throws Exception { + ApplicationState appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); fs.mkdirs(appDirPath); @@ -400,7 +377,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateDataPB) throws Exception { + ApplicationState appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); Path nodeCreatePath = getNodePath(appDirPath, appIdStr); @@ -420,7 +397,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateDataPB) + ApplicationAttemptState attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -441,7 +418,7 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateDataPB) + ApplicationAttemptState attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -460,9 +437,11 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationState appState) throws Exception { - String appId = appState.getAppId().toString(); + String appId = appState.getApplicationSubmissionContext().getApplicationId() + .toString(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f56517c..122e820 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -25,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -34,8 +32,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import com.google.common.annotations.VisibleForTesting; @@ -93,53 +91,29 @@ protected synchronized void closeInternal() throws Exception { } @Override - public void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) + public void storeApplicationStateInternal( + ApplicationId appId, ApplicationState appState) throws Exception { - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); state.appState.put(appId, appState); } @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { - ApplicationState updatedAppState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - LOG.info("Updating final state " + appStateData.getState() + " for app: " + ApplicationState appState) throws Exception { + LOG.info("Updating final state " + appState.getState() + " for app: " + appId); if (state.appState.get(appId) != null) { // add the earlier attempts back - updatedAppState.attempts - .putAll(state.appState.get(appId).attempts); + appState.attempts.putAll(state.appState.get(appId).attempts); } - state.appState.put(appId, updatedAppState); + state.appState.put(appId, appState); } @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptState attemptState) throws Exception { - Credentials credentials = null; - if(attemptStateData.getAppAttemptTokens() != null){ - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime()); - ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); if (appState == null) { @@ -151,40 +125,23 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptState attemptState) throws Exception { - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState updatedAttemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); - ApplicationState appState = - state.getApplicationState().get( - updatedAttemptState.getAttemptId().getApplicationId()); + state.getApplicationState().get(appAttemptId.getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); } - LOG.info("Updating final state " + updatedAttemptState.getState() - + " for attempt: " + updatedAttemptState.getAttemptId()); - appState.attempts.put(updatedAttemptState.getAttemptId(), - updatedAttemptState); + LOG.info("Updating final state " + attemptState.getState() + + " for attempt: " + attemptState.getAttemptId()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); } @Override public synchronized void removeApplicationStateInternal( ApplicationState appState) throws Exception { - ApplicationId appId = appState.getAppId(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); ApplicationState removed = state.appState.remove(appId); if (removed == null) { throw new YarnRuntimeException("Removing non-exsisting application state"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index e910c19..ea9363f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; @Unstable public class NullRMStateStore extends RMStateStore { @@ -60,13 +60,13 @@ public RMState loadState() throws Exception { @Override protected void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { + ApplicationState appStateData) throws Exception { // Do nothing } @Override protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptState attemptStateData) throws Exception { // Do nothing } @@ -108,13 +108,13 @@ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Excepti @Override protected void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { + ApplicationState appStateData) throws Exception { // Do nothing } @Override protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptState attemptStateData) throws Exception { } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 714a108..b26a950 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -37,9 +38,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -49,16 +47,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; + import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -125,13 +122,13 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationState appState = + ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { - store.storeApplicationStateInternal(appId, appStateData); + store.storeApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { @@ -150,13 +147,13 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationState appState = + ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { - store.updateApplicationStateInternal(appId, appStateData); + store.updateApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { @@ -175,9 +172,10 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) - .getAppState(); - ApplicationId appId = appState.getAppId(); + ApplicationState appState = + ((RMStateStoreRemoveAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { store.removeApplicationStateInternal(appState); @@ -200,13 +198,11 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { ApplicationAttemptState attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = - ApplicationAttemptStateData.newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); @@ -229,13 +225,11 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { ApplicationAttemptState attemptState = ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData - .newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); @@ -251,137 +245,6 @@ public RMStateStore() { stateMachine = stateMachineFactory.make(this); } - /** - * State of an application attempt - */ - public static class ApplicationAttemptState { - final ApplicationAttemptId attemptId; - final Container masterContainer; - final Credentials appAttemptCredentials; - long startTime = 0; - // fields set when attempt completes - RMAppAttemptState state; - String finalTrackingUrl = "N/A"; - String diagnostics; - int exitStatus = ContainerExitStatus.INVALID; - FinalApplicationStatus amUnregisteredFinalStatus; - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime) { - this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID); - } - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus) { - this.attemptId = attemptId; - this.masterContainer = masterContainer; - this.appAttemptCredentials = appAttemptCredentials; - this.startTime = startTime; - this.state = state; - this.finalTrackingUrl = finalTrackingUrl; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; - this.exitStatus = exitStatus; - } - - public Container getMasterContainer() { - return masterContainer; - } - public ApplicationAttemptId getAttemptId() { - return attemptId; - } - public Credentials getAppAttemptCredentials() { - return appAttemptCredentials; - } - public RMAppAttemptState getState(){ - return state; - } - public String getFinalTrackingUrl() { - return finalTrackingUrl; - } - public String getDiagnostics() { - return diagnostics; - } - public long getStartTime() { - return startTime; - } - public FinalApplicationStatus getFinalApplicationStatus() { - return amUnregisteredFinalStatus; - } - public int getAMContainerExitStatus(){ - return this.exitStatus; - } - } - - /** - * State of an application application - */ - public static class ApplicationState { - final ApplicationSubmissionContext context; - final long submitTime; - final long startTime; - final String user; - Map attempts = - new HashMap(); - // fields set when application completes. - RMAppState state; - String diagnostics; - long finishTime; - - public ApplicationState(long submitTime, - long startTime, ApplicationSubmissionContext context, String user) { - this(submitTime, startTime, context, user, null, "", 0); - } - - public ApplicationState(long submitTime, - long startTime,ApplicationSubmissionContext context, - String user, RMAppState state, String diagnostics, long finishTime) { - this.submitTime = submitTime; - this.startTime = startTime; - this.context = context; - this.user = user; - this.state = state; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.finishTime = finishTime; - } - - public ApplicationId getAppId() { - return context.getApplicationId(); - } - public long getSubmitTime() { - return submitTime; - } - public long getStartTime() { - return startTime; - } - public int getAttemptCount() { - return attempts.size(); - } - public ApplicationSubmissionContext getApplicationSubmissionContext() { - return context; - } - public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { - return attempts.get(attemptId); - } - public String getUser() { - return user; - } - public RMAppState getState() { - return state; - } - public String getDiagnostics() { - return diagnostics; - } - public long getFinishTime() { - return finishTime; - } - } - public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = @@ -556,13 +419,15 @@ public synchronized void storeNewApplication(RMApp app) { .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, - app.getUser()); + ApplicationState + .newInstance(app.getSubmitTime(), app.getStartTime(), context, + app.getUser()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public synchronized void updateApplicationState(ApplicationState appState) { + public synchronized void updateApplicationState( + ApplicationState appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } @@ -572,10 +437,10 @@ public synchronized void updateApplicationState(ApplicationState appState) { * application. */ protected abstract void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception; + ApplicationState appStateData) throws Exception; protected abstract void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception; + ApplicationState appStateData) throws Exception; @SuppressWarnings("unchecked") /** @@ -587,10 +452,16 @@ protected abstract void updateApplicationStateInternal(ApplicationId appId, public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime()); + ApplicationAttemptState attemptState = null; + try { + attemptState = + ApplicationAttemptState.newInstance(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), ApplicationAttemptState + .convertCredentialsToByteBuffer(credentials), + appAttempt.getStartTime()); + } catch (IOException ioe) { + notifyStoreOperationFailed(ioe); + } dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); @@ -610,11 +481,11 @@ public synchronized void updateApplicationAttemptState( */ protected abstract void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception; + ApplicationAttemptState attemptStateData) throws Exception; protected abstract void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception; + ApplicationAttemptState attemptStateData) throws Exception; /** * RMDTSecretManager call this to store the state of a delegation token @@ -738,15 +609,21 @@ public abstract void storeOrUpdateAMRMTokenSecretManagerState( */ @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { - ApplicationState appState = new ApplicationState( - app.getSubmitTime(), app.getStartTime(), + ApplicationState appState = + ApplicationState.newInstance(app.getSubmitTime(), app.getStartTime(), app.getApplicationSubmissionContext(), app.getUser()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime()); + ApplicationAttemptState attemptState = null; + try { + attemptState = ApplicationAttemptState + .newInstance(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), ApplicationAttemptState + .convertCredentialsToByteBuffer(credentials), + appAttempt.getStartTime()); + } catch (IOException ioe) { + notifyStoreOperationFailed(ioe); + } appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java index c4a04bc..0479931 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent { ApplicationAttemptState attemptState; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java index 99f8e37..9afe45a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; public class RMStateStoreAppEvent extends RMStateStoreEvent { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index 402feb9..19fc151 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { ApplicationState appState; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 9ded673..b804d9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -18,13 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent { ApplicationAttemptState attemptState; - public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) { + public RMStateUpdateAppAttemptEvent( + ApplicationAttemptState attemptState) { super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); this.attemptState = attemptState; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 9bb96e5..06088c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; public class RMStateUpdateAppEvent extends RMStateStoreEvent { private final ApplicationState appState; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1b1ec76..77d4789 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -34,8 +34,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; @@ -54,12 +52,12 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStatePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; @@ -551,17 +549,11 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { LOG.debug("Loading application from znode: " + childNodeName); } ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl( + ApplicationStatePBImpl appState = + new ApplicationStatePBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - if (!appId.equals(appState.context.getApplicationId())) { + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); } @@ -583,27 +575,9 @@ private void loadApplicationAttemptState(ApplicationState appState, String attemptPath = getNodePath(appPath, attemptIDStr); byte[] attemptData = getDataWithRetries(attemptPath, true); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(attemptIDStr); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStatePBImpl attemptState = + new ApplicationAttemptStatePBImpl( ApplicationAttemptStateDataProto.parseFrom(attemptData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -613,7 +587,7 @@ private void loadApplicationAttemptState(ApplicationState appState, @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateDataPB) throws Exception { + ApplicationState appStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -627,7 +601,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateDataPB) throws Exception { + ApplicationState appStateDataPB) throws Exception { String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -649,7 +623,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateDataPB) + ApplicationAttemptState attemptStateDataPB) throws Exception { String appDirPath = getNodePath(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -667,7 +641,7 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateDataPB) + ApplicationAttemptState attemptStateDataPB) throws Exception { String appIdStr = appAttemptId.getApplicationId().toString(); String appAttemptIdStr = appAttemptId.toString(); @@ -690,9 +664,11 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationState appState) throws Exception { - String appId = appState.getAppId().toString(); + String appId = appState.getApplicationSubmissionContext().getApplicationId() + .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptState.java new file mode 100644 index 0000000..ce686e3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptState.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; + +/* + * Contains the state data that needs to be persisted for an ApplicationAttempt + */ +@Public +@Unstable +public abstract class ApplicationAttemptState { + public static ApplicationAttemptState newInstance( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { + ApplicationAttemptState attemptStateData = + Records.newRecord(ApplicationAttemptState.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setFinalTrackingUrl(finalTrackingUrl); + attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); + attemptStateData.setAMContainerExitStatus(exitStatus); + return attemptStateData; + } + + public static ApplicationAttemptState newInstance( + ApplicationAttemptId attemptId, Container masterContainer, + ByteBuffer attemptTokens, long startTime) { + return newInstance(attemptId, masterContainer, attemptTokens, + startTime, null, "N/A", "", null, ContainerExitStatus.INVALID); + } + + public static ByteBuffer convertCredentialsToByteBuffer( + Credentials credentials) throws IOException { + ByteBuffer appAttemptTokens = null; + DataOutputBuffer dob = new DataOutputBuffer(); + try { + if (credentials != null) { + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return appAttemptTokens; + } finally { + IOUtils.closeStream(dob); + } + } + + public abstract ApplicationAttemptStateDataProto getProto(); + + /** + * The ApplicationAttemptId for the application attempt + * @return ApplicationAttemptId for the application attempt + */ + @Public + @Unstable + public abstract ApplicationAttemptId getAttemptId(); + + public abstract void setAttemptId(ApplicationAttemptId attemptId); + + /* + * The master container running the application attempt + * @return Container that hosts the attempt + */ + @Public + @Unstable + public abstract Container getMasterContainer(); + + public abstract void setMasterContainer(Container container); + + /** + * The application attempt tokens that belong to this attempt + * @return The application attempt tokens that belong to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAppAttemptTokens(); + + public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); + + /** + * Get the final state of the application attempt. + * @return the final state of the application attempt. + */ + public abstract RMAppAttemptState getState(); + + public abstract void setState(RMAppAttemptState state); + + /** + * Get the original not-proxied final tracking url for the + * application. This is intended to only be used by the proxy itself. + * + * @return the original not-proxied final tracking url for the + * application + */ + public abstract String getFinalTrackingUrl(); + + /** + * Set the final tracking Url of the AM. + * @param url + */ + public abstract void setFinalTrackingUrl(String url); + /** + * Get the diagnositic information of the attempt + * @return diagnositic information of the attempt + */ + public abstract String getDiagnostics(); + + public abstract void setDiagnostics(String diagnostics); + + /** + * Get the start time of the application. + * @return start time of the application + */ + public abstract long getStartTime(); + + public abstract void setStartTime(long startTime); + + /** + * Get the final finish status of the application. + * @return final finish status of the application + */ + public abstract FinalApplicationStatus getFinalApplicationStatus(); + + public abstract void setFinalApplicationStatus( + FinalApplicationStatus finishState); + + public abstract int getAMContainerExitStatus(); + + public abstract void setAMContainerExitStatus(int exitStatus); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java deleted file mode 100644 index 5cb9787..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.util.Records; - -/* - * Contains the state data that needs to be persisted for an ApplicationAttempt - */ -@Public -@Unstable -public abstract class ApplicationAttemptStateData { - public static ApplicationAttemptStateData newInstance( - ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { - ApplicationAttemptStateData attemptStateData = - Records.newRecord(ApplicationAttemptStateData.class); - attemptStateData.setAttemptId(attemptId); - attemptStateData.setMasterContainer(container); - attemptStateData.setAppAttemptTokens(attemptTokens); - attemptStateData.setState(finalState); - attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); - attemptStateData.setStartTime(startTime); - attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); - attemptStateData.setAMContainerExitStatus(exitStatus); - return attemptStateData; - } - - public static ApplicationAttemptStateData newInstance( - ApplicationAttemptState attemptState) throws IOException { - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - return newInstance(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus(), - attemptState.getAMContainerExitStatus()); - } - - public abstract ApplicationAttemptStateDataProto getProto(); - - /** - * The ApplicationAttemptId for the application attempt - * @return ApplicationAttemptId for the application attempt - */ - @Public - @Unstable - public abstract ApplicationAttemptId getAttemptId(); - - public abstract void setAttemptId(ApplicationAttemptId attemptId); - - /* - * The master container running the application attempt - * @return Container that hosts the attempt - */ - @Public - @Unstable - public abstract Container getMasterContainer(); - - public abstract void setMasterContainer(Container container); - - /** - * The application attempt tokens that belong to this attempt - * @return The application attempt tokens that belong to this attempt - */ - @Public - @Unstable - public abstract ByteBuffer getAppAttemptTokens(); - - public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); - - /** - * Get the final state of the application attempt. - * @return the final state of the application attempt. - */ - public abstract RMAppAttemptState getState(); - - public abstract void setState(RMAppAttemptState state); - - /** - * Get the original not-proxied final tracking url for the - * application. This is intended to only be used by the proxy itself. - * - * @return the original not-proxied final tracking url for the - * application - */ - public abstract String getFinalTrackingUrl(); - - /** - * Set the final tracking Url of the AM. - * @param url - */ - public abstract void setFinalTrackingUrl(String url); - /** - * Get the diagnositic information of the attempt - * @return diagnositic information of the attempt - */ - public abstract String getDiagnostics(); - - public abstract void setDiagnostics(String diagnostics); - - /** - * Get the start time of the application. - * @return start time of the application - */ - public abstract long getStartTime(); - - public abstract void setStartTime(long startTime); - - /** - * Get the final finish status of the application. - * @return final finish status of the application - */ - public abstract FinalApplicationStatus getFinalApplicationStatus(); - - public abstract void setFinalApplicationStatus( - FinalApplicationStatus finishState); - - public abstract int getAMContainerExitStatus(); - - public abstract void setAMContainerExitStatus(int exitStatus); -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationState.java new file mode 100644 index 0000000..c6ef0b6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationState.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.Records; + +/** + * Contains all the state data that needs to be stored persistently + * for an Application + */ +@Public +@Unstable +public abstract class ApplicationState { + public Map attempts = + new HashMap(); + + public static ApplicationState newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, + RMAppState state, String diagnostics, long finishTime) { + ApplicationState appState = Records.newRecord(ApplicationState.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + public static ApplicationState newInstance(long submitTime, + long startTime, ApplicationSubmissionContext context, String user) { + return newInstance(submitTime, startTime, user, context, null, "", 0); + } + + public int getAttemptCount() { + return attempts.size(); + } + + public ApplicationAttemptState getAttempt( + ApplicationAttemptId attemptId) { + return attempts.get(attemptId); + } + + public abstract ApplicationStateDataProto getProto(); + + /** + * The time at which the application was received by the Resource Manager + * @return submitTime + */ + @Public + @Unstable + public abstract long getSubmitTime(); + + @Public + @Unstable + public abstract void setSubmitTime(long submitTime); + + /** + * Get the start time of the application. + * @return start time of the application + */ + @Public + @Stable + public abstract long getStartTime(); + + @Private + @Unstable + public abstract void setStartTime(long startTime); + + /** + * The application submitter + */ + @Public + @Unstable + public abstract void setUser(String user); + + @Public + @Unstable + public abstract String getUser(); + + /** + * The {@link ApplicationSubmissionContext} for the application + * {@link ApplicationId} can be obtained from the this + * @return ApplicationSubmissionContext + */ + @Public + @Unstable + public abstract ApplicationSubmissionContext getApplicationSubmissionContext(); + + @Public + @Unstable + public abstract void setApplicationSubmissionContext( + ApplicationSubmissionContext context); + + /** + * Get the final state of the application. + * @return the final state of the application. + */ + public abstract RMAppState getState(); + + public abstract void setState(RMAppState state); + + /** + * Get the diagnostics information for the application master. + * @return the diagnostics information for the application master. + */ + public abstract String getDiagnostics(); + + public abstract void setDiagnostics(String diagnostics); + + /** + * The finish time of the application. + * @return the finish time of the application., + */ + public abstract long getFinishTime(); + + public abstract void setFinishTime(long finishTime); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java deleted file mode 100644 index eff0445..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.util.Records; - -/** - * Contains all the state data that needs to be stored persistently - * for an Application - */ -@Public -@Unstable -public abstract class ApplicationStateData { - public static ApplicationStateData newInstance(long submitTime, - long startTime, String user, - ApplicationSubmissionContext submissionContext, - RMAppState state, String diagnostics, long finishTime) { - ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); - appState.setSubmitTime(submitTime); - appState.setStartTime(startTime); - appState.setUser(user); - appState.setApplicationSubmissionContext(submissionContext); - appState.setState(state); - appState.setDiagnostics(diagnostics); - appState.setFinishTime(finishTime); - return appState; - } - - public static ApplicationStateData newInstance( - ApplicationState appState) { - return newInstance(appState.getSubmitTime(), appState.getStartTime(), - appState.getUser(), appState.getApplicationSubmissionContext(), - appState.getState(), appState.getDiagnostics(), - appState.getFinishTime()); - } - - public abstract ApplicationStateDataProto getProto(); - - /** - * The time at which the application was received by the Resource Manager - * @return submitTime - */ - @Public - @Unstable - public abstract long getSubmitTime(); - - @Public - @Unstable - public abstract void setSubmitTime(long submitTime); - - /** - * Get the start time of the application. - * @return start time of the application - */ - @Public - @Stable - public abstract long getStartTime(); - - @Private - @Unstable - public abstract void setStartTime(long startTime); - - /** - * The application submitter - */ - @Public - @Unstable - public abstract void setUser(String user); - - @Public - @Unstable - public abstract String getUser(); - - /** - * The {@link ApplicationSubmissionContext} for the application - * {@link ApplicationId} can be obtained from the this - * @return ApplicationSubmissionContext - */ - @Public - @Unstable - public abstract ApplicationSubmissionContext getApplicationSubmissionContext(); - - @Public - @Unstable - public abstract void setApplicationSubmissionContext( - ApplicationSubmissionContext context); - - /** - * Get the final state of the application. - * @return the final state of the application. - */ - public abstract RMAppState getState(); - - public abstract void setState(RMAppState state); - - /** - * Get the diagnostics information for the application master. - * @return the diagnostics information for the application master. - */ - public abstract String getDiagnostics(); - - public abstract void setDiagnostics(String diagnostics); - - /** - * The finish time of the application. - * @return the finish time of the application., - */ - public abstract long getFinishTime(); - - public abstract void setFinishTime(long finishTime); -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java deleted file mode 100644 index 5c62d63..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppAttemptStateProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; - -import com.google.protobuf.TextFormat; - -public class ApplicationAttemptStateDataPBImpl extends - ApplicationAttemptStateData { - ApplicationAttemptStateDataProto proto = - ApplicationAttemptStateDataProto.getDefaultInstance(); - ApplicationAttemptStateDataProto.Builder builder = null; - boolean viaProto = false; - - private ApplicationAttemptId attemptId = null; - private Container masterContainer = null; - private ByteBuffer appAttemptTokens = null; - - public ApplicationAttemptStateDataPBImpl() { - builder = ApplicationAttemptStateDataProto.newBuilder(); - } - - public ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto proto) { - this.proto = proto; - viaProto = true; - } - - @Override - public ApplicationAttemptStateDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void mergeLocalToBuilder() { - if (this.attemptId != null) { - builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto()); - } - if(this.masterContainer != null) { - builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); - } - if(this.appAttemptTokens != null) { - builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat( - this.appAttemptTokens)); - } - } - - private void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ApplicationAttemptStateDataProto.newBuilder(proto); - } - viaProto = false; - } - - @Override - public ApplicationAttemptId getAttemptId() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(attemptId != null) { - return attemptId; - } - if (!p.hasAttemptId()) { - return null; - } - attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId()); - return attemptId; - } - - @Override - public void setAttemptId(ApplicationAttemptId attemptId) { - maybeInitBuilder(); - if (attemptId == null) { - builder.clearAttemptId(); - } - this.attemptId = attemptId; - } - - @Override - public Container getMasterContainer() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(masterContainer != null) { - return masterContainer; - } - if (!p.hasMasterContainer()) { - return null; - } - masterContainer = new ContainerPBImpl(p.getMasterContainer()); - return masterContainer; - } - - @Override - public void setMasterContainer(Container container) { - maybeInitBuilder(); - if (container == null) { - builder.clearMasterContainer(); - } - this.masterContainer = container; - } - - @Override - public ByteBuffer getAppAttemptTokens() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(appAttemptTokens != null) { - return appAttemptTokens; - } - if(!p.hasAppAttemptTokens()) { - return null; - } - this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( - p.getAppAttemptTokens()); - return appAttemptTokens; - } - - @Override - public void setAppAttemptTokens(ByteBuffer attemptTokens) { - maybeInitBuilder(); - if(attemptTokens == null) { - builder.clearAppAttemptTokens(); - } - this.appAttemptTokens = attemptTokens; - } - - @Override - public RMAppAttemptState getState() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasAppAttemptState()) { - return null; - } - return convertFromProtoFormat(p.getAppAttemptState()); - } - - @Override - public void setState(RMAppAttemptState state) { - maybeInitBuilder(); - if (state == null) { - builder.clearAppAttemptState(); - return; - } - builder.setAppAttemptState(convertToProtoFormat(state)); - } - - @Override - public String getFinalTrackingUrl() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasFinalTrackingUrl()) { - return null; - } - return p.getFinalTrackingUrl(); - } - - @Override - public void setFinalTrackingUrl(String url) { - maybeInitBuilder(); - if (url == null) { - builder.clearFinalTrackingUrl(); - return; - } - builder.setFinalTrackingUrl(url); - } - - @Override - public String getDiagnostics() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasDiagnostics()) { - return null; - } - return p.getDiagnostics(); - } - - @Override - public void setDiagnostics(String diagnostics) { - maybeInitBuilder(); - if (diagnostics == null) { - builder.clearDiagnostics(); - return; - } - builder.setDiagnostics(diagnostics); - } - - @Override - public long getStartTime() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); - } - - @Override - public void setStartTime(long startTime) { - maybeInitBuilder(); - builder.setStartTime(startTime); - } - - @Override - public FinalApplicationStatus getFinalApplicationStatus() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasFinalApplicationStatus()) { - return null; - } - return convertFromProtoFormat(p.getFinalApplicationStatus()); - } - - @Override - public void setFinalApplicationStatus(FinalApplicationStatus finishState) { - maybeInitBuilder(); - if (finishState == null) { - builder.clearFinalApplicationStatus(); - return; - } - builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); - } - - @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public int getAMContainerExitStatus() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getAmContainerExitStatus(); - } - - @Override - public void setAMContainerExitStatus(int exitStatus) { - maybeInitBuilder(); - builder.setAmContainerExitStatus(exitStatus); - } - - - @Override - public boolean equals(Object other) { - if (other == null) - return false; - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } - - private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; - public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { - return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); - } - public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { - return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); - } - - private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) { - return ProtoUtils.convertToProtoFormat(s); - } - private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { - return ProtoUtils.convertFromProtoFormat(s); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStatePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStatePBImpl.java new file mode 100644 index 0000000..42180d7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStatePBImpl.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppAttemptStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; + +import com.google.protobuf.TextFormat; + +public class ApplicationAttemptStatePBImpl extends ApplicationAttemptState { + ApplicationAttemptStateDataProto proto = + ApplicationAttemptStateDataProto.getDefaultInstance(); + ApplicationAttemptStateDataProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationAttemptId attemptId = null; + private Container masterContainer = null; + private ByteBuffer appAttemptTokens = null; + + public ApplicationAttemptStatePBImpl() { + builder = ApplicationAttemptStateDataProto.newBuilder(); + } + + public ApplicationAttemptStatePBImpl(ApplicationAttemptStateDataProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public ApplicationAttemptStateDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.attemptId != null) { + builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto()); + } + if(this.masterContainer != null) { + builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); + } + if(this.appAttemptTokens != null) { + builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat( + this.appAttemptTokens)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationAttemptStateDataProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationAttemptId getAttemptId() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(attemptId != null) { + return attemptId; + } + if (!p.hasAttemptId()) { + return null; + } + attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId()); + return attemptId; + } + + @Override + public void setAttemptId(ApplicationAttemptId attemptId) { + maybeInitBuilder(); + if (attemptId == null) { + builder.clearAttemptId(); + } + this.attemptId = attemptId; + } + + @Override + public Container getMasterContainer() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(masterContainer != null) { + return masterContainer; + } + if (!p.hasMasterContainer()) { + return null; + } + masterContainer = new ContainerPBImpl(p.getMasterContainer()); + return masterContainer; + } + + @Override + public void setMasterContainer(Container container) { + maybeInitBuilder(); + if (container == null) { + builder.clearMasterContainer(); + } + this.masterContainer = container; + } + + @Override + public ByteBuffer getAppAttemptTokens() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(appAttemptTokens != null) { + return appAttemptTokens; + } + if(!p.hasAppAttemptTokens()) { + return null; + } + this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( + p.getAppAttemptTokens()); + return appAttemptTokens; + } + + @Override + public void setAppAttemptTokens(ByteBuffer attemptTokens) { + maybeInitBuilder(); + if(attemptTokens == null) { + builder.clearAppAttemptTokens(); + } + this.appAttemptTokens = attemptTokens; + } + + @Override + public RMAppAttemptState getState() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppAttemptState()) { + return null; + } + return convertFromProtoFormat(p.getAppAttemptState()); + } + + @Override + public void setState(RMAppAttemptState state) { + maybeInitBuilder(); + if (state == null) { + builder.clearAppAttemptState(); + return; + } + builder.setAppAttemptState(convertToProtoFormat(state)); + } + + @Override + public String getFinalTrackingUrl() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFinalTrackingUrl()) { + return null; + } + return p.getFinalTrackingUrl(); + } + + @Override + public void setFinalTrackingUrl(String url) { + maybeInitBuilder(); + if (url == null) { + builder.clearFinalTrackingUrl(); + return; + } + builder.setFinalTrackingUrl(url); + } + + @Override + public String getDiagnostics() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getStartTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public FinalApplicationStatus getFinalApplicationStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFinalApplicationStatus()) { + return null; + } + return convertFromProtoFormat(p.getFinalApplicationStatus()); + } + + @Override + public void setFinalApplicationStatus(FinalApplicationStatus finishState) { + maybeInitBuilder(); + if (finishState == null) { + builder.clearFinalApplicationStatus(); + return; + } + builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public int getAMContainerExitStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getAmContainerExitStatus(); + } + + @Override + public void setAMContainerExitStatus(int exitStatus) { + maybeInitBuilder(); + builder.setAmContainerExitStatus(exitStatus); + } + + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; + public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { + return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); + } + public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { + return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); + } + + private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) { + return ProtoUtils.convertToProtoFormat(s); + } + private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { + return ProtoUtils.convertFromProtoFormat(s); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java deleted file mode 100644 index d8cbd23..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; - -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; - -import com.google.protobuf.TextFormat; - -public class ApplicationStateDataPBImpl extends ApplicationStateData { - ApplicationStateDataProto proto = - ApplicationStateDataProto.getDefaultInstance(); - ApplicationStateDataProto.Builder builder = null; - boolean viaProto = false; - - private ApplicationSubmissionContext applicationSubmissionContext = null; - - public ApplicationStateDataPBImpl() { - builder = ApplicationStateDataProto.newBuilder(); - } - - public ApplicationStateDataPBImpl( - ApplicationStateDataProto proto) { - this.proto = proto; - viaProto = true; - } - - @Override - public ApplicationStateDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void mergeLocalToBuilder() { - if (this.applicationSubmissionContext != null) { - builder.setApplicationSubmissionContext( - ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) - .getProto()); - } - } - - private void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ApplicationStateDataProto.newBuilder(proto); - } - viaProto = false; - } - - @Override - public long getSubmitTime() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasSubmitTime()) { - return -1; - } - return (p.getSubmitTime()); - } - - @Override - public void setSubmitTime(long submitTime) { - maybeInitBuilder(); - builder.setSubmitTime(submitTime); - } - - @Override - public long getStartTime() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); - } - - @Override - public void setStartTime(long startTime) { - maybeInitBuilder(); - builder.setStartTime(startTime); - } - - @Override - public String getUser() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasUser()) { - return null; - } - return (p.getUser()); - - } - - @Override - public void setUser(String user) { - maybeInitBuilder(); - builder.setUser(user); - } - - @Override - public ApplicationSubmissionContext getApplicationSubmissionContext() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(applicationSubmissionContext != null) { - return applicationSubmissionContext; - } - if (!p.hasApplicationSubmissionContext()) { - return null; - } - applicationSubmissionContext = - new ApplicationSubmissionContextPBImpl( - p.getApplicationSubmissionContext()); - return applicationSubmissionContext; - } - - @Override - public void setApplicationSubmissionContext( - ApplicationSubmissionContext context) { - maybeInitBuilder(); - if (context == null) { - builder.clearApplicationSubmissionContext(); - } - this.applicationSubmissionContext = context; - } - - @Override - public RMAppState getState() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasApplicationState()) { - return null; - } - return convertFromProtoFormat(p.getApplicationState()); - } - - @Override - public void setState(RMAppState finalState) { - maybeInitBuilder(); - if (finalState == null) { - builder.clearApplicationState(); - return; - } - builder.setApplicationState(convertToProtoFormat(finalState)); - } - - @Override - public String getDiagnostics() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasDiagnostics()) { - return null; - } - return p.getDiagnostics(); - } - - @Override - public void setDiagnostics(String diagnostics) { - maybeInitBuilder(); - if (diagnostics == null) { - builder.clearDiagnostics(); - return; - } - builder.setDiagnostics(diagnostics); - } - - @Override - public long getFinishTime() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getFinishTime(); - } - - @Override - public void setFinishTime(long finishTime) { - maybeInitBuilder(); - builder.setFinishTime(finishTime); - } - - @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) - return false; - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } - - private static String RM_APP_PREFIX = "RMAPP_"; - public static RMAppStateProto convertToProtoFormat(RMAppState e) { - return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name()); - } - public static RMAppState convertFromProtoFormat(RMAppStateProto e) { - return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStatePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStatePBImpl.java new file mode 100644 index 0000000..70179cd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStatePBImpl.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; + +import com.google.protobuf.TextFormat; + +public class ApplicationStatePBImpl extends ApplicationState { + ApplicationStateDataProto proto = + ApplicationStateDataProto.getDefaultInstance(); + ApplicationStateDataProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationSubmissionContext applicationSubmissionContext = null; + + public ApplicationStatePBImpl() { + builder = ApplicationStateDataProto.newBuilder(); + } + + public ApplicationStatePBImpl(ApplicationStateDataProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public ApplicationStateDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.applicationSubmissionContext != null) { + builder.setApplicationSubmissionContext( + ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) + .getProto()); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationStateDataProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getSubmitTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasSubmitTime()) { + return -1; + } + return (p.getSubmitTime()); + } + + @Override + public void setSubmitTime(long submitTime) { + maybeInitBuilder(); + builder.setSubmitTime(submitTime); + } + + @Override + public long getStartTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public String getUser() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasUser()) { + return null; + } + return (p.getUser()); + + } + + @Override + public void setUser(String user) { + maybeInitBuilder(); + builder.setUser(user); + } + + @Override + public ApplicationSubmissionContext getApplicationSubmissionContext() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(applicationSubmissionContext != null) { + return applicationSubmissionContext; + } + if (!p.hasApplicationSubmissionContext()) { + return null; + } + applicationSubmissionContext = + new ApplicationSubmissionContextPBImpl( + p.getApplicationSubmissionContext()); + return applicationSubmissionContext; + } + + @Override + public void setApplicationSubmissionContext( + ApplicationSubmissionContext context) { + maybeInitBuilder(); + if (context == null) { + builder.clearApplicationSubmissionContext(); + } + this.applicationSubmissionContext = context; + } + + @Override + public RMAppState getState() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationState()) { + return null; + } + return convertFromProtoFormat(p.getApplicationState()); + } + + @Override + public void setState(RMAppState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearApplicationState(); + return; + } + builder.setApplicationState(convertToProtoFormat(finalState)); + } + + @Override + public String getDiagnostics() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getFinishTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private static String RM_APP_PREFIX = "RMAPP_"; + public static RMAppStateProto convertToProtoFormat(RMAppState e) { + return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name()); + } + public static RMAppState convertFromProtoFormat(RMAppStateProto e) { + return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 48cf460..a45f4a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -58,9 +57,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -665,7 +664,8 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) throws Exception{ - ApplicationState appState = state.getApplicationState().get(getApplicationId()); + ApplicationState appState = + state.getApplicationState().get(getApplicationId()); this.recoveredFinalState = appState.getState(); LOG.info("Recovering app: " + getApplicationId() + " with " + + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState ); @@ -938,9 +938,9 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, break; } ApplicationState appState = - new ApplicationState(this.submitTime, this.startTime, - this.submissionContext, this.user, stateToBeStored, diags, - this.storedFinishTime); + ApplicationState.newInstance(this.submitTime, this.startTime, this.user, + this.submissionContext, stateToBeStored, diags, + this.storedFinishTime); this.rmContext.getStateStore().updateApplicationState(appState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..70a0e3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -39,10 +40,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -66,9 +67,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -726,7 +727,7 @@ public void recover(RMState state) throws Exception { this.attemptMetrics.setIsPreempted(); } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + recoverAppAttemptCredentials(attemptState.getAppAttemptTokens(), attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); @@ -739,16 +740,25 @@ public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens, + private void recoverAppAttemptCredentials(ByteBuffer appAttemptTokens, RMAppAttemptState state) throws IOException { if (appAttemptTokens == null || state == RMAppAttemptState.FAILED || state == RMAppAttemptState.FINISHED || state == RMAppAttemptState.KILLED) { return; } + + Credentials credentials = null; + if (appAttemptTokens.hasArray()) { + DataInputByteBuffer dibb = new DataInputByteBuffer(); + credentials = new Credentials(); + appAttemptTokens.rewind(); + dibb.reset(appAttemptTokens); + credentials.readTokenStorageStream(dibb); + } if (UserGroupInformation.isSecurityEnabled()) { - byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( + byte[] clientTokenMasterKeyBytes = credentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); @@ -1019,10 +1029,17 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, } RMStateStore rmStore = rmContext.getStateStore(); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), - rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + ApplicationAttemptState attemptState = null; + try { + attemptState = ApplicationAttemptState + .newInstance(applicationAttemptId, getMasterContainer(), + ApplicationAttemptState.convertCredentialsToByteBuffer( + rmStore.getCredentialsFromAppAttempt(this)), startTime, + stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + } catch (IOException e) { + // TODO: handling invalid state + LOG.fatal(e); + } LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 5f63caf..d970610 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; @@ -81,12 +82,10 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -137,7 +136,7 @@ public void testRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); @@ -184,7 +183,7 @@ public void testRMRestart() throws Exception { ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -612,7 +611,7 @@ public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { + ApplicationState appStateData) throws Exception { if (count == 0) { // do nothing; simulate app final state is not saved. LOG.info(appId + " final state is not saved."); @@ -760,14 +759,14 @@ public void testRMRestartKilledAppWithNoAttempts() throws Exception { @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptState attemptStateData) throws Exception { // ignore attempt saving request. } @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) throws Exception { + ApplicationAttemptState attemptStateData) throws Exception { // ignore attempt saving request. } }; @@ -1034,7 +1033,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -1214,7 +1213,8 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { attempt1.getClientTokenMasterKey().getEncoded(); // assert application credentials are saved - Credentials savedCredentials = attemptState.getAppAttemptCredentials(); + Credentials savedCredentials = + convertCredentailsFromByteBuffer(attemptState.getAppAttemptTokens()); Assert.assertArrayEquals("client token master key not saved", clientTokenMasterKey, savedCredentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); @@ -1856,7 +1856,7 @@ public static NMContainerStatus createNMContainerStatus( @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { + ApplicationState appStateData) throws Exception { updateApp = ++count; super.updateApplicationStateInternal(appId, appStateData); } @@ -1865,7 +1865,7 @@ public void updateApplicationStateInternal(ApplicationId appId, public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptState attemptStateData) throws Exception { updateAttempt = ++count; super.updateApplicationAttemptStateInternal(attemptId, @@ -1908,4 +1908,20 @@ protected void doSecureLogin() throws IOException { // Do nothing. } } + + private Credentials convertCredentailsFromByteBuffer( + ByteBuffer appAttemptTokens) throws IOException { + DataInputByteBuffer dibb = new DataInputByteBuffer(); + try { + Credentials credentials = null; + if (appAttemptTokens != null) { + credentials = new Credentials(); + dibb.reset(appAttemptTokens); + credentials.readTokenStorageStream(dibb); + } + return credentials; + } finally { + IOUtils.closeStream(dibb); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..5829461 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 620ba9f..85edc46 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; +import java.nio.ByteBuffer; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -39,7 +41,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -57,11 +62,11 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -232,6 +237,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); + when(mockRemovedApp.getUser()).thenReturn("user1"); RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); attempts.put(attemptIdRemoved, mockRemovedAttempt); @@ -275,7 +281,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(containerId1, attemptState.getMasterContainer().getId()); // attempt1 client token master key is loaded correctly assertArrayEquals(clientTokenKey1.getEncoded(), - attemptState.getAppAttemptCredentials() + convertCredentailsFromByteBuffer(attemptState.getAppAttemptTokens()) .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); attemptState = appState.getAttempt(attemptId2); @@ -286,25 +292,26 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(containerId2, attemptState.getMasterContainer().getId()); // attempt2 client token master key is loaded correctly assertArrayEquals(clientTokenKey2.getEncoded(), - attemptState.getAppAttemptCredentials() + convertCredentailsFromByteBuffer(attemptState.getAppAttemptTokens()) .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); //******* update application/attempt state *******// ApplicationState appState2 = - new ApplicationState(appState.submitTime, appState.startTime, - appState.context, appState.user, RMAppState.FINISHED, - "appDiagnostics", 1234); + ApplicationState + .newInstance(appState.getSubmitTime(), appState.getStartTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + RMAppState.FINISHED, "appDiagnostics", 1234); appState2.attempts.putAll(appState.attempts); store.updateApplicationState(appState2); ApplicationAttemptState oldAttemptState = attemptState; ApplicationAttemptState newAttemptState = - new ApplicationAttemptState(oldAttemptState.getAttemptId(), - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 100); + ApplicationAttemptState.newInstance(oldAttemptState.getAttemptId(), + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -314,20 +321,21 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) new ApplicationSubmissionContextPBImpl(); dummyContext.setApplicationId(dummyAppId); ApplicationState dummyApp = - new ApplicationState(appState.submitTime, appState.startTime, - dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics", - 1234); + ApplicationState + .newInstance(appState.getSubmitTime(), appState.getStartTime(), + appState.getUser(), dummyContext, RMAppState.FINISHED, + "appDiagnostics", 1234); store.updateApplicationState(dummyApp); ApplicationAttemptId dummyAttemptId = ApplicationAttemptId.newInstance(dummyAppId, 6); ApplicationAttemptState dummyAttempt = - new ApplicationAttemptState(dummyAttemptId, - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 111); + ApplicationAttemptState + .newInstance(dummyAttemptId, oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); // let things settle down @@ -340,9 +348,11 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) RMState newRMState = store.loadState(); Map newRMAppState = newRMState.getApplicationState(); - assertNotNull(newRMAppState.get(dummyApp.getAppId())); + assertNotNull(newRMAppState.get( + dummyApp.getApplicationSubmissionContext().getApplicationId())); ApplicationState updatedAppState = newRMAppState.get(appId1); - assertEquals(appState.getAppId(),updatedAppState.getAppId()); + assertEquals(appState.getApplicationSubmissionContext().getApplicationId(), + updatedAppState.getApplicationSubmissionContext().getApplicationId()); assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); assertEquals(appState.getStartTime(), updatedAppState.getStartTime()); assertEquals(appState.getUser(), updatedAppState.getUser()); @@ -352,16 +362,17 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(1234, updatedAppState.getFinishTime()); // check updated attempt state - assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt( - dummyAttemptId)); + assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext + ().getApplicationId()).getAttempt(dummyAttemptId)); ApplicationAttemptState updatedAttemptState = updatedAppState.getAttempt(newAttemptState.getAttemptId()); assertEquals(oldAttemptState.getAttemptId(), updatedAttemptState.getAttemptId()); assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId()); assertArrayEquals(clientTokenKey2.getEncoded(), - updatedAttemptState.getAppAttemptCredentials().getSecretKey( - RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + convertCredentailsFromByteBuffer( + updatedAttemptState.getAppAttemptTokens()) + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); // new attempt state fields assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); @@ -447,6 +458,22 @@ public void testRMDTSecretManagerStateStore( } + private Credentials convertCredentailsFromByteBuffer( + ByteBuffer appAttemptTokens) throws IOException { + DataInputByteBuffer dibb = new DataInputByteBuffer(); + try { + Credentials credentials = null; + if (appAttemptTokens != null) { + credentials = new Credentials(); + dibb.reset(appAttemptTokens); + credentials.readTokenStorageStream(dibb); + } + return credentials; + } finally { + IOUtils.closeStream(dibb); + } + } + private Token generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 88e5393..58fe64c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -215,8 +215,9 @@ public void run() { try { store.storeApplicationStateInternal( ApplicationId.newInstance(100L, 1), - ApplicationStateData.newInstance(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + ApplicationState + .newInstance(111, 111, "user", null, RMAppState.ACCEPTED, + "diagnostics", 333)); } catch (Exception e) { // TODO 0 datanode exception will not be retried by dfs client, fix // that separately. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2fc4431..c5dfc64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -300,11 +300,13 @@ private void assertTimesAtFinish(RMApp application) { } private void assertAppFinalStateSaved(RMApp application){ - verify(store, times(1)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(1)).updateApplicationState( + any(ApplicationState.class)); } private void assertAppFinalStateNotSaved(RMApp application){ - verify(store, times(0)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(0)).updateApplicationState( + any(ApplicationState.class)); } private void assertKilled(RMApp application) { @@ -878,12 +880,15 @@ public void testAppsRecoveringStates() throws Exception { } } - public void testRecoverApplication(ApplicationState appState, RMState rmState) + public void testRecoverApplication(ApplicationState appState, + RMState rmState) throws Exception { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = - new RMAppImpl(appState.getAppId(), rmContext, conf, + new RMAppImpl( + appState.getApplicationSubmissionContext().getApplicationId(), + rmContext, conf, submissionContext.getApplicationName(), null, submissionContext.getQueue(), submissionContext, null, null, appState.getSubmitTime(), submissionContext.getApplicationType(), @@ -896,7 +901,8 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState) RMAppImpl.isAppInFinalState(application)); // Trigger RECOVER event. - application.handle(new RMAppEvent(appState.getAppId(), + application.handle(new RMAppEvent( + appState.getApplicationSubmissionContext().getApplicationId(), RMAppEventType.RECOVER)); rmDispatcher.await(); RMAppState finalState = appState.getState(); @@ -909,9 +915,10 @@ public void createRMStateForApplications( RMAppState rmAppState) { RMApp app = createNewTestApp(null); ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), - app.getApplicationSubmissionContext(), app.getUser(), rmAppState, - null, app.getFinishTime()); + ApplicationState + .newInstance(app.getSubmitTime(), app.getStartTime(), app.getUser(), + app.getApplicationSubmissionContext(), rmAppState, null, + app.getFinishTime()); applicationState.put(app.getApplicationId(), appState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index efcecd9..804f6f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -70,8 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;