diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 51024cf..b84ee70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -299,11 +299,11 @@ protected void submitApplication( @SuppressWarnings("unchecked") protected void - recoverApplication(ApplicationState appState, RMState rmState) + recoverApplication(ApplicationStateData appState, RMState rmState) throws Exception { ApplicationSubmissionContext appContext = appState.getApplicationSubmissionContext(); - ApplicationId appId = appState.getAppId(); + ApplicationId appId = appContext.getApplicationId(); // create and recover app. RMAppImpl application = @@ -419,9 +419,10 @@ public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); assert store != null; // recover applications - Map appStates = state.getApplicationState(); + Map appStates = + state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); - for (ApplicationState appState : appStates.values()) { + for (ApplicationStateData appState : appStates.values()) { recoverApplication(appState, state); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 0a3b269..eca701a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -221,8 +221,8 @@ private void loadAMRMTokenSecretManagerState(RMState rmState) private void loadRMAppState(RMState rmState) throws Exception { try { - List attempts = - new ArrayList(); + List attempts = + new ArrayList(); for (FileStatus appDir : fs.listStatus(rmAppRoot)) { checkAndResumeUpdateOperation(appDir.getPath()); @@ -239,19 +239,14 @@ private void loadRMAppState(RMState rmState) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Loading application from node: " + childNodeName); } - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); // assert child node name is same as actual applicationId - assert appId.equals(appState.context.getApplicationId()); + assert appId.equals( + appState.getApplicationSubmissionContext().getApplicationId()); rmState.appState.put(appId, appState); } else if (childNodeName .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { @@ -262,25 +257,9 @@ private void loadRMAppState(RMState rmState) throws Exception { } ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = + ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); @@ -294,9 +273,9 @@ private void loadRMAppState(RMState rmState) throws Exception { // go through all attempts and add them to their apps, Ideally, each // attempt node must have a corresponding app node, because remove // directory operation remove both at the same time - for (ApplicationAttemptState attemptState : attempts) { + for (ApplicationAttemptStateData attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = rmState.appState.get(appId); + ApplicationStateData appState = rmState.appState.get(appId); assert appState != null; appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -381,10 +360,9 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String appIdStr = appId.toString(); - Path appDirPath = getAppDir(rmAppRoot, appIdStr); + Path appDirPath = getAppDir(rmAppRoot, appId); fs.mkdirs(appDirPath); - Path nodeCreatePath = getNodePath(appDirPath, appIdStr); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -401,9 +379,8 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String appIdStr = appId.toString(); - Path appDirPath = getAppDir(rmAppRoot, appIdStr); - Path nodeCreatePath = getNodePath(appDirPath, appIdStr); + Path appDirPath = getAppDir(rmAppRoot, appId); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -423,7 +400,7 @@ public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = - getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); LOG.info("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); @@ -444,7 +421,7 @@ public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = - getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); LOG.info("Updating info for attempt: " + appAttemptId + " at: " + nodeCreatePath); @@ -460,9 +437,11 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationStateData appState) throws Exception { - String appId = appState.getAppId().toString(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); @@ -558,8 +537,8 @@ public synchronized void deleteStore() throws IOException { } } - private Path getAppDir(Path root, String appId) { - return getNodePath(root, appId); + private Path getAppDir(Path root, ApplicationId appId) { + return getNodePath(root, appId.toString()); } // FileSystem related code diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f56517c..fbd9b51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -25,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -93,54 +91,30 @@ protected synchronized void closeInternal() throws Exception { } @Override - public void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) + public void storeApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) throws Exception { - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); state.appState.put(appId, appState); } @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { - ApplicationState updatedAppState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - LOG.info("Updating final state " + appStateData.getState() + " for app: " + ApplicationStateData appState) throws Exception { + LOG.info("Updating final state " + appState.getState() + " for app: " + appId); if (state.appState.get(appId) != null) { // add the earlier attempts back - updatedAppState.attempts - .putAll(state.appState.get(appId).attempts); + appState.attempts.putAll(state.appState.get(appId).attempts); } - state.appState.put(appId, updatedAppState); + state.appState.put(appId, appState); } @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptStateData attemptState) throws Exception { - Credentials credentials = null; - if(attemptStateData.getAppAttemptTokens() != null){ - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime()); - - ApplicationState appState = state.getApplicationState().get( + ApplicationStateData appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); @@ -151,41 +125,24 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptStateData attemptState) throws Exception { - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState updatedAttemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); - - ApplicationState appState = - state.getApplicationState().get( - updatedAttemptState.getAttemptId().getApplicationId()); + ApplicationStateData appState = + state.getApplicationState().get(appAttemptId.getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); } - LOG.info("Updating final state " + updatedAttemptState.getState() - + " for attempt: " + updatedAttemptState.getAttemptId()); - appState.attempts.put(updatedAttemptState.getAttemptId(), - updatedAttemptState); + LOG.info("Updating final state " + attemptState.getState() + + " for attempt: " + attemptState.getAttemptId()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); } @Override public synchronized void removeApplicationStateInternal( - ApplicationState appState) throws Exception { - ApplicationId appId = appState.getAppId(); - ApplicationState removed = state.appState.remove(appId); + ApplicationStateData appState) throws Exception { + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); + ApplicationStateData removed = state.appState.remove(appId); if (removed == null) { throw new YarnRuntimeException("Removing non-exsisting application state"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index e910c19..99b881a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -71,7 +71,7 @@ protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attempt } @Override - protected void removeApplicationStateInternal(ApplicationState appState) + protected void removeApplicationStateInternal(ApplicationStateData appState) throws Exception { // Do nothing } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 714a108..ab8b54d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -37,9 +38,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -54,11 +52,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -125,13 +121,13 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationStateData appState = + ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { - store.storeApplicationStateInternal(appId, appStateData); + store.storeApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { @@ -150,13 +146,13 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationStateData appState = + ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { - store.updateApplicationStateInternal(appId, appStateData); + store.updateApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { @@ -175,9 +171,10 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) - .getAppState(); - ApplicationId appId = appState.getAppId(); + ApplicationStateData appState = + ((RMStateStoreRemoveAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { store.removeApplicationStateInternal(appState); @@ -197,16 +194,14 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = - ApplicationAttemptStateData.newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); @@ -226,16 +221,14 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData - .newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); @@ -251,137 +244,6 @@ public RMStateStore() { stateMachine = stateMachineFactory.make(this); } - /** - * State of an application attempt - */ - public static class ApplicationAttemptState { - final ApplicationAttemptId attemptId; - final Container masterContainer; - final Credentials appAttemptCredentials; - long startTime = 0; - // fields set when attempt completes - RMAppAttemptState state; - String finalTrackingUrl = "N/A"; - String diagnostics; - int exitStatus = ContainerExitStatus.INVALID; - FinalApplicationStatus amUnregisteredFinalStatus; - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime) { - this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID); - } - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus) { - this.attemptId = attemptId; - this.masterContainer = masterContainer; - this.appAttemptCredentials = appAttemptCredentials; - this.startTime = startTime; - this.state = state; - this.finalTrackingUrl = finalTrackingUrl; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; - this.exitStatus = exitStatus; - } - - public Container getMasterContainer() { - return masterContainer; - } - public ApplicationAttemptId getAttemptId() { - return attemptId; - } - public Credentials getAppAttemptCredentials() { - return appAttemptCredentials; - } - public RMAppAttemptState getState(){ - return state; - } - public String getFinalTrackingUrl() { - return finalTrackingUrl; - } - public String getDiagnostics() { - return diagnostics; - } - public long getStartTime() { - return startTime; - } - public FinalApplicationStatus getFinalApplicationStatus() { - return amUnregisteredFinalStatus; - } - public int getAMContainerExitStatus(){ - return this.exitStatus; - } - } - - /** - * State of an application application - */ - public static class ApplicationState { - final ApplicationSubmissionContext context; - final long submitTime; - final long startTime; - final String user; - Map attempts = - new HashMap(); - // fields set when application completes. - RMAppState state; - String diagnostics; - long finishTime; - - public ApplicationState(long submitTime, - long startTime, ApplicationSubmissionContext context, String user) { - this(submitTime, startTime, context, user, null, "", 0); - } - - public ApplicationState(long submitTime, - long startTime,ApplicationSubmissionContext context, - String user, RMAppState state, String diagnostics, long finishTime) { - this.submitTime = submitTime; - this.startTime = startTime; - this.context = context; - this.user = user; - this.state = state; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.finishTime = finishTime; - } - - public ApplicationId getAppId() { - return context.getApplicationId(); - } - public long getSubmitTime() { - return submitTime; - } - public long getStartTime() { - return startTime; - } - public int getAttemptCount() { - return attempts.size(); - } - public ApplicationSubmissionContext getApplicationSubmissionContext() { - return context; - } - public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { - return attempts.get(attemptId); - } - public String getUser() { - return user; - } - public RMAppState getState() { - return state; - } - public String getDiagnostics() { - return diagnostics; - } - public long getFinishTime() { - return finishTime; - } - } - public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = @@ -409,14 +271,14 @@ public int getDTSequenceNumber() { * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); + Map appState = + new HashMap(); RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); AMRMTokenSecretManagerState amrmTokenSecretManagerState = null; - public Map getApplicationState() { + public Map getApplicationState() { return appState; } @@ -555,14 +417,15 @@ public synchronized void storeNewApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, - app.getUser()); + ApplicationStateData appState = + ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), context, app.getUser()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public synchronized void updateApplicationState(ApplicationState appState) { + public synchronized void updateApplicationState( + ApplicationStateData appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } @@ -587,10 +450,18 @@ protected abstract void updateApplicationStateInternal(ApplicationId appId, public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime()); + ApplicationAttemptStateData attemptState = null; + try { + attemptState = + ApplicationAttemptStateData.newInstance( + appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), ApplicationAttemptStateData + .convertCredentialsToByteBuffer(credentials), + appAttempt.getStartTime()); + } catch (IOException ioe) { + LOG.info("Error storing info for new application", ioe); + notifyStoreOperationFailed(ioe); + } dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); @@ -598,7 +469,7 @@ public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { @SuppressWarnings("unchecked") public synchronized void updateApplicationAttemptState( - ApplicationAttemptState attemptState) { + ApplicationAttemptStateData attemptState) { dispatcher.getEventHandler().handle( new RMStateUpdateAppAttemptEvent(attemptState)); } @@ -738,16 +609,12 @@ public abstract void storeOrUpdateAMRMTokenSecretManagerState( */ @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { - ApplicationState appState = new ApplicationState( + ApplicationStateData appState = + ApplicationStateData.newInstance( app.getSubmitTime(), app.getStartTime(), app.getApplicationSubmissionContext(), app.getUser()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - Credentials credentials = getCredentialsFromAppAttempt(appAttempt); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime()); - appState.attempts.put(attemptState.getAttemptId(), attemptState); + appState.attempts.put(appAttempt.getAppAttemptId(), null); } dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); @@ -759,7 +626,7 @@ public synchronized void removeApplication(RMApp app) { * application and its attempts */ protected abstract void removeApplicationStateInternal( - ApplicationState appState) throws Exception; + ApplicationStateData appState) throws Exception; // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-1779 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java index c4a04bc..3399431 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent { - ApplicationAttemptState attemptState; + ApplicationAttemptStateData attemptState; - public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) { + public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) { super(RMStateStoreEventType.STORE_APP_ATTEMPT); this.attemptState = attemptState; } - public ApplicationAttemptState getAppAttemptState() { + public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java index 99f8e37..50e59f7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -18,18 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateStoreAppEvent extends RMStateStoreEvent { - private final ApplicationState appState; + private final ApplicationStateData appState; - public RMStateStoreAppEvent(ApplicationState appState) { + public RMStateStoreAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.STORE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index 402feb9..fbba64c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { - ApplicationState appState; + ApplicationStateData appState; - RMStateStoreRemoveAppEvent(ApplicationState appState) { + RMStateStoreRemoveAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.REMOVE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 9ded673..14f8e9d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -18,18 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent { - ApplicationAttemptState attemptState; + ApplicationAttemptStateData attemptState; - public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) { + public RMStateUpdateAppAttemptEvent( + ApplicationAttemptStateData attemptState) { super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); this.attemptState = attemptState; } - public ApplicationAttemptState getAppAttemptState() { + public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 9bb96e5..cec364c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateUpdateAppEvent extends RMStateStoreEvent { - private final ApplicationState appState; + private final ApplicationStateData appState; - public RMStateUpdateAppEvent(ApplicationState appState) { + public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1b1ec76..9ed6b8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -34,8 +34,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; @@ -551,17 +549,11 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { LOG.debug("Loading application from znode: " + childNodeName); } ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - if (!appId.equals(appState.context.getApplicationId())) { + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); } @@ -573,7 +565,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } } - private void loadApplicationAttemptState(ApplicationState appState, + private void loadApplicationAttemptState(ApplicationStateData appState, ApplicationId appId) throws Exception { String appPath = getNodePath(rmAppRoot, appId.toString()); @@ -583,27 +575,9 @@ private void loadApplicationAttemptState(ApplicationState appState, String attemptPath = getNodePath(appPath, attemptIDStr); byte[] attemptData = getDataWithRetries(attemptPath, true); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(attemptIDStr); - ApplicationAttemptStateDataPBImpl attemptStateData = + ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(attemptData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -690,9 +664,11 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationStateData appState) throws Exception { - String appId = appState.getAppId().toString(); + String appId = appState.getApplicationSubmissionContext().getApplicationId() + .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 5cb9787..0508deb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -23,13 +23,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; @@ -51,7 +53,7 @@ public static ApplicationAttemptStateData newInstance( attemptStateData.setAppAttemptTokens(attemptTokens); attemptStateData.setState(finalState); attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics); attemptStateData.setStartTime(startTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); @@ -59,20 +61,42 @@ public static ApplicationAttemptStateData newInstance( } public static ApplicationAttemptStateData newInstance( - ApplicationAttemptState attemptState) throws IOException { - Credentials credentials = attemptState.getAppAttemptCredentials(); + ApplicationAttemptId attemptId, Container masterContainer, + ByteBuffer attemptTokens, long startTime) { + return newInstance(attemptId, masterContainer, attemptTokens, + startTime, null, "N/A", "", null, ContainerExitStatus.INVALID); + } + + public static ByteBuffer convertCredentialsToByteBuffer( + Credentials credentials) throws IOException { ByteBuffer appAttemptTokens = null; - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + DataOutputBuffer dob = new DataOutputBuffer(); + try { + if (credentials != null) { + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return appAttemptTokens; + } finally { + IOUtils.closeStream(dob); + } + } + + public static Credentials convertCredentialsFromByteBuffer( + ByteBuffer appAttemptTokens) throws IOException { + DataInputByteBuffer dibb = new DataInputByteBuffer(); + try { + Credentials credentials = null; + if (appAttemptTokens != null) { + credentials = new Credentials(); + appAttemptTokens.rewind(); + dibb.reset(appAttemptTokens); + credentials.readTokenStorageStream(dibb); + } + return credentials; + } finally { + IOUtils.closeStream(dibb); } - return newInstance(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus(), - attemptState.getAMContainerExitStatus()); } public abstract ApplicationAttemptStateDataProto getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index eff0445..43046a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -18,14 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -36,6 +38,9 @@ @Public @Unstable public abstract class ApplicationStateData { + public Map attempts = + new HashMap(); + public static ApplicationStateData newInstance(long submitTime, long startTime, String user, ApplicationSubmissionContext submissionContext, @@ -51,12 +56,18 @@ public static ApplicationStateData newInstance(long submitTime, return appState; } - public static ApplicationStateData newInstance( - ApplicationState appState) { - return newInstance(appState.getSubmitTime(), appState.getStartTime(), - appState.getUser(), appState.getApplicationSubmissionContext(), - appState.getState(), appState.getDiagnostics(), - appState.getFinishTime()); + public static ApplicationStateData newInstance(long submitTime, + long startTime, ApplicationSubmissionContext context, String user) { + return newInstance(submitTime, startTime, user, context, null, "", 0); + } + + public int getAttemptCount() { + return attempts.size(); + } + + public ApplicationAttemptStateData getAttempt( + ApplicationAttemptId attemptId) { + return attempts.get(attemptId); } public abstract ApplicationStateDataProto getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 48cf460..f29459e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -58,9 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -665,7 +665,8 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) throws Exception{ - ApplicationState appState = state.getApplicationState().get(getApplicationId()); + ApplicationStateData appState = + state.getApplicationState().get(getApplicationId()); this.recoveredFinalState = appState.getState(); LOG.info("Recovering app: " + getApplicationId() + " with " + + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState ); @@ -937,10 +938,10 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, default: break; } - ApplicationState appState = - new ApplicationState(this.submitTime, this.startTime, - this.submissionContext, this.user, stateToBeStored, diags, - this.storedFinishTime); + ApplicationStateData appState = + ApplicationStateData.newInstance(this.submitTime, this.startTime, + this.user, this.submissionContext, + stateToBeStored, diags, this.storedFinishTime); this.rmContext.getStateStore().updateApplicationState(appState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..6b7be0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -42,7 +43,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -66,9 +66,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -712,9 +712,9 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() { @Override public void recover(RMState state) throws Exception { - ApplicationState appState = + ApplicationStateData appState = state.getApplicationState().get(getAppAttemptId().getApplicationId()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: " @@ -726,7 +726,7 @@ public void recover(RMState state) throws Exception { this.attemptMetrics.setIsPreempted(); } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + recoverAppAttemptCredentials(attemptState.getAppAttemptTokens(), attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); @@ -739,16 +739,22 @@ public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens, + private void recoverAppAttemptCredentials(ByteBuffer appAttemptTokens, RMAppAttemptState state) throws IOException { if (appAttemptTokens == null || state == RMAppAttemptState.FAILED || state == RMAppAttemptState.FINISHED || state == RMAppAttemptState.KILLED) { return; } + + Credentials credentials = null; + if (appAttemptTokens.hasArray()) { + credentials = ApplicationAttemptStateData + .convertCredentialsFromByteBuffer(appAttemptTokens); + } if (UserGroupInformation.isSecurityEnabled()) { - byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( + byte[] clientTokenMasterKeyBytes = credentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); @@ -1019,10 +1025,17 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, } RMStateStore rmStore = rmContext.getStateStore(); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), - rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + ApplicationAttemptStateData attemptState = null; + try { + attemptState = ApplicationAttemptStateData + .newInstance(applicationAttemptId, getMasterContainer(), + ApplicationAttemptStateData.convertCredentialsToByteBuffer( + rmStore.getCredentialsFromAppAttempt(this)), startTime, + stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); + } catch (IOException e) { + // TODO: handling invalid state + LOG.fatal(e); + } LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 5f63caf..13c870e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -81,8 +81,6 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; @@ -137,7 +135,7 @@ public void testRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); @@ -169,7 +167,7 @@ public void testRMRestart() throws Exception { // create app that gets launched and does allocate before RM restart RMApp app1 = rm1.submitApp(200); // assert app1 info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -184,7 +182,7 @@ public void testRMRestart() throws Exception { ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -401,7 +399,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -419,7 +417,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception { nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am0.waitForState(RMAppAttemptState.FAILED); - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); // assert the AM failed state is saved. Assert.assertEquals(RMAppAttemptState.FAILED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -457,7 +455,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -624,7 +622,7 @@ public void updateApplicationStateInternal(ApplicationId appId, }; memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -663,7 +661,7 @@ public void testRMRestartFailedApp() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -683,7 +681,7 @@ public void testRMRestartFailedApp() throws Exception { rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); // assert the app/attempt failed state is saved. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); Assert.assertEquals(RMAppState.FAILED, appState.getState()); Assert.assertEquals(RMAppAttemptState.FAILED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -713,7 +711,7 @@ public void testRMRestartKilledApp() throws Exception{ MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -733,7 +731,7 @@ public void testRMRestartKilledApp() throws Exception{ rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED); // killed state is saved. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); Assert.assertEquals(RMAppState.KILLED, appState.getState()); Assert.assertEquals(RMAppAttemptState.KILLED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -801,7 +799,7 @@ public void testRMRestartSucceededApp() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -822,8 +820,8 @@ public void testRMRestartSucceededApp() throws Exception { finishApplicationMaster(app0, rm1, nm1, am0, req); // check the state store about the unregistered info. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); - ApplicationAttemptState attemptState0 = + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); + ApplicationAttemptStateData attemptState0 = appState.getAttempt(am0.getApplicationAttemptId()); Assert.assertEquals("diagnostics", attemptState0.getDiagnostics()); Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, @@ -979,7 +977,7 @@ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, MockAM am, FinishApplicationMasterRequest req) throws Exception { RMState rmState = ((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); am.unregisterAppAttempt(req,true); am.waitForState(RMAppAttemptState.FINISHING); @@ -987,7 +985,7 @@ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, am.waitForState(RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); // check that app/attempt is saved with the final state - ApplicationState appState = rmAppState.get(rmApp.getApplicationId()); + ApplicationStateData appState = rmAppState.get(rmApp.getApplicationId()); Assert .assertEquals(RMAppState.FINISHED, appState.getState()); Assert.assertEquals(RMAppAttemptState.FINISHED, @@ -1003,7 +1001,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = new MockRM(conf, memStore); rm1.start(); @@ -1021,7 +1019,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { null); // assert app1 info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -1034,7 +1032,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -1080,7 +1078,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); @@ -1119,7 +1117,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() new HashMap(), false, "default", 1, ts); // assert app info is saved - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); // assert delegation tokens exist in rm1 DelegationTokenRenewr @@ -1179,7 +1177,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); @@ -1193,7 +1191,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { new HashMap(), "default"); // assert app info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); // Allocate the AM @@ -1203,7 +1201,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); // assert attempt info is saved - ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); @@ -1214,7 +1212,9 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { attempt1.getClientTokenMasterKey().getEncoded(); // assert application credentials are saved - Credentials savedCredentials = attemptState.getAppAttemptCredentials(); + Credentials savedCredentials = + ApplicationAttemptStateData.convertCredentialsFromByteBuffer( + attemptState.getAppAttemptTokens()); Assert.assertArrayEquals("client token master key not saved", clientTokenMasterKey, savedCredentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); @@ -1262,7 +1262,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); Map rmDTState = rmState.getRMDTSecretManagerState().getTokenState(); @@ -1299,7 +1299,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { new HashMap(), false, "default", 1, ts); // assert app info is saved - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); // assert all master keys are saved @@ -1477,7 +1477,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // queue, and will be processed once rm.stop() is called. // Nothing exist in state store before stop is called. - Map rmAppState = + Map rmAppState = memStore.getState().getApplicationState(); Assert.assertTrue(rmAppState.size() == 0); @@ -1487,7 +1487,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // Assert app info is still saved even if stop is called with pending saving // request on dispatcher. for (RMApp app : appList) { - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -1521,7 +1521,7 @@ public void testFinishedAppRemovalAfterRMRestart() throws Exception { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1 = rm2.registerNode("127.0.0.1:1234", 15120); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // app0 exits in both state store and rmContext @@ -1908,4 +1908,5 @@ protected void doSecureLogin() throws IOException { // Do nothing. } } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..babb297 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -383,7 +383,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); // AM should be restarted even though max-am-attempt is 1. MockAM am2 = @@ -494,7 +494,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); // state store has 1 attempt stored. - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); Assert.assertEquals(1, appState.getAttemptCount()); // attempt stored has the preempted container exit status. @@ -552,7 +552,7 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() // Restart rm. MockRM rm2 = new MockRM(conf, memStore); rm2.start(); - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 620ba9f..1e806f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -57,8 +57,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; @@ -232,6 +232,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); + when(mockRemovedApp.getUser()).thenReturn("user1"); RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); attempts.put(attemptIdRemoved, mockRemovedAttempt); @@ -254,10 +255,10 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); - Map rmAppState = + Map rmAppState = state.getApplicationState(); - ApplicationState appState = rmAppState.get(appId1); + ApplicationStateData appState = rmAppState.get(appId1); // app is loaded assertNotNull(appState); // app is loaded correctly @@ -266,7 +267,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) // submission context is loaded correctly assertEquals(appId1, appState.getApplicationSubmissionContext().getApplicationId()); - ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); // attempt1 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId1, attemptState.getAttemptId()); @@ -275,8 +276,9 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(containerId1, attemptState.getMasterContainer().getId()); // attempt1 client token master key is loaded correctly assertArrayEquals(clientTokenKey1.getEncoded(), - attemptState.getAppAttemptCredentials() - .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + ApplicationAttemptStateData.convertCredentialsFromByteBuffer( + attemptState.getAppAttemptTokens()) + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly @@ -286,25 +288,28 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(containerId2, attemptState.getMasterContainer().getId()); // attempt2 client token master key is loaded correctly assertArrayEquals(clientTokenKey2.getEncoded(), - attemptState.getAppAttemptCredentials() - .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + ApplicationAttemptStateData.convertCredentialsFromByteBuffer( + attemptState.getAppAttemptTokens()) + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); //******* update application/attempt state *******// - ApplicationState appState2 = - new ApplicationState(appState.submitTime, appState.startTime, - appState.context, appState.user, RMAppState.FINISHED, - "appDiagnostics", 1234); + ApplicationStateData appState2 = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 1234); appState2.attempts.putAll(appState.attempts); store.updateApplicationState(appState2); - ApplicationAttemptState oldAttemptState = attemptState; - ApplicationAttemptState newAttemptState = - new ApplicationAttemptState(oldAttemptState.getAttemptId(), - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 100); + ApplicationAttemptStateData oldAttemptState = attemptState; + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance( + oldAttemptState.getAttemptId(), + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -313,21 +318,21 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) ApplicationSubmissionContext dummyContext = new ApplicationSubmissionContextPBImpl(); dummyContext.setApplicationId(dummyAppId); - ApplicationState dummyApp = - new ApplicationState(appState.submitTime, appState.startTime, - dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics", - 1234); + ApplicationStateData dummyApp = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), dummyContext, + RMAppState.FINISHED, "appDiagnostics", 1234); store.updateApplicationState(dummyApp); ApplicationAttemptId dummyAttemptId = ApplicationAttemptId.newInstance(dummyAppId, 6); - ApplicationAttemptState dummyAttempt = - new ApplicationAttemptState(dummyAttemptId, - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 111); + ApplicationAttemptStateData dummyAttempt = + ApplicationAttemptStateData.newInstance( + dummyAttemptId, oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); // let things settle down @@ -338,11 +343,13 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); RMState newRMState = store.loadState(); - Map newRMAppState = + Map newRMAppState = newRMState.getApplicationState(); - assertNotNull(newRMAppState.get(dummyApp.getAppId())); - ApplicationState updatedAppState = newRMAppState.get(appId1); - assertEquals(appState.getAppId(),updatedAppState.getAppId()); + assertNotNull(newRMAppState.get( + dummyApp.getApplicationSubmissionContext().getApplicationId())); + ApplicationStateData updatedAppState = newRMAppState.get(appId1); + assertEquals(appState.getApplicationSubmissionContext().getApplicationId(), + updatedAppState.getApplicationSubmissionContext().getApplicationId()); assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); assertEquals(appState.getStartTime(), updatedAppState.getStartTime()); assertEquals(appState.getUser(), updatedAppState.getUser()); @@ -352,16 +359,17 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) assertEquals(1234, updatedAppState.getFinishTime()); // check updated attempt state - assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt( - dummyAttemptId)); - ApplicationAttemptState updatedAttemptState = + assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext + ().getApplicationId()).getAttempt(dummyAttemptId)); + ApplicationAttemptStateData updatedAttemptState = updatedAppState.getAttempt(newAttemptState.getAttemptId()); assertEquals(oldAttemptState.getAttemptId(), updatedAttemptState.getAttemptId()); assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId()); assertArrayEquals(clientTokenKey2.getEncoded(), - updatedAttemptState.getAppAttemptCredentials().getSecretKey( - RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + ApplicationAttemptStateData.convertCredentialsFromByteBuffer( + attemptState.getAppAttemptTokens()) + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); // new attempt state fields assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2fc4431..0b9a4e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -300,11 +300,13 @@ private void assertTimesAtFinish(RMApp application) { } private void assertAppFinalStateSaved(RMApp application){ - verify(store, times(1)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(1)).updateApplicationState( + any(ApplicationStateData.class)); } private void assertAppFinalStateNotSaved(RMApp application){ - verify(store, times(0)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(0)).updateApplicationState( + any(ApplicationStateData.class)); } private void assertKilled(RMApp application) { @@ -868,22 +870,25 @@ public void testAppKilledKilled() throws IOException { @Test(timeout = 30000) public void testAppsRecoveringStates() throws Exception { RMState state = new RMState(); - Map applicationState = + Map applicationState = state.getApplicationState(); createRMStateForApplications(applicationState, RMAppState.FINISHED); createRMStateForApplications(applicationState, RMAppState.KILLED); createRMStateForApplications(applicationState, RMAppState.FAILED); - for (ApplicationState appState : applicationState.values()) { + for (ApplicationStateData appState : applicationState.values()) { testRecoverApplication(appState, state); } } - public void testRecoverApplication(ApplicationState appState, RMState rmState) + public void testRecoverApplication(ApplicationStateData appState, + RMState rmState) throws Exception { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = - new RMAppImpl(appState.getAppId(), rmContext, conf, + new RMAppImpl( + appState.getApplicationSubmissionContext().getApplicationId(), + rmContext, conf, submissionContext.getApplicationName(), null, submissionContext.getQueue(), submissionContext, null, null, appState.getSubmitTime(), submissionContext.getApplicationType(), @@ -896,7 +901,8 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState) RMAppImpl.isAppInFinalState(application)); // Trigger RECOVER event. - application.handle(new RMAppEvent(appState.getAppId(), + application.handle(new RMAppEvent( + appState.getApplicationSubmissionContext().getApplicationId(), RMAppEventType.RECOVER)); rmDispatcher.await(); RMAppState finalState = appState.getState(); @@ -905,12 +911,12 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState) } public void createRMStateForApplications( - Map applicationState, + Map applicationState, RMAppState rmAppState) { RMApp app = createNewTestApp(null); - ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), - app.getApplicationSubmissionContext(), app.getUser(), rmAppState, + ApplicationStateData appState = + ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), + app.getUser(), app.getApplicationSubmissionContext(), rmAppState, null, app.getFinishTime()); applicationState.put(app.getApplicationId(), appState); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index efcecd9..9bbe9ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -70,8 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; @@ -1301,7 +1301,7 @@ private void verifyUrl(String url1, String url2) { private void verifyAttemptFinalStateSaved() { verify(store, times(1)).updateApplicationAttemptState( - any(ApplicationAttemptState.class)); + any(ApplicationAttemptStateData.class)); } private void verifyAMHostAndPortInvalidated() {