diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77..b29ccf6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -173,30 +174,37 @@ public String createAndRegisterNewUAM( rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix, null); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest, false); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM or re-attach to an existing UAM, using the provided uamId + * and appId. * - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @param uamIdentifier identifying info about the uam. Non null for attaching + * to an existing UAM. Null for launching a new UAM. + * @return UnmanagedAMIdentifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public UnmanagedAMIdentifier launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, UnmanagedAMIdentifier uamIdentifier) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); @@ -207,11 +215,9 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Luanching UAM id {} for application {}", uamId, appId); + uamIdentifier = uam.launchUAM(uamIdentifier); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -220,7 +226,7 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, } this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + return uamIdentifier; } /** @@ -242,9 +248,31 @@ protected UnmanagedApplicationManager createUAM(Configuration conf, } /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @param isReAttach whether this is a re-attach of an existing UAM + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest, + boolean isReAttach) throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.attemptIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest, isReAttach); + } + + /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +290,7 @@ public void allocateAsync(String uamId, AllocateRequest request, /** * Finish an UAM/application. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,7 +302,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.attemptIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); @@ -301,7 +330,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 60a9a27..993ef21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -116,6 +116,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.attemptId = null; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -126,25 +127,31 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, } /** - * Registers this {@link UnmanagedApplicationManager} with the resource - * manager. + * Launch a new UAM or re-attach to an existing UAM in the resource manager. * - * @param request the register request - * @return the register response - * @throws YarnException if register fails - * @throws IOException if register fails + * @param uamIdentifier identifying info about the uam. Non null for attaching + * to an existing UAM. Null for launching a new UAM. + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest request) + public UnmanagedAMIdentifier launchUAM(UnmanagedAMIdentifier identifier) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) - this.registerRequest = request; - - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); + if (identifier == null) { + /* + * We use attemptId != null as an indication that this method launchUAM is + * called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the identifier to + * non-null first before calling the blocking call to RM. + */ + this.attemptId = ApplicationAttemptId.newInstance(null, 0); + + // The real this.attemptId will be set after this blocking call to RM + identifier = initializeUnmanagedAM(this.applicationId); + } else { + // Reattaching to an existing UAM + this.attemptId = identifier.getAttemptId(); + } try { this.userUgi = UserGroupInformation.createProxyUser( @@ -158,9 +165,44 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, this.userUgi, identifier.getToken()); - LOG.info("Registering the Unmanaged application master {}", this.attemptId); - RegisterApplicationMasterResponse response = - this.rmProxy.registerApplicationMaster(this.registerRequest); + return identifier; + } + + /** + * Registers this {@link UnmanagedApplicationManager} with the resource + * manager. + * + * @param request RegisterApplicationMasterRequest + * @param isReAttach whether this is a re-attach of an existing UAM + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request, boolean isReAttach) + throws YarnException, IOException { + if (this.rmProxy == null) { + throw new YarnException( + "finishApplicationMaster should not be called before launchUAM"); + } + + // Save the register request for re-register later + this.registerRequest = request; + RegisterApplicationMasterResponse response = null; + if (isReAttach) { + /* + * Since for performance reasons, we didn't save the last valid responseId + * from last run, we have to force RM to accept our first allocate call by + * setting the responseId to certain value. We will get the proper + * responseId to use afterwards from RM's first allocate response. + */ + this.lastResponseId = Integer.MAX_VALUE - 1; + } else { + LOG.info("Registering the Unmanaged application master {}", + this.attemptId); + response = this.rmProxy.registerApplicationMaster(this.registerRequest); + this.lastResponseId = 0; + } // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( @@ -168,7 +210,6 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( this.handlerThread.setDaemon(true); this.handlerThread.start(); - this.lastResponseId = 0; return response; } @@ -187,11 +228,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.attemptId != null) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." - + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -240,18 +281,18 @@ public void allocateAsync(AllocateRequest request, LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.attemptId != null) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 68c55ac..fe5fb3c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -369,6 +370,7 @@ public AllocateResponse allocate(AllocateRequest request) } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -394,18 +396,8 @@ public AllocateResponse allocate(AllocateRequest request) + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -416,9 +408,9 @@ public AllocateResponse allocate(AllocateRequest request) // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList(), - containerList, new ArrayList(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), newAMRMToken, new ArrayList()); } @@ -435,6 +427,8 @@ public GetApplicationReportResponse getApplicationReport( report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken( + Token.newInstance(new byte[3], "kind", new byte[3], "service")); response.setApplicationReport(report); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf7..db6350e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.util.AsyncCallback; import org.junit.Assert; import org.junit.Before; @@ -89,7 +90,47 @@ public void testBasicUsage() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + uam.setRMProxy(rmProxy); + + UnmanagedAMIdentifier uamIdentifier = + new UnmanagedAMIdentifier(attemptId, null); + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), + uamIdentifier, attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, attemptId); @@ -107,7 +148,8 @@ public void testReRegister() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); uam.setShouldReRegisterNext(); @@ -139,7 +181,7 @@ public void run() { try { createAndRegisterApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), - attemptId); + null, attemptId); } catch (Exception e) { LOG.info("Register thread exception", e); } @@ -222,7 +264,8 @@ public void testFinishWithoutRegister() public void testForceKill() throws YarnException, IOException, InterruptedException { createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + RegisterApplicationMasterRequest.newInstance(null, 0, null), null, + attemptId); uam.forceKillApplication(); try { @@ -244,6 +287,7 @@ protected UserGroupInformation getUGIWithToken( protected RegisterApplicationMasterResponse createAndRegisterApplicationMaster( final RegisterApplicationMasterRequest request, + final UnmanagedAMIdentifier identifier, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( @@ -251,9 +295,9 @@ protected UserGroupInformation getUGIWithToken( @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + uam.launchUAM(identifier); + return uam.registerApplicationMaster(request, + !(identifier == null)); } }); } @@ -330,6 +374,14 @@ public void setShouldReRegisterNext() { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf..24eef7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -460,9 +461,8 @@ protected void stopApplication(ApplicationId applicationId) { try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { - LOG.warn( - "Failed to shutdown the request processing pipeline for app:" - + applicationId, ex); + LOG.warn("Failed to shutdown the request processing pipeline for app:" + + applicationId, ex); } // Remove the app context from NMSS after the interceptors are shutdown @@ -497,10 +497,7 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - + ConverterUtils.convertFromYarn(token, (Text) null); context.setAMRMToken(newToken); // Update the AMRMToken in context map in NM state store diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index ffe47f4..a49994a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,7 +35,9 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -42,6 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,6 +62,9 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; @@ -64,9 +73,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +95,19 @@ private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class); + private static final String NMSS_CLASS_PREFIX = "FederationInterceptor/"; + + private static final String NMSS_REG_REQUEST_KEY = + NMSS_CLASS_PREFIX + "registerRequest"; + private static final String NMSS_REG_RESPONSE_KEY = + NMSS_CLASS_PREFIX + "registerResponse"; + + private static final String NMSS_CONTAINERID_PREFIX = + NMSS_CLASS_PREFIX + "container/"; + + private static final String NMSS_SECONDARY_SC_PREFIX = + NMSS_CLASS_PREFIX + "secondarySC/"; + /** * The home sub-cluster is the sub-cluster where the AM container is running * in. @@ -194,6 +218,95 @@ public void init(AMRMProxyApplicationContext appContext) { this.uamPool.start(); } + @Override + public void recover(Map recoveredDataMap) { + super.recover(recoveredDataMap); + LOG.info("Recovering data for FederationInterceptor"); + if (recoveredDataMap == null) { + return; + } + + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + try { + // First pass over the dataMap + int containers = 0; + for (Map.Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().equals(NMSS_REG_REQUEST_KEY)) { + RegisterApplicationMasterRequestProto pb = + RegisterApplicationMasterRequestProto.parseFrom(entry.getValue()); + this.amRegistrationRequest = + new RegisterApplicationMasterRequestPBImpl(pb); + LOG.info("amRegistrationRequest recovered"); + + } else if (entry.getKey().equals(NMSS_REG_RESPONSE_KEY)) { + RegisterApplicationMasterResponseProto pb = + RegisterApplicationMasterResponseProto + .parseFrom(entry.getValue()); + this.amRegistrationResponse = + new RegisterApplicationMasterResponsePBImpl(pb); + LOG.info("amRegistrationResponse recovered"); + + } else if (entry.getKey().startsWith(NMSS_CONTAINERID_PREFIX)) { + // entry for containerIdToSubClusterIdMap + ContainerId cid = ContainerId.fromString( + entry.getKey().substring(NMSS_CONTAINERID_PREFIX.length())); + SubClusterId subClusterId = + SubClusterId.newInstance(new String(entry.getValue(), "UTF-8")); + containerIdToSubClusterIdMap.put(cid, subClusterId); + containers++; + LOG.debug("Recovered container " + cid + " -> " + subClusterId); + } + } + LOG.info("{} running containers recovered for {}", containers, attemptId); + + // Second pass over the dataMap, after register request and response have + // been recovered if available + int uams = 0; + for (Map.Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) { + // entry for subCluster->UAM amrmToken + String subClusterId = + entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length()); + LOG.debug("Recovered UAM " + subClusterId); + + Token amrmToken = + new Token(); + amrmToken.decodeFromUrlString(new String(entry.getValue(), "UTF-8")); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId); + + this.uamPool.launchUAM(subClusterId, config, + attemptId.getApplicationId(), + this.amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), + this.homeSubClusterId.toString(), + new UnmanagedAMIdentifier(attemptId, amrmToken)); + + this.uamPool.registerApplicationMaster(subClusterId, + this.amRegistrationRequest, true); + uams++; + } + } + LOG.info("{} UAMs recovered for {}", uams, attemptId); + + if (this.amRegistrationResponse != null) { + // Initialize the AMRMProxyPolicy + String queue = this.amRegistrationResponse.getQueue(); + this.policyInterpreter = + FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); + } + } catch (IOException | YarnException e) { + throw new YarnRuntimeException(e); + } + + } + /** * Sends the application master's registration request to the home RM. * @@ -223,6 +336,18 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Save the registration request. This will be used for registering with // secondary sub-clusters using UAMs, as well as re-register later this.amRegistrationRequest = request; + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterRequestPBImpl pb = + (RegisterApplicationMasterRequestPBImpl) this.amRegistrationRequest; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } /* * Present to AM as if we are the RM that never fails over. When actual RM @@ -248,6 +373,21 @@ public RegisterApplicationMasterResponse registerApplicationMaster( try { this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterResponsePBImpl pb = + (RegisterApplicationMasterResponsePBImpl) + this.amRegistrationResponse; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } + } catch (InvalidApplicationMasterRequestException e) { if (e.getMessage() .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) { @@ -387,6 +527,19 @@ public FinishApplicationMasterResponseInfo call() throws Exception { try { uamResponse = uamPool.finishApplicationMaster(subClusterId, finishRequest); + if (uamResponse.getIsUnregistered() + && getNMStateStore() != null) { + try { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } catch (Exception e) { + LOG.error( + "Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " @@ -454,7 +607,23 @@ public void setNextInterceptor(RequestInterceptor next) { @Override public void shutdown() { if (this.uamPool != null) { + // Save a local copy of the key set + Set subClusterIds = new HashSet<>(this.uamPool.getAllUAMIds()); this.uamPool.stop(); + + // Remove the subcluster entries in NMSS + if (getNMStateStore() != null) { + try { + for (String subClusterId : subClusterIds) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } catch (Throwable e) { + LOG.error("Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } if (threadpool != null) { try { @@ -663,6 +832,24 @@ public void callback(AllocateResponse response) { responses.add(response); } + // Save the new AMRMToken for the UAM in NMSS if present + if (response.getAMRMToken() != null + && getNMStateStore() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), + newToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error( + "Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -724,11 +911,21 @@ public RegisterApplicationMasterResponseInfo call() RegisterApplicationMasterResponse uamResponse = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, + UnmanagedAMIdentifier uamIdentifier = uamPool.launchUAM( + subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + homeSubClusterId.toString(), null); + + uamPool.registerApplicationMaster(subClusterId, + registerRequest, false); + + if (getNMStateStore() != null) { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId, uamIdentifier + .getToken().encodeToUrlString().getBytes("UTF-8")); + } } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " @@ -820,6 +1017,18 @@ private void removeFinishedContainersFromCache( if (containerIdToSubClusterIdMap .containsKey(container.getContainerId())) { containerIdToSubClusterIdMap.remove(container.getContainerId()); + + if (getNMStateStore() != null) { + try { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_CONTAINERID_PREFIX + + container.getContainerId().toString()); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } } } @@ -982,6 +1191,17 @@ private void cacheAllocatedContainers(List containers, } containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + if (getNMStateStore() != null) { + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_CONTAINERID_PREFIX + container.getId().toString(), + subClusterId.getId().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index a24c83b..463eeec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -176,6 +177,20 @@ protected Context createContext() { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + // A utility method for intercepter recover unit test + protected Map recoverDataMapForAppAttempt( + NMStateStoreService nmStateStore, ApplicationAttemptId attemptId) + throws IOException { + RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState(); + for (Map.Entry> entry : state + .getAppContexts().entrySet()) { + if (entry.getKey().equals(attemptId)) { + return entry.getValue(); + } + } + return null; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 72e5f53..2cdb6b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -510,6 +511,15 @@ public void testMultipleAttemptsSameNode() return containers; } + private List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + private void releaseContainersAndAssert(int appId, List containers) throws Exception { Assert.assertTrue(containers.size() > 0); @@ -531,16 +541,14 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List containersForReleasedContainerIds = new ArrayList<>(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +562,9 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 4e15323..8dd8835 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -31,11 +33,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; @@ -44,6 +48,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -64,7 +72,9 @@ private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -78,9 +88,15 @@ public void setUp() throws IOException { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null)); } @@ -176,6 +192,15 @@ private void deRegisterSubCluster(SubClusterId subClusterId) return containers; } + private List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + private void releaseContainersAndAssert(List containers) throws Exception { Assert.assertTrue(containers.size() > 0); @@ -192,18 +217,17 @@ private void releaseContainersAndAssert(List containers) AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. + // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -213,11 +237,12 @@ private void releaseContainersAndAssert(List containers) allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -339,6 +364,64 @@ public void testReregister() throws Exception { } @Test + public void testRecover() throws Exception { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Prepare for Federation Interceptor restart and recover + Map recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Create a new intercepter instance and recover + interceptor = new TestableFederationInterceptor(homeRM, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null)); + interceptor.recover(recoveredDataMap); + + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test public void testRequestInterceptorChainCreation() throws Exception { RequestInterceptor root = super.getAMRMProxyService().createRequestInterceptorChain(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735..870e229 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,16 @@ private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + this(); + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +78,7 @@ protected ApplicationMasterProtocol createHomeRMProxy( // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +101,15 @@ protected void setShouldReRegisterNext() { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager.