diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 52e21d7..cd765f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -72,4 +72,5 @@ message ApplicationStateDataProto { message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; + optional bytes application_token = 3; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dcb6f2..14970f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -315,10 +315,6 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; - // re-submit the application - // this is going to send an app start event but since the async dispatcher - // has not started that event will be queued until we have completed re - // populating the state if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. @@ -348,6 +344,10 @@ public void recover(RMState state) throws Exception { shouldRecover = false; } + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a9d40eb..2934c9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -202,21 +200,11 @@ private void setupTokensAndEnv( credentials.readTokenStorageStream(dibb); } - ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( - application.getAppAttemptId()); - Token appMasterToken = - new Token(id, - this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the token, - // but this token is directly provided to the AMs - SecurityUtil.setTokenService(appMasterToken, serviceAddr); - // Add the ApplicationMaster token - credentials.addToken(appMasterToken.getService(), appMasterToken); + Token applicationToken = + application.getApplicationToken(); + assert applicationToken != null; + credentials.addToken(applicationToken.getService(), applicationToken); DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); container.setContainerTokens( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990da..6ad7368 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -31,11 +31,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -80,6 +84,7 @@ protected synchronized void closeInternal() throws Exception { fs.close(); } + @SuppressWarnings("unchecked") @Override public synchronized RMState loadState() throws Exception { try { @@ -114,8 +119,24 @@ public synchronized RMState loadState() throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + + Token appToken = null; + if (attemptStateData.getApplicationToken() != null) { + // restore application token + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getApplicationToken()); + credentials.readTokenStorageStream(dibb); + + // assert only 1 token as we only have one appToken per attempt + assert credentials.numberOfTokens() == 1; + appToken = (Token) credentials + .getAllTokens().iterator().next(); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), appToken); + // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167..90c6543 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -23,8 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -73,14 +77,30 @@ public void storeApplicationState(String appId, state.appState.put(appState.getAppId(), appState); } + @SuppressWarnings("unchecked") @Override public synchronized void storeApplicationAttemptState(String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptId attemptId = ConverterUtils .toApplicationAttemptId(attemptIdStr); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + Token appToken = null; + if (attemptStateData.getApplicationToken() != null) { + // restore application token + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getApplicationToken()); + credentials.readTokenStorageStream(dibb); + + // assert only 1 token as we only have one application token per attempt + assert credentials.numberOfTokens() == 1; + appToken = (Token) credentials.getAllTokens() + .iterator().next(); + } + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), appToken); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015..1365c6e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -26,6 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,6 +38,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -58,19 +63,25 @@ public static class ApplicationAttemptState { final ApplicationAttemptId attemptId; final Container masterContainer; - + final Token applicationToken; + public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer) { + Container masterContainer, + Token applicationToken) { this.attemptId = attemptId; this.masterContainer = masterContainer; + this.applicationToken = applicationToken; } - + public Container getMasterContainer() { return masterContainer; } public ApplicationAttemptId getAttemptId() { return attemptId; } + public Token getApplicationToken() { + return applicationToken; + } } /** @@ -199,10 +210,13 @@ protected abstract void storeApplicationState(String appId, * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getApplicationToken()); + dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + new RMStateStoreAppAttemptEvent(attemptState)); } /** @@ -226,8 +240,9 @@ public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getApplicationToken()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -288,8 +303,20 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { attemptStateData.setAttemptId(attemptState.getAttemptId()); attemptStateData.setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + Token appToken = + attemptState.getApplicationToken(); + try { + if(appToken != null){ + Credentials credentials = new Credentials(); + credentials.addToken(appToken.getService(), appToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + attemptStateData.setApplicationToken( + ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + } + + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -358,7 +385,5 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, public void handle(RMStateStoreEvent event) { handleStoreEvent(event); } - } - -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 64e3ccb..afdabe4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -49,4 +51,14 @@ public Container getMasterContainer(); public void setMasterContainer(Container container); + + /** + * The token that application master uses to talk with ResourceManager + * @return token that application master uses to talk with ResourceManager + */ + @Public + @Unstable + public ByteBuffer getApplicationToken(); + + public void setApplicationToken(ByteBuffer token); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java index d033f5c..933ff69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ProtoBase; @@ -37,7 +39,8 @@ private ApplicationAttemptId attemptId = null; private Container masterContainer = null; - + private ByteBuffer applicationToken = null; + public ApplicationAttemptStateDataPBImpl() { builder = ApplicationAttemptStateDataProto.newBuilder(); } @@ -62,6 +65,9 @@ private void mergeLocalToBuilder() { if(this.masterContainer != null) { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } + if(this.applicationToken != null) { + builder.setApplicationToken(convertToProtoFormat(this.applicationToken)); + } } private void mergeLocalToProto() { @@ -123,4 +129,26 @@ public void setMasterContainer(Container container) { this.masterContainer = container; } + @Override + public ByteBuffer getApplicationToken() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(applicationToken != null) { + return applicationToken; + } + if(!p.hasApplicationToken()) { + return null; + } + this.applicationToken = convertFromProtoFormat(p.getApplicationToken()); + return applicationToken; + } + + @Override + public void setApplicationToken(ByteBuffer applicationToken) { + maybeInitBuilder(); + if(applicationToken == null) { + builder.clearApplicationToken(); + } + this.applicationToken = applicationToken; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 874232b..a525c21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -95,6 +97,7 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private RMState rmState; // Mutable fields private long startTime; @@ -275,6 +278,10 @@ public ApplicationSubmissionContext getApplicationSubmissionContext() { return this.submissionContext; } + private RMState getRMState(){ + return this.rmState; + } + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); @@ -555,6 +562,7 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) { + rmState = state; ApplicationState appState = state.getApplicationState().get(getApplicationId()); LOG.info("Recovering app: " + getApplicationId() + " with " + + appState.getAttemptCount() + " attempts"); @@ -567,7 +575,7 @@ public void recover(RMState state) { } @SuppressWarnings("unchecked") - private void createNewAttempt(boolean startAttempt) { + private RMAppAttempt createNewAttempt(boolean startAttempt) { ApplicationAttemptId appAttemptId = Records .newRecord(ApplicationAttemptId.class); appAttemptId.setApplicationId(applicationId); @@ -582,8 +590,9 @@ private void createNewAttempt(boolean startAttempt) { handler.handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); } + return attempt; } - + private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { NodeState nodeState = node.getState(); updatedNodes.add(node); @@ -608,7 +617,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_SAVED)) { + switch (event.getType()) { + case APP_SAVED: assert app.getState().equals(RMAppState.NEW_SAVING); RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; if(storeEvent.getStoredException() != null) { @@ -619,9 +629,29 @@ public void transition(RMAppImpl app, RMAppEvent event) { storeEvent.getStoredException()); ExitUtil.terminate(1, storeEvent.getStoredException()); } + app.createNewAttempt(true); + break; + case RECOVER: + RMAppAttempt attempt = app.createNewAttempt(true); + + // reuse the appToken from previous attempt + if (UserGroupInformation.isSecurityEnabled()) { + ApplicationAttemptId previousAttempt = + Records.newRecord(ApplicationAttemptId.class); + previousAttempt.setApplicationId(app.getApplicationId()); + previousAttempt.setAttemptId(app.getAppAttempts().size() - 1); + ApplicationState appState = app.getRMState().getApplicationState() + .get(app.getApplicationId()); + ApplicationAttemptState attemptState = + appState.getAttempt(previousAttempt); + assert attemptState != null; + ((RMAppAttemptImpl) attempt).recoverAppAttemptToken(attemptState + .getApplicationToken()); + } + break; + default: + LOG.warn("Invalid eventType: " + event.getType()); } - - app.createNewAttempt(true); }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 510c4fe..457621a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -147,6 +149,12 @@ ApplicationSubmissionContext getSubmissionContext(); /** + * The application token that application master uses to talk with RM + * @return the application token that application master uses to talk with RM + */ + Token getApplicationToken(); + + /** * Get application container and resource usage information. * @return an ApplicationResourceUsageReport object. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index eaa15f5..e390999 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -38,6 +39,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; @@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -125,6 +128,7 @@ private final ApplicationAttemptId applicationAttemptId; private ClientToken clientToken; private final ApplicationSubmissionContext submissionContext; + private Token applicationToken = null; //nodes on while this attempt's containers ran private final Set ranNodes = @@ -139,6 +143,7 @@ private String origTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private boolean isRecovered = false; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -371,6 +376,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.rmContext.getClientToAMTokenSecretManager().registerApplication( appAttemptId); + // create clientToken Token token = new Token(new ClientTokenIdentifier( appAttemptId), this.rmContext.getClientToAMTokenSecretManager()); @@ -507,6 +513,11 @@ public ClientToken getClientToken() { } @Override + public Token getApplicationToken() { + return this.applicationToken; + } + + @Override public String getDiagnostics() { this.readLock.lock(); @@ -652,11 +663,13 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() { @Override public void recover(RMState state) { + isRecovered = true; ApplicationState appState = state.getApplicationState().get(getAppAttemptId().getApplicationId()); ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; setMasterContainer(attemptState.getMasterContainer()); + this.applicationToken = attemptState.getApplicationToken(); LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); @@ -664,7 +677,12 @@ public void recover(RMState state) { handle(new RMAppAttemptEvent(getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } - + + public void recoverAppAttemptToken(Token appToken) { + isRecovered = true; + this.applicationToken = appToken; + } + private static class BaseTransition implements SingleArcTransition { @@ -772,8 +790,30 @@ public void transition(RMAppAttemptImpl appAttempt, EMPTY_CONTAINER_RELEASE_LIST); // Set the masterContainer - appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( - 0)); + appAttempt.setMasterContainer(amContainerAllocation.getContainers() + .get(0)); + + if (UserGroupInformation.isSecurityEnabled()) { + + if (!appAttempt.isRecovered()) { + // create new attempt-specific application token + ApplicationTokenIdentifier id = + new ApplicationTokenIdentifier(appAttempt.applicationAttemptId); + Token applicationToken = + new Token(id, + appAttempt.rmContext.getApplicationTokenSecretManager()); + InetSocketAddress serviceAddr = + appAttempt.conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(applicationToken, serviceAddr); + appAttempt.applicationToken = applicationToken; + } + } + appAttempt.getSubmissionContext().setResource( appAttempt.getMasterContainer().getResource()); RMStateStore store = appAttempt.rmContext.getStateStore(); @@ -1191,7 +1231,11 @@ public long getStartTime() { this.readLock.unlock(); } } - + + private boolean isRecovered() { + return this.isRecovered; + } + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fb..366dc4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -404,7 +406,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { } @Test - public void testTokenRestoredOnRMrestart() throws Exception { + public void testDelegationTokenRestoredOnRMrestart() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); @@ -423,7 +425,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MyMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); HashSet> tokenSet = @@ -471,7 +473,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { .getContainerTokens()); // start new RM - MockRM rm2 = new MyMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); // verify tokens are properly populated back to DelegationTokenRenewer @@ -483,9 +485,98 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } - class MyMockRM extends MockRM { + @Test + public void testApplicationTokenRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), "default"); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // the appToken that is generated when RMAppAttempt is created + Token attempt1Token = + attempt1.getApplicationToken(); + + // assert application Token is saved + Assert.assertEquals(attempt1Token, attemptState.getApplicationToken()); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + // point nm1 to the new rm2 + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = loadedApp1.getCurrentAppAttempt(); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + + // nm1 is rebooted, re-register with rm2 + nm1 = rm2.registerNode("0.0.0.0:4321", 15120); + + // Allocate the AM + nm1.nodeHeartbeat(true); + rm2.waitForState(loadedAttempt1.getAppAttemptId(), + RMAppAttemptState.ALLOCATED); + + // the newly created attempt id + ApplicationAttemptId desiredNewAttemptId = + Records.newRecord(ApplicationAttemptId.class); + desiredNewAttemptId.setApplicationId(app1.getApplicationId()); + desiredNewAttemptId.setAttemptId(app1.getAppAttempts().size() + 1); + + // assert the loadedAttempt id is the same as the desired new attempt id + Assert.assertEquals(desiredNewAttemptId, loadedAttempt1.getAppAttemptId()); + + // assert appToken is properly populated back to the attempt + Assert.assertEquals(attempt1Token, loadedAttempt1.getApplicationToken()); + + rm1.stop(); + rm2.stop(); + } + + class TestSecurityMockRM extends MockRM { - public MyMockRM(Configuration conf, RMStateStore store) { + public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 440908f..10f19c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -18,14 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; -import org.junit.Test; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -44,13 +47,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; public class TestRMStateStore { @@ -141,7 +147,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore, ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( "appattempt_1352994193343_0003_000001"); storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", dispatcher); + "container_1352994193343_0003_01_000001", null, dispatcher); } @Override @@ -186,14 +192,15 @@ void storeApp(RMStateStore store, ApplicationId appId, long time) } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, - String containerIdStr, TestDispatcher dispatcher) - throws Exception { + String containerIdStr, Token appToken, + TestDispatcher dispatcher) throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); RMAppAttempt mockAttempt = mock(RMAppAttempt.class); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getApplicationToken()).thenReturn(appToken); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -203,6 +210,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); + Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setDispatcher(dispatcher); @@ -211,20 +219,36 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime); + + // create application token1 for attempt1 + ApplicationTokenIdentifier appTokenId1 = + new ApplicationTokenIdentifier(attemptId1); + ApplicationTokenSecretManager appTokenMgr = + new ApplicationTokenSecretManager(conf); + Token appToken1 = + new Token(appTokenId1, appTokenMgr); + ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", dispatcher); + "container_1352994193343_0001_01_000001", appToken1, dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = - ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // create application token2 for attempt2 + ApplicationTokenIdentifier appTokenId2 = + new ApplicationTokenIdentifier(attemptId2); + Token appToken2 = + new Token(appTokenId2, appTokenMgr); ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", dispatcher); + "container_1352994193343_0001_02_000001", appToken2, dispatcher); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", dispatcher); + "container_1352994193343_0002_01_000001", null, dispatcher); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -268,17 +292,21 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { assertEquals(attemptId1, attemptState.getAttemptId()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); + // attempt1 applicationToken is loaded correctly + assertEquals(appToken1, attemptState.getApplicationToken()); + attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); + // attempt2 applicationToken is loaded correctly + assertEquals(appToken2, attemptState.getApplicationToken()); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); store.close(); } - }