diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 52e21d7..142dc45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -72,4 +72,5 @@ message ApplicationStateDataProto { message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; + optional bytes app_attempt_tokens = 3; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dcb6f2..14970f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -315,10 +315,6 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; - // re-submit the application - // this is going to send an app start event but since the async dispatcher - // has not started that event will be queued until we have completed re - // populating the state if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. @@ -348,6 +344,10 @@ public void recover(RMState state) throws Exception { shouldRecover = false; } + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a9d40eb..d9c0337 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -48,12 +47,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -202,25 +199,17 @@ private void setupTokensAndEnv( credentials.readTokenStorageStream(dibb); } - ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( - application.getAppAttemptId()); - Token appMasterToken = - new Token(id, - this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the token, - // but this token is directly provided to the AMs - SecurityUtil.setTokenService(appMasterToken, serviceAddr); - - // Add the ApplicationMaster token - credentials.addToken(appMasterToken.getService(), appMasterToken); + // Add app attempt-specific tokens + if(application.getAppAttemptTokens() != null) { + Credentials appAttemptCredentials = new Credentials(); + dibb.reset(application.getAppAttemptTokens()); + appAttemptCredentials.readTokenStorageStream(dibb); + credentials.mergeAll(appAttemptCredentials); + } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); - container.setContainerTokens( - ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); SecretKey clientSecretKey = this.rmContext.getClientToAMTokenSecretManager().getMasterKey( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990da..7ee1901 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -114,8 +114,12 @@ public synchronized RMState loadState() throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), + attemptStateData.getAppAttemptTokens()); + // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167..1b33307 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -79,8 +79,10 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, throws Exception { ApplicationAttemptId attemptId = ConverterUtils .toApplicationAttemptId(attemptIdStr); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), + attemptStateData.getAppAttemptTokens()); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015..95a3b8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -58,19 +59,25 @@ public static class ApplicationAttemptState { final ApplicationAttemptId attemptId; final Container masterContainer; - + final ByteBuffer appAttemptTokens; + public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer) { + Container masterContainer, + ByteBuffer appAttemptTokens) { this.attemptId = attemptId; this.masterContainer = masterContainer; + this.appAttemptTokens = appAttemptTokens; } - + public Container getMasterContainer() { return masterContainer; } public ApplicationAttemptId getAttemptId() { return attemptId; } + public ByteBuffer getAppAttemptTokens() { + return appAttemptTokens; + } } /** @@ -199,10 +206,13 @@ protected abstract void storeApplicationState(String appId, * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getAppAttemptTokens()); + dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + new RMStateStoreAppAttemptEvent(attemptState)); } /** @@ -226,8 +236,9 @@ public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), appAttempt.getAppAttemptTokens()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -288,8 +299,12 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { attemptStateData.setAttemptId(attemptState.getAttemptId()); attemptStateData.setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + ByteBuffer appAttemptTokens = attemptState.getAppAttemptTokens(); + if(appAttemptTokens != null){ + attemptStateData.setAppAttemptTokens(appAttemptTokens); + } try { + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -358,7 +373,5 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, public void handle(RMStateStoreEvent event) { handleStoreEvent(event); } - } - -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 64e3ccb..2622b0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -49,4 +51,14 @@ public Container getMasterContainer(); public void setMasterContainer(Container container); + + /** + * The application attempt tokens that belong to this attempt + * @return The application attempt tokens that belong to this attempt + */ + @Public + @Unstable + public ByteBuffer getAppAttemptTokens(); + + public void setAppAttemptTokens(ByteBuffer attemptTokens); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java index d033f5c..c0a5b25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ProtoBase; @@ -37,7 +39,8 @@ private ApplicationAttemptId attemptId = null; private Container masterContainer = null; - + private ByteBuffer appAttemptTokens = null; + public ApplicationAttemptStateDataPBImpl() { builder = ApplicationAttemptStateDataProto.newBuilder(); } @@ -62,6 +65,9 @@ private void mergeLocalToBuilder() { if(this.masterContainer != null) { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } + if(this.appAttemptTokens != null) { + builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + } } private void mergeLocalToProto() { @@ -123,4 +129,26 @@ public void setMasterContainer(Container container) { this.masterContainer = container; } + @Override + public ByteBuffer getAppAttemptTokens() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(appAttemptTokens != null) { + return appAttemptTokens; + } + if(!p.hasAppAttemptTokens()) { + return null; + } + this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + return appAttemptTokens; + } + + @Override + public void setAppAttemptTokens(ByteBuffer attemptTokens) { + maybeInitBuilder(); + if(attemptTokens == null) { + builder.clearAppAttemptTokens(); + } + this.appAttemptTokens = attemptTokens; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 874232b..fcd7673 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -95,6 +95,7 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private RMState rmState; // Mutable fields private long startTime; @@ -275,6 +276,10 @@ public ApplicationSubmissionContext getApplicationSubmissionContext() { return this.submissionContext; } + private RMState getRMState(){ + return this.rmState; + } + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); @@ -555,6 +560,7 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) { + rmState = state; ApplicationState appState = state.getApplicationState().get(getApplicationId()); LOG.info("Recovering app: " + getApplicationId() + " with " + + appState.getAttemptCount() + " attempts"); @@ -583,7 +589,7 @@ private void createNewAttempt(boolean startAttempt) { new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); } } - + private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { NodeState nodeState = node.getState(); updatedNodes.add(node); @@ -608,7 +614,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_SAVED)) { + switch (event.getType()) { + case APP_SAVED: assert app.getState().equals(RMAppState.NEW_SAVING); RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; if(storeEvent.getStoredException() != null) { @@ -619,9 +626,14 @@ public void transition(RMAppImpl app, RMAppEvent event) { storeEvent.getStoredException()); ExitUtil.terminate(1, storeEvent.getStoredException()); } + app.createNewAttempt(true); + break; + case RECOVER: + app.createNewAttempt(true); + break; + default: + LOG.warn("Invalid eventType: " + event.getType()); } - - app.createNewAttempt(true); }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 510c4fe..d4f925a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; +import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -147,6 +148,12 @@ ApplicationSubmissionContext getSubmissionContext(); /** + * The tokens belong to this app attempt + * @return The tokens belong to this app attempt + */ + ByteBuffer getAppAttemptTokens(); + + /** * Get application container and resource usage information. * @return an ApplicationResourceUsageReport object. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index eaa15f5..d15feab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,8 +20,11 @@ import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,6 +41,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; @@ -58,6 +64,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -125,6 +132,7 @@ private final ApplicationAttemptId applicationAttemptId; private ClientToken clientToken; private final ApplicationSubmissionContext submissionContext; + private ByteBuffer appAttemptTokens = null; //nodes on while this attempt's containers ran private final Set ranNodes = @@ -366,19 +374,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.scheduler = scheduler; this.masterService = masterService; - if (UserGroupInformation.isSecurityEnabled()) { - - this.rmContext.getClientToAMTokenSecretManager().registerApplication( - appAttemptId); - - Token token = - new Token(new ClientTokenIdentifier( - appAttemptId), this.rmContext.getClientToAMTokenSecretManager()); - this.clientToken = - BuilderUtils.newClientToken(token.getIdentifier(), token.getKind() - .toString(), token.getPassword(), token.getService().toString()); - } - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -507,6 +502,11 @@ public ClientToken getClientToken() { } @Override + public ByteBuffer getAppAttemptTokens() { + return this.appAttemptTokens; + } + + @Override public String getDiagnostics() { this.readLock.lock(); @@ -657,6 +657,8 @@ public void recover(RMState state) { ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; setMasterContainer(attemptState.getMasterContainer()); + recoverAppAttemptTokens(attemptState.getAppAttemptTokens()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); @@ -664,7 +666,21 @@ public void recover(RMState state) { handle(new RMAppAttemptEvent(getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } - + + public void recoverAppAttemptTokens(ByteBuffer appAttemptTokens) { + + if (appAttemptTokens == null) { + return; + } + if (UserGroupInformation.isSecurityEnabled()) { + this.appAttemptTokens = appAttemptTokens; + + // For now, no need to populate tokens back to ApplicationTokenSecretManager. + // Later in work-preserve restart, we'll create NEW->RUNNING transition + // in which the restored tokens will be added to the secret manager + } + } + private static class BaseTransition implements SingleArcTransition { @@ -686,6 +702,49 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); + if (UserGroupInformation.isSecurityEnabled()) { + + appAttempt.rmContext.getClientToAMTokenSecretManager() + .registerApplication(appAttempt.applicationAttemptId); + + // create clientToken + Token clientToken = + new Token(new ClientTokenIdentifier( + appAttempt.applicationAttemptId), + appAttempt.rmContext.getClientToAMTokenSecretManager()); + appAttempt.clientToken = + BuilderUtils.newClientToken(clientToken.getIdentifier(), + clientToken.getKind().toString(), clientToken.getPassword(), + clientToken.getService().toString()); + + // create application token + ApplicationTokenIdentifier id = + new ApplicationTokenIdentifier(appAttempt.applicationAttemptId); + Token applicationToken = + new Token(id, + appAttempt.rmContext.getApplicationTokenSecretManager()); + InetSocketAddress serviceAddr = + appAttempt.conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(applicationToken, serviceAddr); + + Credentials credentials = new Credentials(); + credentials.addToken(applicationToken.getService(), applicationToken); + // credentials.addToken(clientToken.getService(), clientToken); + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + } catch (IOException e) { + LOG.warn("Error in writing app attempt tokens"); + } + appAttempt.appAttemptTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + // Add the application to the scheduler appAttempt.eventHandler.handle( new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, @@ -992,7 +1051,6 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.getAppAttemptId()); - // Unregister from the ClientTokenSecretManager if (UserGroupInformation.isSecurityEnabled()) { appAttempt.rmContext.getClientToAMTokenSecretManager() @@ -1191,7 +1249,7 @@ public long getStartTime() { this.readLock.unlock(); } } - + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fb..8ed8435 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -404,7 +404,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { } @Test - public void testTokenRestoredOnRMrestart() throws Exception { + public void testDelegationTokenRestoredOnRMrestart() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); @@ -423,7 +423,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MyMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); HashSet> tokenSet = @@ -471,7 +471,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { .getContainerTokens()); // start new RM - MockRM rm2 = new MyMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); // verify tokens are properly populated back to DelegationTokenRenewer @@ -483,9 +483,82 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } - class MyMockRM extends MockRM { + @Test + public void testApplicationTokenRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), "default"); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // the appToken that is generated when RMAppAttempt is created + ByteBuffer attempt1Token = + attempt1.getAppAttemptTokens(); + + // assert application Token is saved + Assert.assertEquals(attempt1Token, attemptState.getAppAttemptTokens()); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1); + + // assert loaded attempt recovered attempt tokens + Assert.assertNotNull(loadedAttempt1); + Assert.assertEquals(attempt1Token, loadedAttempt1.getAppAttemptTokens()); + + // Not testing ApplicationTokenSecretManager has the password populated back, + // that is needed in work-preserving restart + + rm1.stop(); + rm2.stop(); + } + + class TestSecurityMockRM extends MockRM { - public MyMockRM(Configuration conf, RMStateStore store) { + public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 440908f..ce97380 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -18,14 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import org.junit.Test; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +38,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -44,13 +51,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; public class TestRMStateStore { @@ -141,7 +151,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore, ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( "appattempt_1352994193343_0003_000001"); storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", dispatcher); + "container_1352994193343_0003_01_000001", null, dispatcher); } @Override @@ -186,14 +196,15 @@ void storeApp(RMStateStore store, ApplicationId appId, long time) } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, - String containerIdStr, TestDispatcher dispatcher) - throws Exception { + String containerIdStr,ByteBuffer appAttemptTokens, + TestDispatcher dispatcher) throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); RMAppAttempt mockAttempt = mock(RMAppAttempt.class); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getAppAttemptTokens()).thenReturn(appAttemptTokens); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -203,28 +214,40 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); + Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setDispatcher(dispatcher); + ApplicationTokenSecretManager appTokenMgr = + new ApplicationTokenSecretManager(conf); ApplicationAttemptId attemptId1 = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime); + + // create application token1 for attempt1 + ByteBuffer appAttemptToken1 = generateTokens(attemptId1, appTokenMgr, conf); + ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", dispatcher); + "container_1352994193343_0001_01_000001", appAttemptToken1, dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = - ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // create application token2 for attempt2 + ByteBuffer appAttemptToken2 = generateTokens(attemptId2, appTokenMgr, conf); + ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", dispatcher); + "container_1352994193343_0001_02_000001", appAttemptToken2, dispatcher); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", dispatcher); + "container_1352994193343_0002_01_000001", null, dispatcher); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -268,12 +291,17 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { assertEquals(attemptId1, attemptState.getAttemptId()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); + // attempt1 applicationToken is loaded correctly + assertEquals(appAttemptToken1, attemptState.getAppAttemptTokens()); + attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); + // attempt2 applicationToken is loaded correctly + assertEquals(appAttemptToken2, attemptState.getAppAttemptTokens()); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); @@ -281,4 +309,21 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + private ByteBuffer generateTokens(ApplicationAttemptId attemptId, + ApplicationTokenSecretManager appTokenMgr, Configuration conf) { + ApplicationTokenIdentifier appTokenId = + new ApplicationTokenIdentifier(attemptId); + Token appToken = + new Token(appTokenId, appTokenMgr); + + Credentials credentials = new Credentials(); + credentials.addToken(appToken.getService(), appToken); + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + } catch (IOException e) { + LOG.warn("Error in writing app attempt tokens"); + } + return ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } }