diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 52e21d7..142dc45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -72,4 +72,5 @@ message ApplicationStateDataProto { message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; + optional bytes app_attempt_tokens = 3; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dcb6f2..14970f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -315,10 +315,6 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; - // re-submit the application - // this is going to send an app start event but since the async dispatcher - // has not started that event will be queued until we have completed re - // populating the state if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. @@ -348,6 +344,10 @@ public void recover(RMState state) throws Exception { shouldRecover = false; } + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a9d40eb..d9c0337 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -48,12 +47,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -202,25 +199,17 @@ private void setupTokensAndEnv( credentials.readTokenStorageStream(dibb); } - ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( - application.getAppAttemptId()); - Token appMasterToken = - new Token(id, - this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the token, - // but this token is directly provided to the AMs - SecurityUtil.setTokenService(appMasterToken, serviceAddr); - - // Add the ApplicationMaster token - credentials.addToken(appMasterToken.getService(), appMasterToken); + // Add app attempt-specific tokens + if(application.getAppAttemptTokens() != null) { + Credentials appAttemptCredentials = new Credentials(); + dibb.reset(application.getAppAttemptTokens()); + appAttemptCredentials.readTokenStorageStream(dibb); + credentials.mergeAll(appAttemptCredentials); + } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); - container.setContainerTokens( - ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); SecretKey clientSecretKey = this.rmContext.getClientToAMTokenSecretManager().getMasterKey( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990da..7223610 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -80,6 +80,7 @@ protected synchronized void closeInternal() throws Exception { fs.close(); } + @SuppressWarnings("unchecked") @Override public synchronized RMState loadState() throws Exception { try { @@ -114,8 +115,12 @@ public synchronized RMState loadState() throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), + attemptStateData.getAppAttemptTokens()); + // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167..1b33307 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -79,8 +79,10 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, throws Exception { ApplicationAttemptId attemptId = ConverterUtils .toApplicationAttemptId(attemptIdStr); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), + attemptStateData.getAppAttemptTokens()); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015..5b42ead 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -58,19 +59,25 @@ public static class ApplicationAttemptState { final ApplicationAttemptId attemptId; final Container masterContainer; - + final ByteBuffer appAttemptTokens; + public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer) { + Container masterContainer, + ByteBuffer appAttemptTokens) { this.attemptId = attemptId; this.masterContainer = masterContainer; + this.appAttemptTokens = appAttemptTokens; } - + public Container getMasterContainer() { return masterContainer; } public ApplicationAttemptId getAttemptId() { return attemptId; } + public ByteBuffer getAppAttemptTokens() { + return appAttemptTokens; + } } /** @@ -199,10 +206,13 @@ protected abstract void storeApplicationState(String appId, * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getAppAttemptTokens()); + dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + new RMStateStoreAppAttemptEvent(attemptState)); } /** @@ -226,8 +236,9 @@ public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getAppAttemptTokens()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -288,8 +299,14 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { attemptStateData.setAttemptId(attemptState.getAttemptId()); attemptStateData.setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + ByteBuffer appAttemptTokens = attemptState.getAppAttemptTokens(); + try { + if(appAttemptTokens != null){ + attemptStateData.setAppAttemptTokens(appAttemptTokens); + } + + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -358,7 +375,5 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, public void handle(RMStateStoreEvent event) { handleStoreEvent(event); } - } - -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 64e3ccb..1c98998 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -49,4 +51,14 @@ public Container getMasterContainer(); public void setMasterContainer(Container container); + + /** + * The application attempt tokens that belong to this attempt + * @return The application attempt tokens that belong to this attempt + */ + @Public + @Unstable + public ByteBuffer getAppAttemptTokens(); + + public void setAppAttemptTokens(ByteBuffer token); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java index d033f5c..12062cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ProtoBase; @@ -37,7 +39,8 @@ private ApplicationAttemptId attemptId = null; private Container masterContainer = null; - + private ByteBuffer appAttemptTokens = null; + public ApplicationAttemptStateDataPBImpl() { builder = ApplicationAttemptStateDataProto.newBuilder(); } @@ -62,6 +65,9 @@ private void mergeLocalToBuilder() { if(this.masterContainer != null) { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } + if(this.appAttemptTokens != null) { + builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + } } private void mergeLocalToProto() { @@ -123,4 +129,26 @@ public void setMasterContainer(Container container) { this.masterContainer = container; } + @Override + public ByteBuffer getAppAttemptTokens() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(appAttemptTokens != null) { + return appAttemptTokens; + } + if(!p.hasAppAttemptTokens()) { + return null; + } + this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + return appAttemptTokens; + } + + @Override + public void setAppAttemptTokens(ByteBuffer applicationToken) { + maybeInitBuilder(); + if(applicationToken == null) { + builder.clearAppAttemptTokens(); + } + this.appAttemptTokens = applicationToken; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 874232b..13e9424 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -95,6 +97,7 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private RMState rmState; // Mutable fields private long startTime; @@ -275,6 +278,10 @@ public ApplicationSubmissionContext getApplicationSubmissionContext() { return this.submissionContext; } + private RMState getRMState(){ + return this.rmState; + } + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); @@ -555,6 +562,7 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) { + rmState = state; ApplicationState appState = state.getApplicationState().get(getApplicationId()); LOG.info("Recovering app: " + getApplicationId() + " with " + + appState.getAttemptCount() + " attempts"); @@ -567,7 +575,7 @@ public void recover(RMState state) { } @SuppressWarnings("unchecked") - private void createNewAttempt(boolean startAttempt) { + private RMAppAttempt createNewAttempt(boolean startAttempt) { ApplicationAttemptId appAttemptId = Records .newRecord(ApplicationAttemptId.class); appAttemptId.setApplicationId(applicationId); @@ -582,8 +590,9 @@ private void createNewAttempt(boolean startAttempt) { handler.handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); } + return attempt; } - + private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { NodeState nodeState = node.getState(); updatedNodes.add(node); @@ -608,7 +617,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_SAVED)) { + switch (event.getType()) { + case APP_SAVED: assert app.getState().equals(RMAppState.NEW_SAVING); RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; if(storeEvent.getStoredException() != null) { @@ -619,9 +629,29 @@ public void transition(RMAppImpl app, RMAppEvent event) { storeEvent.getStoredException()); ExitUtil.terminate(1, storeEvent.getStoredException()); } + app.createNewAttempt(true); + break; + case RECOVER: + RMAppAttempt attempt = app.createNewAttempt(true); + + // reuse the appToken from previous attempt + if (UserGroupInformation.isSecurityEnabled()) { + ApplicationAttemptId previousAttempt = + Records.newRecord(ApplicationAttemptId.class); + previousAttempt.setApplicationId(app.getApplicationId()); + previousAttempt.setAttemptId(app.getAppAttempts().size() - 1); + ApplicationState appState = app.getRMState().getApplicationState() + .get(app.getApplicationId()); + ApplicationAttemptState attemptState = + appState.getAttempt(previousAttempt); + assert attemptState != null; + ((RMAppAttemptImpl) attempt).recoverAppAttemptTokens(attemptState + .getAppAttemptTokens()); + } + break; + default: + LOG.warn("Invalid eventType: " + event.getType()); } - - app.createNewAttempt(true); }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 510c4fe..d4f925a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; +import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -147,6 +148,12 @@ ApplicationSubmissionContext getSubmissionContext(); /** + * The tokens belong to this app attempt + * @return The tokens belong to this app attempt + */ + ByteBuffer getAppAttemptTokens(); + + /** * Get application container and resource usage information. * @return an ApplicationResourceUsageReport object. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index eaa15f5..9a29026 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,8 +20,11 @@ import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,6 +41,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; @@ -58,6 +65,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.ApplicationTokenSelector; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -125,6 +134,7 @@ private final ApplicationAttemptId applicationAttemptId; private ClientToken clientToken; private final ApplicationSubmissionContext submissionContext; + private ByteBuffer appAttemptTokens = null; //nodes on while this attempt's containers ran private final Set ranNodes = @@ -371,12 +381,40 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.rmContext.getClientToAMTokenSecretManager().registerApplication( appAttemptId); - Token token = + // create clientToken + Token clientToken = new Token(new ClientTokenIdentifier( appAttemptId), this.rmContext.getClientToAMTokenSecretManager()); this.clientToken = - BuilderUtils.newClientToken(token.getIdentifier(), token.getKind() - .toString(), token.getPassword(), token.getService().toString()); + BuilderUtils.newClientToken(clientToken.getIdentifier(), clientToken + .getKind().toString(), clientToken.getPassword(), clientToken + .getService().toString()); + + // create application token + ApplicationTokenIdentifier id = + new ApplicationTokenIdentifier(appAttemptId); + Token applicationToken = + new Token(id, + this.rmContext.getApplicationTokenSecretManager()); + InetSocketAddress serviceAddr = + this.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(applicationToken, serviceAddr); + + Credentials credentials = new Credentials(); + credentials.addToken(applicationToken.getService(), applicationToken); + // credentials.addToken(clientToken.getService(), clientToken); + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + } catch (IOException e) { + LOG.warn("Error in writing app attempt tokens"); + } + this.appAttemptTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -507,6 +545,11 @@ public ClientToken getClientToken() { } @Override + public ByteBuffer getAppAttemptTokens() { + return this.appAttemptTokens; + } + + @Override public String getDiagnostics() { this.readLock.lock(); @@ -657,6 +700,8 @@ public void recover(RMState state) { ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; setMasterContainer(attemptState.getMasterContainer()); + recoverAppAttemptTokens(attemptState.getAppAttemptTokens()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); @@ -664,7 +709,34 @@ public void recover(RMState state) { handle(new RMAppAttemptEvent(getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } - + + public void recoverAppAttemptTokens(ByteBuffer appAttemptTokens) { + if (appAttemptTokens == null) { + return; + } + + this.appAttemptTokens = appAttemptTokens; + DataInputByteBuffer dibb = new DataInputByteBuffer(); + Credentials appAttemptCredentials = new Credentials(); + dibb.reset(appAttemptTokens); + try { + appAttemptCredentials.readTokenStorageStream(dibb); + } catch (IOException e) { + LOG.error("Error in reading app attempt tokens"); + } + ApplicationTokenSelector appTokenSelector = new ApplicationTokenSelector(); + InetSocketAddress serviceAddr = + this.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + Token appToken = + appTokenSelector.selectToken( + SecurityUtil.buildTokenService(serviceAddr), + appAttemptCredentials.getAllTokens()); + this.rmContext.getApplicationTokenSecretManager().addPassword( + applicationAttemptId, appToken.getPassword()); + } + private static class BaseTransition implements SingleArcTransition { @@ -992,7 +1064,6 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.getAppAttemptId()); - // Unregister from the ClientTokenSecretManager if (UserGroupInformation.isSecurityEnabled()) { appAttempt.rmContext.getClientToAMTokenSecretManager() @@ -1191,7 +1262,7 @@ public long getStartTime() { this.readLock.unlock(); } } - + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java index 8a65c09..379d47a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java @@ -119,10 +119,20 @@ synchronized void rollMasterKey() { LOG.debug("Creating password for " + applicationAttemptId); } byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); + addPassword(applicationAttemptId, password); return password; } + public void addPassword(ApplicationAttemptId applicationAttemptId, + byte[] password) { + this.passwords.put(applicationAttemptId, password); + } + + // for test + public boolean hasPassword( ApplicationAttemptId applicationAttemptId) { + return this.passwords.containsKey(applicationAttemptId); + } + /** * Retrieve the password for the given {@link ApplicationTokenIdentifier}. * Used by RPC layer to validate a remote {@link ApplicationTokenIdentifier}. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fb..feaa743 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -404,7 +405,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { } @Test - public void testTokenRestoredOnRMrestart() throws Exception { + public void testDelegationTokenRestoredOnRMrestart() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); @@ -423,7 +424,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MyMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); HashSet> tokenSet = @@ -471,7 +472,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { .getContainerTokens()); // start new RM - MockRM rm2 = new MyMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); // verify tokens are properly populated back to DelegationTokenRenewer @@ -483,9 +484,91 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } - class MyMockRM extends MockRM { + @Test + public void testApplicationTokenRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), "default"); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // the appToken that is generated when RMAppAttempt is created + ByteBuffer attempt1Token = + attempt1.getAppAttemptTokens(); + + // assert application Token is saved + Assert.assertEquals(attempt1Token, attemptState.getAppAttemptTokens()); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt newAttempt = loadedApp1.getCurrentAppAttempt(); + + // the newly created attempt id + ApplicationAttemptId desiredNewAttemptId = + Records.newRecord(ApplicationAttemptId.class); + desiredNewAttemptId.setApplicationId(app1.getApplicationId()); + desiredNewAttemptId.setAttemptId(app1.getAppAttempts().size() + 1); + + // assert the new Attempt id is the same as the desired new attempt id + Assert.assertEquals(desiredNewAttemptId, newAttempt.getAppAttemptId()); + + // assert new attempt reuse previous attempt tokens + Assert.assertEquals(attempt1Token, newAttempt.getAppAttemptTokens()); + + // assert ApplicationTokenSecretManager has the password populated + Assert.assertTrue(rm2.getApplicationTokenSecretManager().hasPassword( + newAttempt.getAppAttemptId())); + + rm1.stop(); + rm2.stop(); + } + + class TestSecurityMockRM extends MockRM { - public MyMockRM(Configuration conf, RMStateStore store) { + public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 440908f..40fda29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -18,14 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import org.junit.Test; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +38,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -44,13 +51,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; public class TestRMStateStore { @@ -141,7 +151,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore, ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( "appattempt_1352994193343_0003_000001"); storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", dispatcher); + "container_1352994193343_0003_01_000001", null, dispatcher); } @Override @@ -186,14 +196,15 @@ void storeApp(RMStateStore store, ApplicationId appId, long time) } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, - String containerIdStr, TestDispatcher dispatcher) - throws Exception { + String containerIdStr,ByteBuffer appAttemptTokens, + TestDispatcher dispatcher) throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); RMAppAttempt mockAttempt = mock(RMAppAttempt.class); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getAppAttemptTokens()).thenReturn(appAttemptTokens); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -203,6 +214,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); + Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setDispatcher(dispatcher); @@ -211,20 +223,29 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime); + + // create application token1 for attempt1 + ByteBuffer appAttemptToken1 = generateTokens(attemptId1, conf); + ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", dispatcher); + "container_1352994193343_0001_01_000001", appAttemptToken1, dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = - ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // create application token2 for attempt2 + ByteBuffer appAttemptToken2 = generateTokens(attemptId2, conf); + ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", dispatcher); + "container_1352994193343_0001_02_000001", appAttemptToken2, dispatcher); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", dispatcher); + "container_1352994193343_0002_01_000001", null, dispatcher); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -268,12 +289,17 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { assertEquals(attemptId1, attemptState.getAttemptId()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); + // attempt1 applicationToken is loaded correctly + assertEquals(appAttemptToken1, attemptState.getAppAttemptTokens()); + attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); + // attempt2 applicationToken is loaded correctly + assertEquals(appAttemptToken2, attemptState.getAppAttemptTokens()); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); @@ -281,4 +307,23 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + private ByteBuffer generateTokens(ApplicationAttemptId attemptId, + Configuration conf) { + ApplicationTokenSecretManager appTokenMgr = + new ApplicationTokenSecretManager(conf); + ApplicationTokenIdentifier appTokenId = + new ApplicationTokenIdentifier(attemptId); + Token appToken = + new Token(appTokenId, appTokenMgr); + + Credentials credentials = new Credentials(); + credentials.addToken(appToken.getService(), appToken); + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + } catch (IOException e) { + LOG.warn("Error in writing app attempt tokens"); + } + return ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } }