diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77fe6d..7acd23d6a0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -46,7 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -173,28 +176,31 @@ public String createAndRegisterNewUAM( rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest, false); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. 
* - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @return UnmanagedAMIdentifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public UnmanagedAMIdentifier launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix) throws YarnException, IOException { @@ -207,11 +213,10 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; + UnmanagedAMIdentifier uamIdentifier = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Launching UAM id {} for application {}", uamId, appId); + uamIdentifier = uam.launchUAM(); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -220,7 +225,49 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, } this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + return uamIdentifier; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. 
+ * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamIdentifier identifying info about the uam + * @throws YarnException if fails + * @throws IOException if fails + */ + public List reAttachUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, UnmanagedAMIdentifier uamIdentifier) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = + createUAM(conf, appId, queueName, submitter, appNameSuffix); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + List containerReports = null; + try { + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + containerReports = uam.reAttachUAM(uamIdentifier); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.attemptIdMap.put(uamId, uam.getAttemptId()); + return containerReports; } /** @@ -241,10 +288,32 @@ protected UnmanagedApplicationManager createUAM(Configuration conf, appNameSuffix); } + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @param isReAttach whether this is a re-attach of an existing UAM + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest, + boolean isReAttach) throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.attemptIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest, isReAttach); + } + /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +331,7 @@ public void allocateAsync(String uamId, AllocateRequest request, /** * Finish an UAM/application. 
* - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,7 +343,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.attemptIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); @@ -301,7 +371,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 60a9a277eac..0733413be5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.EnumSet; +import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -40,6 +41,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -51,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -116,6 +120,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.attemptId = null; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -126,26 +131,67 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, } /** - * Registers this {@link 
UnmanagedApplicationManager} with the resource - * manager. + * Launch a new UAM in the resource manager. * - * @param request the register request - * @return the register response - * @throws YarnException if register fails - * @throws IOException if register fails + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest request) - throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) - this.registerRequest = request; + public UnmanagedAMIdentifier launchUAM() throws YarnException, IOException { + /* + * We use attemptId != null as an indication that this method launchUAM is + * called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the identifier to + * non-null first before calling the blocking call to RM. + */ + this.attemptId = ApplicationAttemptId.newInstance(null, 0); - // attemptId will be available after this call + // The real this.attemptId will be set after this blocking call to RM UnmanagedAMIdentifier identifier = initializeUnmanagedAM(this.applicationId); + // Creates the UAM connection + createUAMConnection(identifier); + return identifier; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param identifier identifying info about the uam + * @throws IOException + * @throws YarnException + */ + public List reAttachUAM(UnmanagedAMIdentifier identifier) + throws IOException, YarnException { + /* + * We use attemptId != null as an indication that this method reAttachUAM is + * called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the identifier to + * non-null first before calling the blocking call to RM. + */ + this.attemptId = identifier.getAttemptId(); + + GetContainersResponse response = null; + try { + UserGroupInformation appSubmitter = + UserGroupInformation.createRemoteUser(this.submitter); + this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, + appSubmitter, null); + response = this.rmClient + .getContainers(GetContainersRequest.newInstance(this.attemptId)); + } finally { + this.rmClient = null; + } + + // Creates the UAM connection + createUAMConnection(identifier); + + return response.getContainerList(); + } + + protected void createUAMConnection(UnmanagedAMIdentifier identifier) + throws IOException { try { this.userUgi = UserGroupInformation.createProxyUser( identifier.getAttemptId().toString(), @@ -157,10 +203,43 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, this.userUgi, identifier.getToken()); + } - LOG.info("Registering the Unmanaged application master {}", this.attemptId); - RegisterApplicationMasterResponse response = - this.rmProxy.registerApplicationMaster(this.registerRequest); + /** + * Registers this {@link UnmanagedApplicationManager} with the resource + * manager. 
+   *
+   * @param request RegisterApplicationMasterRequest
+   * @param isReAttach whether this is a re-attach of an existing UAM
+   * @return register response
+   * @throws YarnException if register fails
+   * @throws IOException if register fails
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request, boolean isReAttach)
+      throws YarnException, IOException {
+    if (this.rmProxy == null) {
+      throw new YarnException(
+          "registerApplicationMaster should not be called before launchUAM");
+    }
+
+    // Save the register request for re-register later
+    this.registerRequest = request;
+    RegisterApplicationMasterResponse response = null;
+    if (isReAttach) {
+      /*
+       * For performance reasons we did not save the last valid responseId
+       * from the previous run, so we have to force RM to accept our first
+       * allocate call by setting the responseId to a known value. We will get
+       * the proper responseId to use afterwards from RM's first allocate
+       * response.
+       */
+      this.lastResponseId = Integer.MAX_VALUE - 1;
+    } else {
+      LOG.info("Registering the Unmanaged application master {}",
+          this.attemptId);
+      response = this.rmProxy.registerApplicationMaster(this.registerRequest);
+      this.lastResponseId = 0;
+    }

     // Only when register succeed that we start the heartbeat thread
     this.handlerThread.setUncaughtExceptionHandler(
@@ -168,7 +247,6 @@ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
     this.handlerThread.setDaemon(true);
     this.handlerThread.start();

-    this.lastResponseId = 0;
     return response;
   }

@@ -187,11 +265,11 @@ public FinishApplicationMasterResponse finishApplicationMaster(
     this.handlerThread.shutdown();

     if (this.rmProxy == null) {
-      if (this.registerRequest != null) {
-        // This is possible if the async registerApplicationMaster is still
+      if (this.attemptId != null) {
+        // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
         LOG.warn("Unmanaged AM still not successfully launched/registered yet."
-            + " Stopping the UAM client thread anyways.");
+            + " Stopping the UAM heartbeat thread anyways.");
         return FinishApplicationMasterResponse.newInstance(false);
       } else {
         throw new YarnException("finishApplicationMaster should not "
@@ -240,18 +318,18 @@ public void allocateAsync(AllocateRequest request,
       LOG.debug("Interrupted while waiting to put on response queue", ex);
     }
     // Two possible cases why the UAM is not successfully registered yet:
-    // 1. registerApplicationMaster is not called at all. Should throw here.
-    // 2. registerApplicationMaster is called but hasn't successfully returned.
+    // 1. launchUAM is not called at all. Should throw here.
+    // 2. launchUAM is called but hasn't successfully returned.
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
     if (this.rmProxy == null) {
-      if (this.registerRequest != null) {
+      if (this.attemptId != null) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index e33d7e19774..ec1e6352f70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -371,6 +372,7 @@ public AllocateResponse allocate(AllocateRequest request) } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -396,18 +398,8 @@ public AllocateResponse allocate(AllocateRequest request) + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. 
This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -418,9 +410,9 @@ public AllocateResponse allocate(AllocateRequest request) // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList(), - containerList, new ArrayList(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), newAMRMToken, new ArrayList()); } @@ -437,6 +429,8 @@ public GetApplicationReportResponse getApplicationReport( report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken( + Token.newInstance(new byte[3], "kind", new byte[3], "service")); response.setApplicationReport(report); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf75150..364e8c3d640 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.util.AsyncCallback; import org.junit.Assert; import org.junit.Before; @@ -88,8 +89,10 @@ protected void waitForCallBackCountAndCheckZeroPending( public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), false, + attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, attemptId); @@ -102,12 +105,55 @@ public void testBasicUsage() attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. 
+ */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), false, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + uam.setRMProxy(rmProxy); + + UnmanagedAMIdentifier uamIdentifier = + new UnmanagedAMIdentifier(attemptId, null); + + reAttachUAM(uamIdentifier, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), true, + attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), false, + attemptId); uam.setShouldReRegisterNext(); @@ -137,9 +183,10 @@ public void testSlowRegisterCall() @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), - attemptId); + false, attemptId); } catch (Exception e) { LOG.info("Register thread exception", e); } @@ -221,8 +268,10 @@ public void testFinishWithoutRegister() @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( - RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), false, + attemptId); uam.forceKillApplication(); try { @@ -241,19 +290,39 @@ protected UserGroupInformation getUGIWithToken( return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected UnmanagedAMIdentifier launchUAM(ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction() { + @Override + public UnmanagedAMIdentifier run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final UnmanagedAMIdentifier uamIdentifier, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public UnmanagedAMIdentifier run() throws Exception { + uam.reAttachUAM(uamIdentifier); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, final boolean isReattach, + 
ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request, isReattach); } }); } @@ -330,6 +399,14 @@ public void setShouldReRegisterNext() { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf589b..24eef7c922b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -460,9 +461,8 @@ protected void stopApplication(ApplicationId applicationId) { try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { - LOG.warn( - "Failed to shutdown the request processing pipeline for app:" - + applicationId, ex); + LOG.warn("Failed to shutdown the request processing pipeline for app:" + + applicationId, ex); } // Remove the app context from NMSS after the interceptors are shutdown @@ -497,10 +497,7 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - + ConverterUtils.convertFromYarn(token, (Text) null); context.setAMRMToken(newToken); // Update the AMRMToken in context map in NM state store diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 28724aaf25c..9b60b198f27 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import 
java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,16 +35,25 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.PreemptionContract; @@ -56,6 +66,9 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; @@ -64,9 +77,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +99,17 @@ private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class); + private static final String NMSS_CLASS_PREFIX = "FederationInterceptor/"; + private static final String NMSS_BYTE_STRING_FORMAT = "UTF-8"; + + private static final String NMSS_REG_REQUEST_KEY = + NMSS_CLASS_PREFIX + "registerRequest"; + private static final String NMSS_REG_RESPONSE_KEY = + NMSS_CLASS_PREFIX + "registerResponse"; + + 
private static final String NMSS_SECONDARY_SC_PREFIX = + NMSS_CLASS_PREFIX + "secondarySC/"; + /** * The home sub-cluster is the sub-cluster where the AM container is running * in. @@ -182,7 +208,8 @@ public void init(AMRMProxyApplicationContext appContext) { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRM = createHomeRMProxy(appContext); + this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class, + this.appOwner); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -194,6 +221,111 @@ public void init(AMRMProxyApplicationContext appContext) { this.uamPool.start(); } + @Override + public void recover(Map recoveredDataMap) { + super.recover(recoveredDataMap); + LOG.info("Recovering data for FederationInterceptor"); + if (recoveredDataMap == null) { + return; + } + + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + try { + if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) { + RegisterApplicationMasterRequestProto pb = + RegisterApplicationMasterRequestProto + .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY)); + this.amRegistrationRequest = + new RegisterApplicationMasterRequestPBImpl(pb); + LOG.info("amRegistrationRequest recovered for {}", attemptId); + } + if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) { + RegisterApplicationMasterResponseProto pb = + RegisterApplicationMasterResponseProto + .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY)); + this.amRegistrationResponse = + new RegisterApplicationMasterResponsePBImpl(pb); + LOG.info("amRegistrationResponse recovered for {}", attemptId); + } + + int containers = 0, uams = 0; + for (Map.Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) { + // entry for subCluster->UAM amrmToken + String subClusterId = + entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length()); + LOG.debug("Recovered UAM " + subClusterId); + + Token amrmToken = + new Token(); + amrmToken.decodeFromUrlString( + new String(entry.getValue(), NMSS_BYTE_STRING_FORMAT)); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId); + + List containerReports = this.uamPool.reAttachUAM( + subClusterId, config, attemptId.getApplicationId(), + this.amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), + this.homeSubClusterId.toString(), + new UnmanagedAMIdentifier(attemptId, amrmToken)); + + // Running containers from secondary RMs + SubClusterId subClusterIdObj = SubClusterId.newInstance(subClusterId); + for (ContainerReport container : containerReports) { + containerIdToSubClusterIdMap.put(container.getContainerId(), + subClusterIdObj); + containers++; + } + LOG.info("Recovered {} running containers from UAM in {}", + containerReports.size(), subClusterId); + + this.uamPool.registerApplicationMaster(subClusterId, + this.amRegistrationRequest, true); + uams++; + } + } + + // Get the running containers from home RM, note that we will also get the + // AM container itself from here. We don't need it, but no harm to put the + // map as well. 
+      UserGroupInformation appSubmitter = UserGroupInformation
+          .createRemoteUser(getApplicationContext().getUser());
+      ApplicationClientProtocol rmClient = createHomeRMProxy(
+          getApplicationContext(), ApplicationClientProtocol.class,
+          appSubmitter);
+
+      GetContainersResponse response = rmClient
+          .getContainers(GetContainersRequest.newInstance(attemptId));
+      for (ContainerReport container : response.getContainerList()) {
+        containerIdToSubClusterIdMap.put(container.getContainerId(),
+            this.homeSubClusterId);
+        containers++;
+      }
+      LOG.info("{} running containers including AM recovered from home RM {}",
+          response.getContainerList().size(), this.homeSubClusterId);
+
+      LOG.info(
+          "In total {} UAMs and {} running containers including AM recovered"
+              + " for {}",
+          uams, containers, attemptId);
+
+      if (this.amRegistrationResponse != null) {
+        // Initialize the AMRMProxyPolicy
+        String queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter =
+            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+                getConf(), this.federationFacade, this.homeSubClusterId);
+      }
+    } catch (IOException | YarnException e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
   /**
    * Sends the application master's registration request to the home RM.
    *
@@ -225,6 +357,18 @@ public void init(AMRMProxyApplicationContext appContext) {
     // Save the registration request. This will be used for registering with
     // secondary sub-clusters using UAMs, as well as re-register later
     this.amRegistrationRequest = request;
+    if (getNMStateStore() != null) {
+      try {
+        RegisterApplicationMasterRequestPBImpl pb =
+            (RegisterApplicationMasterRequestPBImpl) this.amRegistrationRequest;
+        getNMStateStore().storeAMRMProxyAppContextEntry(
+            getApplicationContext().getApplicationAttemptId(),
+            NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry for "
+            + getApplicationContext().getApplicationAttemptId(), e);
+      }
+    }
   }

   /*
@@ -250,6 +394,18 @@ public void init(AMRMProxyApplicationContext appContext) {
      */
     this.amRegistrationResponse =
         this.homeRM.registerApplicationMaster(request);
+    if (getNMStateStore() != null) {
+      try {
+        RegisterApplicationMasterResponsePBImpl pb =
+            (RegisterApplicationMasterResponsePBImpl) this.amRegistrationResponse;
+        getNMStateStore().storeAMRMProxyAppContextEntry(
+            getApplicationContext().getApplicationAttemptId(),
+            NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry for "
+            + getApplicationContext().getApplicationAttemptId(), e);
+      }
+    }

     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.
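
To make the state-store round-trip in recover() concrete, here is a minimal, self-contained sketch (not part of the patch) of how a UAM AMRMToken survives an AMRMProxy restart. A plain HashMap stands in for NMStateStoreService, and the token bytes and service address are made-up placeholders; the key prefix and the UTF-8 encoding match the patch's NMSS_SECONDARY_SC_PREFIX and NMSS_BYTE_STRING_FORMAT.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

    public class UamTokenRoundTripSketch {
      // Same key scheme as the patch: one entry per secondary sub-cluster.
      private static final String NMSS_SECONDARY_SC_PREFIX =
          "FederationInterceptor/secondarySC/";

      public static void main(String[] args) throws IOException {
        // Stand-in for the NM state store used by FederationInterceptor
        Map<String, byte[]> store = new HashMap<>();

        // Save path, mirroring the allocate callback: encode the UAM AMRMToken
        // as a URL-safe string and store its UTF-8 bytes under the SC key.
        Token<AMRMTokenIdentifier> token = new Token<>(new byte[0], new byte[0],
            AMRMTokenIdentifier.KIND_NAME, new Text("rm.sc-1:8030"));
        store.put(NMSS_SECONDARY_SC_PREFIX + "SC-1",
            token.encodeToUrlString().getBytes(StandardCharsets.UTF_8));

        // Recover path, mirroring recover(): decode the bytes back into a
        // token that reAttachUAM can use to reconnect to the UAM in SC-1.
        Token<AMRMTokenIdentifier> recovered = new Token<>();
        recovered.decodeFromUrlString(new String(
            store.get(NMSS_SECONDARY_SC_PREFIX + "SC-1"),
            StandardCharsets.UTF_8));
        System.out.println("Recovered token kind: " + recovered.getKind());
      }
    }
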
@@ -376,6 +532,19 @@ public FinishApplicationMasterResponseInfo call() throws Exception { try { uamResponse = uamPool.finishApplicationMaster(subClusterId, finishRequest); + if (uamResponse.getIsUnregistered() + && getNMStateStore() != null) { + try { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } catch (Exception e) { + LOG.error( + "Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " @@ -443,7 +612,23 @@ public void setNextInterceptor(RequestInterceptor next) { @Override public void shutdown() { if (this.uamPool != null) { + // Save a local copy of the key set + Set subClusterIds = new HashSet<>(this.uamPool.getAllUAMIds()); this.uamPool.stop(); + + // Remove the subcluster entries in NMSS + if (getNMStateStore() != null) { + try { + for (String subClusterId : subClusterIds) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } catch (Throwable e) { + LOG.error("Error removing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } if (threadpool != null) { try { @@ -475,12 +660,11 @@ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( * @param appContext AMRMProxyApplicationContext * @return the proxy created */ - protected ApplicationMasterProtocol createHomeRMProxy( - AMRMProxyApplicationContext appContext) { + protected T createHomeRMProxy(AMRMProxyApplicationContext appContext, + Class protocol, UserGroupInformation user) { try { return FederationProxyProviderUtil.createRMProxy(appContext.getConf(), - ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner, - appContext.getAMRMToken()); + protocol, this.homeSubClusterId, user, appContext.getAMRMToken()); } catch (Exception ex) { throw new YarnRuntimeException(ex); } @@ -652,6 +836,25 @@ public void callback(AllocateResponse response) { responses.add(response); } + // Save the new AMRMToken for the UAM in NMSS if present + if (response.getAMRMToken() != null + && getNMStateStore() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), + newToken.encodeToUrlString() + .getBytes(NMSS_BYTE_STRING_FORMAT)); + } catch (IOException e) { + LOG.error( + "Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), + e); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -713,11 +916,23 @@ public RegisterApplicationMasterResponseInfo call() RegisterApplicationMasterResponse uamResponse = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, - appContext.getApplicationAttemptId().getApplicationId(), - amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + UnmanagedAMIdentifier uamIdentifier = + uamPool.launchUAM(subClusterId, config, + appContext.getApplicationAttemptId() + 
.getApplicationId(),
+                      amRegistrationResponse.getQueue(),
+                      appContext.getUser(), homeSubClusterId.toString());
+
+              uamPool.registerApplicationMaster(subClusterId,
+                  registerRequest, false);
+
+              if (getNMStateStore() != null) {
+                getNMStateStore().storeAMRMProxyAppContextEntry(
+                    getApplicationContext().getApplicationAttemptId(),
+                    NMSS_SECONDARY_SC_PREFIX + subClusterId,
+                    uamIdentifier.getToken().encodeToUrlString()
+                        .getBytes(NMSS_BYTE_STRING_FORMAT));
+              }
             } catch (Throwable e) {
               LOG.error("Failed to register application master: "
                   + subClusterId + " Application: "
@@ -769,11 +984,6 @@ public RegisterApplicationMasterResponseInfo call()
   private AllocateResponse mergeAllocateResponses(
       AllocateResponse homeResponse) {
     // Timing issue, we need to remove the completed and then save the new ones.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Remove containers: "
-          + homeResponse.getCompletedContainersStatuses());
-      LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
-    }
     removeFinishedContainersFromCache(
         homeResponse.getCompletedContainersStatuses());
     cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
@@ -796,7 +1006,6 @@ private AllocateResponse mergeAllocateResponses(
         }
       }
     }
-
     return homeResponse;
   }

@@ -806,6 +1015,7 @@ private AllocateResponse mergeAllocateResponses(
   private void removeFinishedContainersFromCache(
       List<ContainerStatus> finishedContainers) {
     for (ContainerStatus container : finishedContainers) {
+      LOG.info("Completed container {}", container);
       if (containerIdToSubClusterIdMap
           .containsKey(container.getContainerId())) {
         containerIdToSubClusterIdMap.remove(container.getContainerId());
@@ -943,12 +1153,23 @@ private void mergeAllocateResponse(AllocateResponse homeResponse,
   private void cacheAllocatedContainers(List<Container> containers,
       SubClusterId subClusterId) {
     for (Container container : containers) {
+      LOG.info("Adding container {}", container);
+
      if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
         SubClusterId existingSubClusterId =
             containerIdToSubClusterIdMap.get(container.getId());
         if (existingSubClusterId.equals(subClusterId)) {
-          // When RM fails over, the new RM master might send out the same
-          // container allocation more than once. Just move on in this case.
+          /*
+           * When RM fails over, the new RM master might send out the same
+           * container allocation more than once.
+           *
+           * It can also happen after a recent NM restart with NM recovery
+           * enabled. We recover running containers from RM, but RM might not
+           * have notified the AM of some of these containers yet. When RM does
+           * notify, we will already have these containers in the map.
+           *
+           * In either case, just warn and move on.
+           */
           LOG.warn(
               "Duplicate containerID: {} found in the allocated containers"
                   + " from same sub-cluster: {}, so ignoring.",
@@ -1023,7 +1244,9 @@ private static AllocateRequest createAllocateRequest() {
    */
   private boolean warnIfNotExists(ContainerId containerId, String actionName) {
     if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
-      LOG.error("AM is trying to {} a container {} that does not exist. ",
+      LOG.error(
+          "AM is trying to {} a container {} that does not exist. Might happen"
+              + " shortly after NM restart when NM recovery is enabled.",
           actionName, containerId.toString());
       return false;
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index a24c83b4cb7..ce2f179ae8a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -71,6 +72,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -176,6 +178,29 @@ protected Context createContext() {
     return new NMContext(null, null, null, null, stateStore, false, this.conf);
   }

+  // A utility method for interceptor recover unit tests
+  protected Map<String, byte[]> recoverDataMapForAppAttempt(
+      NMStateStoreService nmStateStore, ApplicationAttemptId attemptId)
+      throws IOException {
+    RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState();
+    for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
+        .getAppContexts().entrySet()) {
+      if (entry.getKey().equals(attemptId)) {
+        return entry.getValue();
+      }
+    }
+    return null;
+  }
+
+  protected List<ContainerId> getCompletedContainerIds(
+      List<ContainerStatus> containerStatus) {
+    List<ContainerId> ret = new ArrayList<>();
+    for (ContainerStatus status : containerStatus) {
+      ret.add(status.getContainerId());
+    }
+    return ret;
+  }
+
   /**
    * This helper method will invoke the specified function in parallel for each
    * end point in the specified list using a thread pool and return the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index 72e5f53b551..e42a4da7a12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -531,16 +531,14 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List containersForReleasedContainerIds = new ArrayList<>(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +552,9 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 34b07416a21..7d38d9e0e0a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -50,6 +52,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -70,7 +76,9 @@ private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -84,9 +92,15 @@ public void setUp() throws IOException { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null)); } @@ -198,18 +212,17 @@ private void releaseContainersAndAssert(List containers) AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. + // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. 
If not, we will
+    // fail the test
@@ -219,11 +232,12 @@ private void releaseContainersAndAssert(List<Container> containers)
       allocateResponse =
           interceptor.allocate(Records.newRecord(AllocateRequest.class));
       Assert.assertNotNull(allocateResponse);
-      containersForReleasedContainerIds
-          .addAll(allocateResponse.getAllocatedContainers());
+      newlyFinished = getCompletedContainerIds(
+          allocateResponse.getCompletedContainersStatuses());
+      containersForReleasedContainerIds.addAll(newlyFinished);

       LOG.info("Number of containers received in this request: "
-          + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+          + Integer.toString(newlyFinished.size()));
       LOG.info("Total number of containers received: "
           + Integer.toString(containersForReleasedContainerIds.size()));
       Thread.sleep(10);
@@ -416,6 +430,64 @@ public RegisterApplicationMasterResponse call() throws Exception {
     }
   }

+  @Test
+  public void testRecover() throws Exception {
+
+    // Register the application
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // Allocate one batch of containers
+    registerSubCluster(SubClusterId.newInstance("SC-1"));
+    registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+    Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+    // Prepare for Federation Interceptor restart and recover
+    Map<String, byte[]> recoveredDataMap =
+        recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+    // Preserve the mock RM instances
+    MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+
+    // Create a new interceptor instance and recover
+    interceptor = new TestableFederationInterceptor(homeRM, secondaries);
+    interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
+        attemptId, "test-user", null, null));
+    interceptor.recover(recoveredDataMap);
+
+    Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+    // Release all containers
+    releaseContainersAndAssert(containers);
+
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertEquals(true, finishResponse.getIsUnregistered());
+  }
+
   @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index d4b8735d464..6559917150c 100644
---
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -44,22 +43,33 @@ private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + this(); + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { return new TestableUnmanagedAMPoolManager(threadPool); } + @SuppressWarnings("unchecked") @Override - protected ApplicationMasterProtocol createHomeRMProxy( - AMRMProxyApplicationContext appContext) { + protected T createHomeRMProxy(AMRMProxyApplicationContext appContext, + Class protocol, UserGroupInformation user) { synchronized (this) { if (mockRm == null) { mockRm = new MockResourceManagerFacade( new YarnConfiguration(super.getConf()), 0); } } - return mockRm; + return (T) mockRm; } @SuppressWarnings("unchecked") @@ -68,7 +78,7 @@ protected ApplicationMasterProtocol createHomeRMProxy( // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +101,15 @@ protected void setShouldReRegisterNext() { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager.
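
To summarize the lifecycle this patch introduces, here is a minimal usage sketch (not from the patch) of the split UAM API in UnmanagedAMPoolManager: launch and register are now separate calls, and a restarted client re-attaches instead of launching anew. The queue, submitter, and appNameSuffix strings are placeholders, and the List element type follows the UnmanagedApplicationManager.reAttachUAM return value.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ContainerReport;
    import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
    import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager.UnmanagedAMIdentifier;

    public class UamLifecycleSketch {

      // Fresh start: launch in the sub-cluster RM first, then register.
      // Previously this was a single createAndRegisterNewUAM call.
      public static UnmanagedAMIdentifier start(UnmanagedAMPoolManager uamPool,
          Configuration conf, ApplicationId appId) throws Exception {
        String uamId = "SC-1"; // by convention the sub-cluster id is the uamId
        UnmanagedAMIdentifier identifier = uamPool.launchUAM(uamId, conf, appId,
            "default", "submitter", "homeSubCluster");
        uamPool.registerApplicationMaster(uamId,
            RegisterApplicationMasterRequest.newInstance("", 0, ""), false);
        // The token inside the identifier must be persisted (e.g. in NMSS)
        // for a later re-attach.
        return identifier;
      }

      // After an AMRMProxy restart: re-attach with the persisted identifier
      // in a *fresh* pool instance (reAttachUAM throws if the uamId already
      // exists), then re-register with isReAttach=true, which skips the RM
      // register call and applies the responseId trick instead.
      public static void recover(UnmanagedAMPoolManager freshPool,
          Configuration conf, ApplicationId appId,
          UnmanagedAMIdentifier identifier) throws Exception {
        String uamId = "SC-1";
        List<ContainerReport> running = freshPool.reAttachUAM(uamId, conf,
            appId, "default", "submitter", "homeSubCluster", identifier);
        freshPool.registerApplicationMaster(uamId,
            RegisterApplicationMasterRequest.newInstance("", 0, ""), true);
        System.out.println("Recovered " + running.size()
            + " running containers");
      }
    }
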