diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77..1a17a31 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -47,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -174,7 +177,7 @@ public String createAndRegisterNewUAM( } createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + queueName, submitter, appNameSuffix, null); return appId.toString(); } @@ -196,7 +199,8 @@ public String createAndRegisterNewUAM( public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, RegisterApplicationMasterRequest registerRequest, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, UnmanagedAMIdentifier identifier) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); @@ -211,7 +215,8 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, try { LOG.info("Creating and registering UAM id {} for application {}", uamId, appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + response = + uam.createAndRegisterApplicationMaster(registerRequest, identifier); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -242,6 +247,21 @@ protected UnmanagedApplicationManager createUAM(Configuration conf, } /** + * Get the initial AMRMToken issued by RM. Used by clients with HA. + * + * @param uamId uamId identifier for the UAM + * @return the initial AMRMToken + * @throws YarnException if uam does not exist + */ + public Token getInitialAMRMToken(String uamId) + throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + return this.unmanagedAppMasterMap.get(uamId).getInitialAMRMToken(); + } + + /** * AllocateAsync to an UAM. * * @param uamId identifier for the UAM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 60a9a27..6c22a92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -101,6 +101,7 @@ private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + private Token initialAmRmToken; public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix) { @@ -135,16 +136,25 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, * @throws IOException if register fails */ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest request) - throws YarnException, IOException { + RegisterApplicationMasterRequest request, + UnmanagedAMIdentifier identifier) throws YarnException, IOException { // This need to be done first in this method, because it is used as an // indication that this method is called (and perhaps blocked due to RM // connection and not finished yet) this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); + boolean isReAttach = (identifier != null); + if (isReAttach) { + if (identifier == null) { + throw new YarnRuntimeException( + "Should feed in a UnmanagedAMIdentifier when re-attaching"); + } + this.attemptId = identifier.getAttemptId(); + } else { + // this.attemptId will be available after this call + identifier = initializeUnmanagedAM(this.applicationId); + this.initialAmRmToken = identifier.getToken(); + } try { this.userUgi = UserGroupInformation.createProxyUser( @@ -158,9 +168,21 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, this.userUgi, identifier.getToken()); - LOG.info("Registering the Unmanaged application master {}", this.attemptId); - RegisterApplicationMasterResponse response = - this.rmProxy.registerApplicationMaster(this.registerRequest); + RegisterApplicationMasterResponse response = null; + if (isReAttach) { + /* + * Since we didn't save the last valid responseId from last run, we have + * to force RM to accept our first allocate call by making sure that + * responseId + 1 == MAXINT. We will get and sync the valid responseId + * from RM's first response. + */ + this.lastResponseId = Integer.MAX_VALUE - 1; + } else { + LOG.info("Registering the Unmanaged application master {}", + this.attemptId); + response = this.rmProxy.registerApplicationMaster(this.registerRequest); + this.lastResponseId = 0; + } // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( @@ -168,11 +190,19 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( this.handlerThread.setDaemon(true); this.handlerThread.start(); - this.lastResponseId = 0; return response; } /** + * Get the initial AMRMToken issued by RM. + * + * @return the initial AMRMToken + */ + public Token getInitialAMRMToken() { + return this.initialAmRmToken; + } + + /** * Unregisters from the resource manager and stops the request handler thread. * * @param request the finishApplicationMaster request diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 68c55ac..fe5fb3c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -369,6 +370,7 @@ public AllocateResponse allocate(AllocateRequest request) } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -394,18 +396,8 @@ public AllocateResponse allocate(AllocateRequest request) + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -416,9 +408,9 @@ public AllocateResponse allocate(AllocateRequest request) // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList(), - containerList, new ArrayList(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), newAMRMToken, new ArrayList()); } @@ -435,6 +427,8 @@ public GetApplicationReportResponse getApplicationReport( report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken( + Token.newInstance(new byte[3], "kind", new byte[3], "service")); response.setApplicationReport(report); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf7..ee1fde6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.util.AsyncCallback; import org.junit.Assert; import org.junit.Before; @@ -89,7 +90,47 @@ public void testBasicUsage() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + uam.setRMProxy(rmProxy); + + UnmanagedAMIdentifier uamIdentifier = + new UnmanagedAMIdentifier(attemptId, null); + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), + uamIdentifier, attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, attemptId); @@ -107,7 +148,8 @@ public void testReRegister() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); uam.setShouldReRegisterNext(); @@ -139,7 +181,7 @@ public void run() { try { createAndRegisterApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), - attemptId); + null, attemptId); } catch (Exception e) { LOG.info("Register thread exception", e); } @@ -222,7 +264,8 @@ public void testFinishWithoutRegister() public void testForceKill() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); uam.forceKillApplication(); try { @@ -241,18 +284,17 @@ protected UserGroupInformation getUGIWithToken( return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + final RegisterApplicationMasterRequest request, + final UnmanagedAMIdentifier identifier, ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); + uam.createAndRegisterApplicationMaster(request, identifier); return response; } }); @@ -330,6 +372,14 @@ public void setShouldReRegisterNext() { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade rmProxy) { + this.rmProxy = rmProxy; + } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf..24eef7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -460,9 +461,8 @@ protected void stopApplication(ApplicationId applicationId) { try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { - LOG.warn( - "Failed to shutdown the request processing pipeline for app:" - + applicationId, ex); + LOG.warn("Failed to shutdown the request processing pipeline for app:" + + applicationId, ex); } // Remove the app context from NMSS after the interceptors are shutdown @@ -497,10 +497,7 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - + ConverterUtils.convertFromYarn(token, (Text) null); context.setAMRMToken(newToken); // Update the AMRMToken in context map in NM state store diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index ffe47f4..b6aea2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,7 +35,9 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -42,6 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,6 +62,9 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; @@ -64,9 +73,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +95,19 @@ private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class); + private static final String NMSS_CLASS_PREFIX = "FederationInterceptor/"; + + private static final String NMSS_REG_REQUEST_KEY = + NMSS_CLASS_PREFIX + "registerRequest"; + private static final String NMSS_REG_RESPONSE_KEY = + NMSS_CLASS_PREFIX + "registerResponse"; + + private static final String NMSS_CONTAINERID_PREFIX = + NMSS_CLASS_PREFIX + "container/"; + + private static final String NMSS_SECONDARY_SC_PREFIX = + NMSS_CLASS_PREFIX + "secondarySC/"; + /** * The home sub-cluster is the sub-cluster where the AM container is running * in. @@ -194,6 +218,93 @@ public void init(AMRMProxyApplicationContext appContext) { this.uamPool.start(); } + @Override + public void recover(Map recoveredDataMap) { + super.recover(recoveredDataMap); + LOG.info("Recovering data for FederationInterceptor"); + if (recoveredDataMap == null) { + return; + } + + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + try { + // First pass over the dataMap + int containers = 0; + for (Map.Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().equals(NMSS_REG_REQUEST_KEY)) { + RegisterApplicationMasterRequestProto pb = + RegisterApplicationMasterRequestProto.parseFrom(entry.getValue()); + this.amRegistrationRequest = + new RegisterApplicationMasterRequestPBImpl(pb); + LOG.info("amRegistrationRequest recovered"); + + } else if (entry.getKey().equals(NMSS_REG_RESPONSE_KEY)) { + RegisterApplicationMasterResponseProto pb = + RegisterApplicationMasterResponseProto + .parseFrom(entry.getValue()); + this.amRegistrationResponse = + new RegisterApplicationMasterResponsePBImpl(pb); + LOG.info("amRegistrationResponse recovered"); + + } else if (entry.getKey().startsWith(NMSS_CONTAINERID_PREFIX)) { + // entry for containerIdToSubClusterIdMap + ContainerId cid = ContainerId.fromString( + entry.getKey().substring(NMSS_CONTAINERID_PREFIX.length())); + SubClusterId subClusterId = + SubClusterId.newInstance(new String(entry.getValue(), "UTF-8")); + containerIdToSubClusterIdMap.put(cid, subClusterId); + containers++; + LOG.debug("Recovered container " + cid + " -> " + subClusterId); + } + } + LOG.info("{} running containers recovered for {}", containers, attemptId); + + // Second pass over the dataMap, after register request and response have + // been recovered if available + int uams = 0; + for (Map.Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) { + // entry for subCluster->UAM amrmToken + String subClusterId = + entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length()); + LOG.debug("Recovered UAM " + subClusterId); + + Token amrmToken = + new Token(); + amrmToken.decodeFromUrlString(new String(entry.getValue(), "UTF-8")); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId); + + this.uamPool.createAndRegisterNewUAM(subClusterId, + this.amRegistrationRequest, config, attemptId.getApplicationId(), + this.amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), + this.homeSubClusterId.toString(), + new UnmanagedAMIdentifier(attemptId, amrmToken)); + + uams++; + } + } + LOG.info("{} UAMs recovered for {}", uams, attemptId); + + if (this.amRegistrationResponse != null) { + // Initialize the AMRMProxyPolicy + String queue = this.amRegistrationResponse.getQueue(); + this.policyInterpreter = + FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); + } + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + + } + /** * Sends the application master's registration request to the home RM. * @@ -223,6 +334,18 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Save the registration request. This will be used for registering with // secondary sub-clusters using UAMs, as well as re-register later this.amRegistrationRequest = request; + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterRequestPBImpl pb = + (RegisterApplicationMasterRequestPBImpl) this.amRegistrationRequest; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } /* * Present to AM as if we are the RM that never fails over. When actual RM @@ -248,6 +371,20 @@ public RegisterApplicationMasterResponse registerApplicationMaster( try { this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterResponsePBImpl pb = + (RegisterApplicationMasterResponsePBImpl) this.amRegistrationResponse; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } + } catch (InvalidApplicationMasterRequestException e) { if (e.getMessage() .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) { @@ -387,6 +524,19 @@ public FinishApplicationMasterResponseInfo call() throws Exception { try { uamResponse = uamPool.finishApplicationMaster(subClusterId, finishRequest); + if (uamResponse.getIsUnregistered() + && getNMStateStore() != null) { + try { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } catch (Exception e) { + LOG.error( + "Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " @@ -454,7 +604,23 @@ public void setNextInterceptor(RequestInterceptor next) { @Override public void shutdown() { if (this.uamPool != null) { + // Save a local copy of the key set + Set subClusterIds = new HashSet<>(this.uamPool.getAllUAMIds()); this.uamPool.stop(); + + // Remove the subcluster entries in NMSS + if (getNMStateStore() != null) { + try { + for (String subClusterId : subClusterIds) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } catch (Throwable e) { + LOG.error("Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } if (threadpool != null) { try { @@ -663,6 +829,24 @@ public void callback(AllocateResponse response) { responses.add(response); } + // Save the new AMRMToken for the UAM in NMSS if present + if (response.getAMRMToken() != null + && getNMStateStore() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), + newToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error( + "Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -728,7 +912,15 @@ public RegisterApplicationMasterResponseInfo call() registerRequest, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + homeSubClusterId.toString(), null); + + if (getNMStateStore() != null) { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId, + uamPool.getInitialAMRMToken(subClusterId) + .encodeToUrlString().getBytes("UTF-8")); + } } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " @@ -820,6 +1012,18 @@ private void removeFinishedContainersFromCache( if (containerIdToSubClusterIdMap .containsKey(container.getContainerId())) { containerIdToSubClusterIdMap.remove(container.getContainerId()); + + if (getNMStateStore() != null) { + try { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_CONTAINERID_PREFIX + + container.getContainerId().toString()); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } } } @@ -982,6 +1186,17 @@ private void cacheAllocatedContainers(List containers, } containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + if (getNMStateStore() != null) { + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_CONTAINERID_PREFIX + container.getId().toString(), + subClusterId.getId().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index a24c83b..463eeec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -176,6 +177,20 @@ protected Context createContext() { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + // A utility method for intercepter recover unit test + protected Map recoverDataMapForAppAttempt( + NMStateStoreService nmStateStore, ApplicationAttemptId attemptId) + throws IOException { + RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState(); + for (Map.Entry> entry : state + .getAppContexts().entrySet()) { + if (entry.getKey().equals(attemptId)) { + return entry.getValue(); + } + } + return null; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 4e15323..0ba4b96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -31,11 +33,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; @@ -44,6 +48,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -64,7 +72,9 @@ private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -78,9 +88,15 @@ public void setUp() throws IOException { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null)); } @@ -176,6 +192,15 @@ private void deRegisterSubCluster(SubClusterId subClusterId) return containers; } + private List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + private void releaseContainersAndAssert(List containers) throws Exception { Assert.assertTrue(containers.size() > 0); @@ -198,12 +223,13 @@ private void releaseContainersAndAssert(List containers) // returned by the mock resource managers will be aggregated and returned // back to us and we can check if total request size and returned size are // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -213,11 +239,12 @@ private void releaseContainersAndAssert(List containers) allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -339,6 +366,64 @@ public void testReregister() throws Exception { } @Test + public void testRecover() throws Exception { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Prepare for Federation Interceptor restart and recover + Map recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Create a new intercepter instance and recover + interceptor = new TestableFederationInterceptor(homeRM, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null)); + interceptor.recover(recoveredDataMap); + + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test public void testRequestInterceptorChainCreation() throws Exception { RequestInterceptor root = super.getAMRMProxyService().createRequestInterceptorChain(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735..0eaaca1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,16 @@ private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + this(); + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +78,7 @@ protected ApplicationMasterProtocol createHomeRMProxy( // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +101,14 @@ protected void setShouldReRegisterNext() { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager.