diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b..ae3a094 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -67,4 +69,13 @@ */ Context getNMCotext(); + /** + * Get the recovered data map from NMSS upon initiation. We use map here + * because different intercepters will need to store various data into NMSS + * for their own recovery. We don't want to modify the NMSS interface + * every time a new intercepter is introduced. 
+ * @return recovered data map + */ + Map getRecoveredDataMap(); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 6d4fdfc..82339fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -42,26 +43,30 @@ private Integer localTokenKeyId; private Token amrmToken; private Token localToken; + private Map recoveredDataMap; /** * Create an instance of the AMRMProxyApplicationContext. 
- * + * * @param nmContext * @param conf * @param applicationAttemptId * @param user * @param amrmToken + * @param recoveredDataMap */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token amrmToken, - Token localToken) { + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken, + Map recoveredDataMap) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.recoveredDataMap = recoveredDataMap; } @Override @@ -129,4 +134,9 @@ public synchronized int getLocalAMRMTokenKeyId() { public Context getNMCotext() { return nmContext; } + + @Override + public Map getRecoveredDataMap() { + return this.recoveredDataMap; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index aeb3be8..a0188ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; - +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -86,6 +86,10 @@ ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); + + private static final String NMSS_USER_KEY = "user"; + private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken"; + private Server server; private final Context nmContext; private final AsyncDispatcher dispatcher; @@ -113,6 +117,14 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { } @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + this.secretManager = + new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); + this.secretManager.init(conf); + } + + @Override protected void serviceStart() throws Exception { LOG.info("Starting AMRMProxyService"); Configuration conf = getConfig(); @@ -134,7 +146,6 @@ protected void serviceStart() throws Exception { YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT); - this.secretManager = new AMRMProxyTokenSecretManager(serverConf); this.secretManager.start(); this.server = @@ -161,6 +172,55 @@ protected void serviceStop() throws Exception { } /** + * Recover from NM state store. Called after serviceInit before serviceStart. 
+ * + * @throws IOException if recover fails + */ + public void recover() throws IOException { + LOG.info("Recovering AMRMProxyService"); + + RecoveredAMRMProxyState state = + this.nmContext.getNMStateStore().loadAMRMProxyState(); + + this.secretManager.recover(state); + + LOG.info("Recovering {} running applications for AMRMProxy", + state.getAppContexts().size()); + for (Map.Entry> entry : state + .getAppContexts().entrySet()) { + ApplicationAttemptId attemptId = entry.getKey(); + LOG.info("Recovering app attempt {}", attemptId); + + String user = null; + Token amrmToken = null; + for (Map.Entry contextEntry : entry.getValue() + .entrySet()) { + if (contextEntry.getKey().equals(NMSS_USER_KEY)) { + user = new String(contextEntry.getValue(), "UTF-8"); + } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) { + amrmToken = new Token(); + amrmToken.decodeFromUrlString( + new String(contextEntry.getValue(), "UTF-8")); + } + } + + if (amrmToken == null) { + throw new IOException( + "No amrmToken found for app attempt " + attemptId); + } + if (user == null) { + throw new IOException("No user found for app attempt " + attemptId); + } + + Token localToken = + this.secretManager.createAndGetAMRMToken(attemptId); + + initializePipeline(attemptId, user, amrmToken, localToken, + entry.getValue(), true); + } + } + + /** * This is called by the AMs started on this node to register with the RM. * This method does the initial authorization and then forwards the request to * the application instance specific intercepter chain. 
@@ -257,10 +317,9 @@ public void processApplicationStartRequest(StartContainerRequest request) request.getContainerLaunchContext().setTokens( ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); - initializePipeline(containerTokenIdentifierForKey.getContainerID() - .getApplicationAttemptId(), - containerTokenIdentifierForKey.getApplicationSubmitter(), - amrmToken, localToken); + initializePipeline(appAttemptId, + containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, + localToken, null, false); } /** @@ -269,10 +328,14 @@ public void processApplicationStartRequest(StartContainerRequest request) * @param applicationAttemptId * @param user * @param amrmToken + * @param localToken + * @param recoveredDataMap + * @param isRecovery */ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, + Map recoveredDataMap, boolean isRecovery) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -288,11 +351,23 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, && chainWrapperBackup.getApplicationAttemptId() != null && !chainWrapperBackup.getApplicationAttemptId() .equals(applicationAttemptId)) { + // TODO: revisit in AMRMProxy HA in YARN-6128 // Remove the existing pipeline LOG.info("Remove the previous pipeline for ApplicationId: " + applicationAttemptId.toString()); RequestInterceptorChainWrapper pipeline = applPipelineMap.remove(applicationAttemptId.getApplicationId()); + + if (!isRecovery && this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore() + .removeAMRMProxyAppContext(applicationAttemptId); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context for " + + applicationAttemptId, e); + } + } + try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { @@ -323,8 +398,24 @@ protected void 
initializePipeline(ApplicationAttemptId applicationAttemptId, RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(createApplicationMasterContext(this.nmContext, - applicationAttemptId, user, amrmToken, localToken)); + applicationAttemptId, user, amrmToken, localToken, recoveredDataMap)); + if (isRecovery) { + interceptorChain.recover(); + } chainWrapper.init(interceptorChain, applicationAttemptId); + + if (!isRecovery && this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8")); + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + applicationAttemptId, NMSS_AMRMTOKEN_KEY, + amrmToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + applicationAttemptId, e); + } + } } catch (Exception e) { this.applPipelineMap.remove(applicationAttemptId.getApplicationId()); throw e; @@ -362,6 +453,17 @@ protected void stopApplication(ApplicationId applicationId) { "Failed to shutdown the request processing pipeline for app:" + applicationId, ex); } + + // Remove the app context from NMSS after the interceptors are shutdown + if (this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore() + .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context for " + + applicationId, e); + } + } } } @@ -383,12 +485,24 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, // Do not propagate this info back to AM allocateResponse.setAMRMToken(null); - org.apache.hadoop.security.token.Token newTokenId = + org.apache.hadoop.security.token.Token newToken = new org.apache.hadoop.security.token.Token( token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new 
Text(token.getService())); - context.setAMRMToken(newTokenId); + context.setAMRMToken(newToken); + + // Update the AMRMToken in context map in NM state store + if (this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, + newToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + context.getApplicationAttemptId(), e); + } + } } // Check if the local AMRMToken is rolled up and update the context and @@ -422,10 +536,12 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, private AMRMProxyApplicationContext createApplicationMasterContext( Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, + Map recoveredDataMap) { AMRMProxyApplicationContextImpl appContext = new AMRMProxyApplicationContextImpl(context, getConfig(), - applicationAttemptId, user, amrmToken, localToken); + applicationAttemptId, user, amrmToken, localToken, + recoveredDataMap); return appContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java index d09ce41..bb6538d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -18,6 +18,7 @@ package 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.io.IOException; import java.security.SecureRandom; import java.util.HashSet; import java.util.Set; @@ -37,6 +38,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.security.MasterKeyData; import com.google.common.annotations.VisibleForTesting; @@ -60,17 +64,24 @@ private final Lock writeLock = readWriteLock.writeLock(); private final Timer timer; - private final long rollingInterval; - private final long activationDelay; + private long rollingInterval; + private long activationDelay; + + private NMStateStoreService nmStateStore; private final Set appAttemptSet = new HashSet(); /** * Create an {@link AMRMProxyTokenSecretManager}. 
+ * @param nmStateStoreService NM state store */ - public AMRMProxyTokenSecretManager(Configuration conf) { + public AMRMProxyTokenSecretManager(NMStateStoreService nmStateStoreService) { this.timer = new Timer(); + this.nmStateStore = nmStateStoreService; + } + + public void init(Configuration conf) { this.rollingInterval = conf.getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -94,6 +105,14 @@ public AMRMProxyTokenSecretManager(Configuration conf) { public void start() { if (this.currentMasterKey == null) { this.currentMasterKey = createNewMasterKey(); + if (this.nmStateStore != null) { + try { + this.nmStateStore.storeAMRMProxyCurrentMasterKey( + this.currentMasterKey.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } } this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, rollingInterval); @@ -103,6 +122,11 @@ public void stop() { this.timer.cancel(); } + @VisibleForTesting + public void setNMStateStoreService(NMStateStoreService nmStateStoreService) { + this.nmStateStore = nmStateStoreService; + } + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { this.writeLock.lock(); try { @@ -122,11 +146,21 @@ public void run() { } @Private - void rollMasterKey() { + @VisibleForTesting + public void rollMasterKey() { this.writeLock.lock(); try { LOG.info("Rolling master-key for amrm-tokens"); this.nextMasterKey = createNewMasterKey(); + if (this.nmStateStore != null) { + try { + this.nmStateStore + .storeAMRMProxyNextMasterKey(this.nextMasterKey.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update next master key in state store", e); + } + } + this.timer.schedule(new NextKeyActivator(), this.activationDelay); } finally { this.writeLock.unlock(); @@ -140,6 +174,8 @@ public void run() { } } + @Private + @VisibleForTesting public void activateNextMasterKey() { this.writeLock.lock(); try { @@ -147,6 +183,15 @@ 
public void activateNextMasterKey() { + this.nextMasterKey.getMasterKey().getKeyId()); this.currentMasterKey = this.nextMasterKey; this.nextMasterKey = null; + if (this.nmStateStore != null) { + try { + this.nmStateStore.storeAMRMProxyCurrentMasterKey( + this.currentMasterKey.getMasterKey()); + this.nmStateStore.storeAMRMProxyNextMasterKey(null); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } } finally { this.writeLock.unlock(); } @@ -239,6 +284,17 @@ public AMRMTokenIdentifier createIdentifier() { @Private @VisibleForTesting + public MasterKeyData getCurrentMasterKeyData() { + this.readLock.lock(); + try { + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + @VisibleForTesting public MasterKeyData getNextMasterKeyData() { this.readLock.lock(); try { @@ -262,4 +318,34 @@ public MasterKeyData getNextMasterKeyData() { this.readLock.unlock(); } } + + /** + * Recover secretManager from state store. Called after serviceInit before + * serviceStart. 
+ * + * @param state the state to recover from + * @throws IOException if recover fails + */ + public void recover(RecoveredAMRMProxyState state) { + if (state != null) { + // recover the current master key + MasterKey currentKey = state.getCurrentMasterKey(); + if (currentKey != null) { + this.currentMasterKey = new MasterKeyData(currentKey, + createSecretKey(currentKey.getBytes().array())); + } else { + LOG.warn("No current master key recovered from NM StateStore" + + " for AMRMProxyTokenSecretManager"); + } + + // recover the next master key if not null + MasterKey nextKey = state.getNextMasterKey(); + if (nextKey != null) { + this.nextMasterKey = new MasterKeyData(nextKey, + createSecretKey(nextKey.getBytes().array())); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index e6c9bbd..4a14bd5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -18,16 +18,16 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; -import org.apache.hadoop.conf.Configuration; +import java.io.IOException; import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; - -import java.io.IOException; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; /** * Implements the RequestInterceptor interface and provides common functionality @@ -83,6 +83,16 @@ public void init(AMRMProxyApplicationContext appContext) { } /** + * Recover {@link RequestInterceptor} state from store. + */ + @Override + public void recover() { + if (this.nextInterceptor != null) { + this.nextInterceptor.recover(); + } + } + + /** * Disposes the {@link RequestInterceptor}. */ @Override @@ -141,4 +151,16 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( return (this.nextInterceptor != null) ? this.nextInterceptor .registerApplicationMasterForDistributedScheduling(request) : null; } + + /** + * A helper method for getting NM state store. 
+ * + * @return the NMSS instance + */ + public NMStateStoreService getNMStateStore() { + if (this.appContext == null || this.appContext.getNMCotext() == null) { + return null; + } + return this.appContext.getNMCotext().getNMStateStore(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java index 5995af1..32c71a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -37,6 +37,18 @@ void init(AMRMProxyApplicationContext ctx); /** + * Recover intercepter state when NM recovery is enabled. AMRMProxy will + * recover the data map into + * AMRMProxyApplicationContext.getRecoveredDataMap(). Each intercepter should + * recover its state from it. + * + * For example, registerRequest has to be saved by the last intercepter (i.e. + * the one that actually connects to RM), in order to re-register when RM + * fails over. + */ + void recover(); + + /** * This method is called to release the resources held by the intercepter. * This will be called when the application pipeline is being destroyed. 
The * concrete implementations should dispose the resources and forward the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index cbf617b..b1d634a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -341,6 +341,10 @@ private void recover() throws IOException, URISyntaxException { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); + if (this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index ab23456..d6c8ea4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -24,12 +24,15 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -80,9 +83,8 @@ private static final String DB_NAME = "yarn-nm-state"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; - - private static final Version CURRENT_VERSION_INFO = Version - .newInstance(2, 0); + + private static final Version CURRENT_VERSION_INFO = Version.newInstance(2, 1); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -121,6 +123,7 @@ private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; + private static final String NEXT_MASTER_KEY_SUFFIX = "NextMasterKey"; private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; private static final String NM_TOKENS_CURRENT_MASTER_KEY = NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX; @@ -135,6 +138,8 @@ private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/"; + private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -1125,6 +1130,160 @@ private String getLogDeleterKey(ApplicationId appId) { } @Override + public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { + RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); + result.appContexts = + new HashMap>(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + 
iter.seek(bytes(AMRMPROXY_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(AMRMPROXY_KEY_PREFIX)) { + break; + } + + String suffix = key.substring(AMRMPROXY_KEY_PREFIX.length()); + if (suffix.equals(CURRENT_MASTER_KEY_SUFFIX)) { + iter.next(); + result.currentMasterKey = parseMasterKey(entry.getValue()); + LOG.info("Recovered for AMRMProxy: current master key id " + + result.currentMasterKey.getKeyId()); + + } else if (suffix.equals(NEXT_MASTER_KEY_SUFFIX)) { + iter.next(); + result.nextMasterKey = parseMasterKey(entry.getValue()); + LOG.info("Recovered for AMRMProxy: next master key id " + + result.nextMasterKey.getKeyId()); + + } else { // Load AMRMProxy application context map for an app attempt + // Parse appAttemptId + int idEndPos = key.indexOf('/', AMRMPROXY_KEY_PREFIX.length()); + if (idEndPos < 0) { + throw new IOException( + "Unable to determine attemptId in key: " + key); + } + ApplicationAttemptId attemptId = ApplicationAttemptId.fromString( + key.substring(AMRMPROXY_KEY_PREFIX.length(), idEndPos)); + + // Parse the context map for the appAttemptId + Map appContext = loadAMRMProxyAppContextMap(attemptId, + iter, key.substring(0, idEndPos + 1)); + result.appContexts.put(attemptId, appContext); + + LOG.info("Recovered for AMRMProxy: " + attemptId + ", map size " + + appContext.size()); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return result; + } + + private Map loadAMRMProxyAppContextMap( + ApplicationAttemptId attemptId, LeveldbIterator iter, String keyPrefix) + throws IOException { + Map appContextMap = new HashMap(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + iter.next(); + String suffix = key.substring(keyPrefix.length()); + byte[] data = entry.getValue(); + 
appContextMap.put(suffix, Arrays.copyOf(data, data.length)); + } + return appContextMap; + } + + @Override + public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException { + storeMasterKey(AMRMPROXY_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX, key); + } + + @Override + public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { + String dbkey = AMRMPROXY_KEY_PREFIX + NEXT_MASTER_KEY_SUFFIX; + if (key == null) { + // When key is null, delete the entry instead + try { + db.delete(bytes(dbkey)); + } catch (DBException e) { + throw new IOException(e); + } + return; + } + storeMasterKey(dbkey, key); + } + + @Override + public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key, byte[] data) throws IOException { + String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; + try { + db.put(bytes(fullkey), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key) throws IOException { + String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; + try { + db.delete(bytes(fullkey)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException { + Set candidates = new HashSet(); + String keyPrefix = AMRMPROXY_KEY_PREFIX + attempt + "/"; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + // Do this because iter.remove() is not supported here + candidates.add(key); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + // Delete all candidate keys + try { + for (String key : candidates) { + db.delete(bytes(key)); + } + } catch 
(DBException e) { + throw new IOException(e); + } + } + + @Override protected void initStorage(Configuration conf) throws IOException { db = openDatabase(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 4bcdf5c..96c3f9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -229,6 +229,35 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { } @Override + public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException { + } + + @Override + public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { + } + + @Override + public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key, byte[] data) throws IOException { + } + + @Override + public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key) throws IOException { + } + + @Override + public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException { + } + + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 9dd1eb0..dc6e450 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -266,6 +266,26 @@ public MasterKey getPreviousMasterKey() { } } + public static class RecoveredAMRMProxyState { + MasterKey currentMasterKey; + MasterKey nextMasterKey; + // For each app, stores amrmToken, user name, as well as various AMRMProxy + // interceptor states + Map<ApplicationAttemptId, Map<String, byte[]>> appContexts; + + public MasterKey getCurrentMasterKey() { + return currentMasterKey; + } + + public MasterKey getNextMasterKey() { + return nextMasterKey; + } + + public Map<ApplicationAttemptId, Map<String, byte[]>> getAppContexts() { + return appContexts; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -601,6 +621,57 @@ public abstract void storeLogDeleter(ApplicationId appId, public abstract void removeLogDeleter(ApplicationId appId) throws IOException; + /** + * Load the state of AMRMProxy + * @return recovered state of AMRMProxy + * @throws IOException + */ + public abstract RecoveredAMRMProxyState loadAMRMProxyState() + throws IOException; + + /** + * Record the current AMRMProxyTokenSecretManager master key + * @param key the current master key + * @throws IOException + */ + public abstract void storeAMRMProxyCurrentMasterKey(MasterKey key) + throws IOException; + + /** + * Record the next AMRMProxyTokenSecretManager master key + * @param
key the next master key + * @throws IOException + */ + public abstract void storeAMRMProxyNextMasterKey(MasterKey key) + throws IOException; + + /** + * Add a context entry for an application attempt in AMRMProxyService. + * @param attempt app attempt ID + * @param key key string + * @param data state data to store + * @throws IOException + */ + public abstract void storeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key, byte[] data) throws IOException; + + /** + * Remove a context entry for an application attempt in AMRMProxyService. + * @param attempt attempt ID + * @param key key string + * @throws IOException + */ + public abstract void removeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key) throws IOException; + + /** + * Remove the entire context map for an application attempt in + * AMRMProxyService. + * @param attempt attempt ID + * @throws IOException + */ + public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException; protected abstract void initStorage(Configuration conf) throws IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 6f5009e..590da75 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; 
+import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -62,12 +63,13 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; - +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -98,6 +100,7 @@ .newCachedThreadPool(); private Configuration conf; private AsyncDispatcher dispatcher; + private Context nmContext; protected MockAMRMProxyService getAMRMProxyService() { Assert.assertNotNull(this.amrmProxyService); @@ -105,32 +108,40 @@ protected MockAMRMProxyService getAMRMProxyService() { } @Before - public void setUp() { - this.conf = new YarnConfiguration(); - this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + public void setUp() throws IOException { + this.conf = createConfiguration(); + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(this.conf); + this.dispatcher.start(); + createAndStartAMRMProxyService(this.conf); + } + + protected 
YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); String mockPassThroughInterceptorClass = PassThroughRequestInterceptor.class.getName(); // Create a request intercepter pipeline for testing. The last one in the // chain will call the mock resource manager. The others in the chain will // simply forward it to the next one in the chain - this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, - mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + MockRequestInterceptor.class.getName()); - this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(this.conf); - this.dispatcher.start(); - createAndStartAMRMProxyService(this.conf); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + return conf; } @After public void tearDown() { - amrmProxyService.stop(); - amrmProxyService = null; + this.amrmProxyService.stop(); + this.amrmProxyService = null; this.dispatcher.stop(); + if (this.nmContext.getNMStateStore() != null) { + this.nmContext.getNMStateStore().stop(); + } } protected ExecutorService getThreadPool() { @@ -141,17 +152,33 @@ protected Configuration getConf() { return this.conf; } - protected void createAndStartAMRMProxyService(Configuration config) { + protected AsyncDispatcher getDispatcher() { + return this.dispatcher; + } + + protected void createAndStartAMRMProxyService(Configuration config) + throws IOException { // Stop the existing instance first if not null if (this.amrmProxyService != null) { this.amrmProxyService.stop(); } + if (this.nmContext == null) { + this.nmContext = createContext(); + } this.amrmProxyService = - new MockAMRMProxyService(new 
NullContext(), dispatcher); + new MockAMRMProxyService(this.nmContext, this.dispatcher); this.amrmProxyService.init(config); + this.amrmProxyService.recover(); this.amrmProxyService.start(); } + protected Context createContext() { + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(this.conf); + stateStore.start(); + return new NMContext(null, null, null, null, stateStore, false, this.conf); + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -579,6 +606,13 @@ public MockAMRMProxyService(Context nmContext, super(nmContext, dispatcher); } + @Override + protected void serviceStart() throws Exception { + // Override this method to start only the secret manager, which keeps the + // base class from listening to the server end point + getSecretManager().start(); + } + /** * This method is used by the test code to initialize the pipeline. In the * actual service, the initialization is called by the @@ -588,7 +622,8 @@ public MockAMRMProxyService(Context nmContext, * @param user */ public void initApp(ApplicationAttemptId applicationId, String user) { - super.initializePipeline(applicationId, user, null, null); + super.initializePipeline(applicationId, user, + new Token(), null, null, false); } public void stopApp(ApplicationId applicationId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index fa17f26..e734bdd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -26,9 +26,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -48,6 +52,8 @@ private static final Log LOG = LogFactory .getLog(TestAMRMProxyService.class); + private static MockResourceManagerFacade mockRM; + /** * Test if the pipeline is created properly. */ @@ -99,9 +105,11 @@ public void testRegisterOneApplicationMaster() throws Exception { /** * Tests the case when interceptor pipeline initialization fails. 
+ * + * @throws IOException */ @Test - public void testInterceptorInitFailure() { + public void testInterceptorInitFailure() throws IOException { Configuration conf = this.getConf(); // Override with a bad interceptor configuration conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, @@ -434,8 +442,8 @@ public void testMultipleAttemptsSameNode() // Second Attempt applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); - getAMRMProxyService().initializePipeline(applicationAttemptId, user, null, - null); + getAMRMProxyService().initializePipeline(applicationAttemptId, user, + new Token(), null, null, false); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -559,4 +567,109 @@ private void releaseContainersAndAssert(int appId, Assert.assertEquals(relList.size(), containersForReleasedContainerIds.size()); } + + /** + * Test AMRMProxy restart with recovery. + */ + @Test + public void testRecovery() throws YarnException, Exception { + + Configuration conf = createConfiguration(); + // Use the MockRequestInterceptorAcrossRestart instead for the chain + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + MockRequestInterceptorAcrossRestart.class.getName()); + + mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0); + + createAndStartAMRMProxyService(conf); + + int testAppId1 = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId1); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId1), + registerResponse.getQueue()); + + int testAppId2 = 2; + registerResponse = registerApplicationMaster(testAppId2); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId2), + registerResponse.getQueue()); + + AllocateResponse allocateResponse = allocate(testAppId2); + Assert.assertNotNull(allocateResponse); + + // At the time of kill, app1 just registerAM, app2 already did one 
allocate. + // Both applications should be recovered + createAndStartAMRMProxyService(conf); + Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2); + + allocateResponse = allocate(testAppId1); + Assert.assertNotNull(allocateResponse); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + allocateResponse = allocate(testAppId2); + Assert.assertNotNull(allocateResponse); + + finshResponse = + finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + int testAppId3 = 3; + try { + // Try to finish an application master that is not registered. + finishApplicationMaster(testAppId3, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The Mock RM should complain about not knowing the third app"); + } catch (Throwable ex) { + } + + mockRM = null; + } + + /** + * A mock interceptor implementation that uses the same mockRM instance across + * restart. 
+ */ + public static class MockRequestInterceptorAcrossRestart + extends AbstractRequestInterceptor { + + public MockRequestInterceptorAcrossRestart() { + } + + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + if (mockRM == null) { + throw new RuntimeException("mockRM not initialized yet"); + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + return mockRM.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + return mockRM.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return mockRM.allocate(request); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java index 927563e..db7167d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -40,11 +41,19 @@ private YarnConfiguration conf; private AMRMProxyTokenSecretManager secretManager; + private NMMemoryStateStoreService stateStore; @Before public void setup() { conf = new YarnConfiguration(); - secretManager = new AMRMProxyTokenSecretManager(conf); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + + stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); secretManager.start(); } @@ -53,6 +62,9 @@ public void breakdown() { if (secretManager != null) { secretManager.stop(); } + if (stateStore != null) { + stateStore.stop(); + } } @Test @@ -78,4 +90,52 @@ public void testNormalCase() throws IOException { } } + @Test + public void testRecovery() throws IOException { + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + + Token localToken = + secretManager.createAndGetAMRMToken(attemptId); + + AMRMTokenIdentifier identifier = secretManager.createIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(localToken.getIdentifier()))); + + secretManager.retrievePassword(identifier); + + // Generate next master key + secretManager.rollMasterKey(); + + // Restart and recover + secretManager.stop(); + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + secretManager.recover(stateStore.loadAMRMProxyState()); + secretManager.start(); + // Recover the app + secretManager.createAndGetAMRMToken(attemptId); + + // Current master key should be recovered, and thus pass here + secretManager.retrievePassword(identifier); + + // Roll key, current master key will be replaced + secretManager.activateNextMasterKey(); + + // 
Restart and recover + secretManager.stop(); + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + secretManager.recover(stateStore.loadAMRMProxyState()); + secretManager.start(); + // Recover the app + secretManager.createAndGetAMRMToken(attemptId); + + try { + secretManager.retrievePassword(identifier); + Assert.fail("Expect InvalidToken exception because the " + + "old master key should have expired"); + } catch (InvalidToken e) { + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 15c0e84..1a1f812 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -20,11 +20,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -50,6 +49,7 @@ private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; private Map logDeleterState; + private RecoveredAMRMProxyState amrmProxyState; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -67,6 +67,9 @@ protected void initStorage(Configuration conf) { trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); + 
amrmProxyState = new RecoveredAMRMProxyState(); + amrmProxyState.appContexts = + new HashMap<ApplicationAttemptId, Map<String, byte[]>>(); } @Override @@ -417,6 +420,68 @@ public synchronized void removeLogDeleter(ApplicationId appId) logDeleterState.remove(appId); } + @Override + public synchronized RecoveredAMRMProxyState loadAMRMProxyState() + throws IOException { + // return a copy so caller can't modify our state + RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); + result.currentMasterKey = amrmProxyState.currentMasterKey; + result.nextMasterKey = amrmProxyState.nextMasterKey; + result.appContexts = + new HashMap<ApplicationAttemptId, Map<String, byte[]>>(); + for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry + : amrmProxyState.appContexts.entrySet()) { + result.appContexts.put(entry.getKey(), + new HashMap(entry.getValue())); + } + return result; + } + + @Override + public synchronized void storeAMRMProxyCurrentMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + amrmProxyState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public synchronized void storeAMRMProxyNextMasterKey(MasterKey key) + throws IOException { + if (key == null) { + amrmProxyState.nextMasterKey = null; + return; + } + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + amrmProxyState.nextMasterKey = new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public synchronized void storeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key, byte[] data) + throws IOException { + Map entryMap = amrmProxyState.appContexts.get(attempt); + if (entryMap == null) { + entryMap = new HashMap(); + amrmProxyState.appContexts.put(attempt, entryMap); + } + entryMap.put(key, Arrays.copyOf(data, data.length)); + } + + @Override + public synchronized void removeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key) throws IOException { + Map entryMap = amrmProxyState.appContexts.get(attempt); + if (entryMap != null) { + entryMap.remove(key); + } + } + + @Override + public synchronized void 
removeAMRMProxyAppContext( + ApplicationAttemptId attempt) throws IOException { + amrmProxyState.appContexts.remove(attempt); + } + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 537f849..d47dda1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -20,10 +20,11 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -33,6 +34,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,7 +67,9 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; @@ -1016,6 +1020,105 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { assertNull(stateStore.getDB().get(invalidKey)); } + @Test + public void testAMRMProxyStorage() throws IOException { + RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState(); + assertEquals(state.currentMasterKey, null); + assertEquals(state.nextMasterKey, null); + assertEquals(state.appContexts.size(), 0); + + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + ApplicationId appId2 = ApplicationId.newInstance(1, 2); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId1, 1); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId2, 2); + String key1 = "key1"; + String key2 = "key2"; + byte[] data1 = "data1".getBytes(); + byte[] data2 = "data2".getBytes(); + + AMRMProxyTokenSecretManager secretManager = + new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + // Generate currentMasterKey + secretManager.start(); + + try { + // Add two applications, each with two data entries + stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1); + stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1); + stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2); + stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2); + + // restart state store and verify recovered + 
restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); + assertEquals(state.currentMasterKey, + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.nextMasterKey, null); + assertEquals(state.appContexts.size(), 2); + // app1 + Map map = state.appContexts.get(attemptId1); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + // app2 + map = state.appContexts.get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + + // Generate next master key and remove one entry of app2 + secretManager.rollMasterKey(); + stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1); + + // restart state store and verify recovered + restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); + assertEquals(state.currentMasterKey, + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.nextMasterKey, + secretManager.getNextMasterKeyData().getMasterKey()); + assertEquals(state.appContexts.size(), 2); + // app1 + map = state.appContexts.get(attemptId1); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + // app2 + map = state.appContexts.get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 1); + assertTrue(Arrays.equals(map.get(key2), data2)); + + // Activate next master key and remove all entries of app1 + secretManager.activateNextMasterKey(); + stateStore.removeAMRMProxyAppContext(attemptId1); + + // restart state store and verify recovered + restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); 
+ assertEquals(state.currentMasterKey, + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.nextMasterKey, null); + assertEquals(state.appContexts.size(), 1); + // app2 only + map = state.appContexts.get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 1); + assertTrue(Arrays.equals(map.get(key2), data2)); + } finally { + secretManager.stop(); + } + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 77240c6..329d57e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -938,9 +939,10 @@ public ShortCircuitedAMRMProxy(Context context, @Override protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, + Map recoveredDataMap, boolean isRecovery) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken); + localToken, recoveredDataMap, isRecovery); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally be the last