diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index b5727aa..15e1cea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -111,6 +111,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -622,7 +623,20 @@ public GetContainersResponse getContainers(GetContainersRequest request)
 
     validateRunning();
 
-    return GetContainersResponse.newInstance(null);
+    ApplicationAttemptId attemptId = request.getApplicationAttemptId();
+    List<ContainerReport> containers = new ArrayList<>();
+    synchronized (applicationContainerIdMap) {
+      // Return the list of running containers that were being tracked for
+      // this application attempt
+      Assert.assertTrue("The application attempt id is NOT registered: "
+          + attemptId, applicationContainerIdMap.containsKey(attemptId));
+      List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+      for (ContainerId c : ids) {
+        containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
+            null, null, 0, null, null));
+      }
+    }
+    return GetContainersResponse.newInstance(containers);
   }
 
   @Override
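Note: a minimal sketch of how the recovery path (in the FederationInterceptor
changes below) consumes this stub. The names rmClient, attemptId,
homeSubClusterId, and containerIdToSubClusterIdMap are assumed to be in scope;
this is an illustration, not part of the patch:

    // Sketch only: rebuild container bookkeeping from the (mock) home RM.
    GetContainersResponse resp =
        rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
    for (ContainerReport report : resp.getContainerList()) {
      // Every container reported by this RM belongs to its sub-cluster.
      containerIdToSubClusterIdMap.put(report.getContainerId(),
          homeSubClusterId);
    }
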
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ef5e061..d1fae3e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -37,16 +37,23 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -59,6 +66,8 @@
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@@ -90,6 +99,13 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(FederationInterceptor.class);
 
+  private static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";
+
+  private static final String NMSS_REG_REQUEST_KEY =
+      NMSS_CLASS_PREFIX + "registerRequest";
+  private static final String NMSS_REG_RESPONSE_KEY =
+      NMSS_CLASS_PREFIX + "registerResponse";
+
   /**
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
@@ -194,7 +210,8 @@ public void init(AMRMProxyApplicationContext appContext) {
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRM = createHomeRMProxy(appContext);
+    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
+        this.appOwner);
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -211,6 +228,119 @@ public void init(AMRMProxyApplicationContext appContext) {
     }
   }
 
+  @Override
+  public void recover(Map<String, byte[]> recoveredDataMap) {
+    super.recover(recoveredDataMap);
+    LOG.info("Recovering data for FederationInterceptor");
+    if (recoveredDataMap == null) {
+      return;
+    }
+    if (this.registryClient == null) {
+      throw new RuntimeException("registryClient is null");
+    }
+
+    ApplicationAttemptId attemptId =
+        getApplicationContext().getApplicationAttemptId();
+    try {
+      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+        RegisterApplicationMasterRequestProto pb =
+            RegisterApplicationMasterRequestProto
+                .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
+        this.amRegistrationRequest =
+            new RegisterApplicationMasterRequestPBImpl(pb);
+        LOG.info("amRegistrationRequest recovered for {}", attemptId);
+      }
+      if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
+        RegisterApplicationMasterResponseProto pb =
+            RegisterApplicationMasterResponseProto
+                .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
+        this.amRegistrationResponse =
+            new RegisterApplicationMasterResponsePBImpl(pb);
+        LOG.info("amRegistrationResponse recovered for {}", attemptId);
+      }
+
+      int containers = 0;
+      // Recover UAMs from the Yarn Registry
+      Map<String, Token<AMRMTokenIdentifier>> uamMap = this.registryClient
+          .loadStateFromRegistry(attemptId.getApplicationId());
LOG.info("Found {} existing UAMs for application {} in Yarn Registry", + uamMap.size(), attemptId.getApplicationId()); + for (Map.Entry> entry : uamMap + .entrySet()) { + SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey()); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + try { + this.uamPool.reAttachUAM(subClusterId.getId(), config, + attemptId.getApplicationId(), + this.amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), this.homeSubClusterId.getId(), + entry.getValue()); + + RegisterApplicationMasterResponse response = + this.uamPool.registerApplicationMaster(subClusterId.getId(), + this.amRegistrationRequest); + + // Running containers from secondary RMs + for (Container container : response + .getContainersFromPreviousAttempts()) { + containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + containers++; + } + LOG.info("Recovered {} running containers from UAM in {}", + response.getContainersFromPreviousAttempts().size(), + subClusterId); + + } catch (Exception e) { + LOG.error( + "Error reattaching UAM to " + subClusterId + " for " + attemptId, + e); + } + } + + // Get the running containers from home RM, note that we will also get the + // AM container itself from here. We don't need it, but no harm to put the + // map as well. + UserGroupInformation appSubmitter = UserGroupInformation + .createRemoteUser(getApplicationContext().getUser()); + ApplicationClientProtocol rmClient = + createHomeRMProxy(getApplicationContext(), + ApplicationClientProtocol.class, appSubmitter); + + GetContainersResponse response = + rmClient.getContainers(GetContainersRequest.newInstance(attemptId)); + for (ContainerReport container : response.getContainerList()) { + containerIdToSubClusterIdMap.put(container.getContainerId(), + this.homeSubClusterId); + containers++; + LOG.info(" From home RM " + this.homeSubClusterId + + " running container " + container.getContainerId()); + } + LOG.info("{} running containers including AM recovered from home RM ", + response.getContainerList().size(), this.homeSubClusterId); + + LOG.info( + "In all {} UAMs {} running containers including AM recovered for {}", + uamMap.size(), containers, attemptId); + + if (this.amRegistrationResponse != null) { + // Initialize the AMRMProxyPolicy + String queue = this.amRegistrationResponse.getQueue(); + this.policyInterpreter = + FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); + } + } catch (IOException | YarnException e) { + throw new YarnRuntimeException(e); + } + + } + /** * Sends the application master's registration request to the home RM. * @@ -242,6 +372,19 @@ public void init(AMRMProxyApplicationContext appContext) { // Save the registration request. 
@@ -242,6 +372,19 @@ public void init(AMRMProxyApplicationContext appContext) {
     // Save the registration request. This will be used for registering with
     // secondary sub-clusters using UAMs, as well as re-register later
     this.amRegistrationRequest = request;
+    if (getNMStateStore() != null) {
+      try {
+        RegisterApplicationMasterRequestPBImpl pb =
+            (RegisterApplicationMasterRequestPBImpl)
+                this.amRegistrationRequest;
+        getNMStateStore().storeAMRMProxyAppContextEntry(
+            getApplicationContext().getApplicationAttemptId(),
+            NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry for "
+            + getApplicationContext().getApplicationAttemptId(), e);
+      }
+    }
   }
 
   /*
@@ -278,6 +421,19 @@ public void init(AMRMProxyApplicationContext appContext) {
         getApplicationContext().getApplicationAttemptId().getApplicationId();
     reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
+    if (getNMStateStore() != null) {
+      try {
+        RegisterApplicationMasterResponsePBImpl pb =
+            (RegisterApplicationMasterResponsePBImpl)
+                this.amRegistrationResponse;
+        getNMStateStore().storeAMRMProxyAppContextEntry(
+            getApplicationContext().getApplicationAttemptId(),
+            NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry for "
+            + getApplicationContext().getApplicationAttemptId(), e);
+      }
+    }
+
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.
     String queue = this.amRegistrationResponse.getQueue();
@@ -510,18 +666,20 @@
   }
 
   /**
-   * Returns instance of the ApplicationMasterProtocol proxy class that is used
-   * to connect to the Home resource manager.
+   * Create a proxy instance that is used to connect to the home resource
+   * manager.
    *
    * @param appContext AMRMProxyApplicationContext
+   * @param protocol the protocol class for the proxy
+   * @param user the UGI for the proxy
+   * @param <T> the type of the proxy
    * @return the proxy created
    */
-  protected ApplicationMasterProtocol createHomeRMProxy(
-      AMRMProxyApplicationContext appContext) {
+  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
+      Class<T> protocol, UserGroupInformation user) {
     try {
       return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
-          ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
-          appContext.getAMRMToken());
+          protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
     } catch (Exception ex) {
       throw new YarnRuntimeException(ex);
     }
@@ -952,11 +1110,6 @@ public RegisterApplicationMasterResponseInfo call()
   private AllocateResponse mergeAllocateResponses(
       AllocateResponse homeResponse) {
     // Timing issue, we need to remove the completed and then save the new ones.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Remove containers: "
-          + homeResponse.getCompletedContainersStatuses());
-      LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
-    }
     removeFinishedContainersFromCache(
         homeResponse.getCompletedContainersStatuses());
     cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
@@ -989,6 +1142,7 @@
   private void removeFinishedContainersFromCache(
       List<ContainerStatus> finishedContainers) {
     for (ContainerStatus container : finishedContainers) {
+      LOG.debug("Completed container {}", container);
       if (containerIdToSubClusterIdMap
           .containsKey(container.getContainerId())) {
         containerIdToSubClusterIdMap.remove(container.getContainerId());
@@ -1146,12 +1300,22 @@ private void mergeAllocateResponse(AllocateResponse homeResponse,
   private void cacheAllocatedContainers(List<Container> containers,
       SubClusterId subClusterId) {
     for (Container container : containers) {
+      LOG.debug("Adding container {}", container);
       if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
         SubClusterId existingSubClusterId =
             containerIdToSubClusterIdMap.get(container.getId());
         if (existingSubClusterId.equals(subClusterId)) {
-          // When RM fails over, the new RM master might send out the same
-          // container allocation more than once. Just move on in this case.
+          /*
+           * When RM fails over, the new RM master might send out the same
+           * container allocation more than once.
+           *
+           * This can also happen after a recent NM restart with NM recovery
+           * enabled. We recover running containers from the RM, but the RM
+           * might not have notified the AM of some of these containers yet.
+           * When the RM does notify, we will already have these containers
+           * in the map.
+           *
+           * In either case, just warn and move on.
+           */
           LOG.warn(
               "Duplicate containerID: {} found in the allocated containers"
                   + " from same sub-cluster: {}, so ignoring.",
@@ -1226,7 +1390,9 @@
   private boolean warnIfNotExists(ContainerId containerId, String actionName) {
     if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
-      LOG.error("AM is trying to {} a container {} that does not exist. ",
+      LOG.error(
+          "AM is trying to {} a container {} that does not exist. This might"
+              + " happen shortly after an NM restart with NM recovery enabled.",
           actionName, containerId.toString());
       return false;
     }
   }
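Note on the LOG.isDebugEnabled() guard removed above: SLF4J's parameterized
{} form only renders its arguments when debug logging is actually enabled, so
the explicit guard is redundant there. A minimal illustration, assuming an
SLF4J Logger named LOG:

    // Deferred: container.toString() runs only if debug logging is on.
    LOG.debug("Adding container {}", container);

    // Eager: the string is concatenated on every call, which is why the old
    // concatenation-style calls needed the isDebugEnabled() wrapper.
    LOG.debug("Adding container " + container);
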
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index da1d047..0319dbe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -180,6 +181,20 @@ protected Context createContext() {
     return new NMContext(null, null, null, null, stateStore, false, this.conf);
   }
 
+  // A utility method for interceptor recovery unit tests
+  protected Map<String, byte[]> recoverDataMapForAppAttempt(
+      NMStateStoreService nmStateStore, ApplicationAttemptId attemptId)
+      throws IOException {
+    RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState();
+    for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
+        .getAppContexts().entrySet()) {
+      if (entry.getKey().equals(attemptId)) {
+        return entry.getValue();
+      }
+    }
+    return null;
+  }
+
   protected List<ContainerId> getCompletedContainerIds(
       List<ContainerStatus> containerStatus) {
     List<ContainerId> ret = new ArrayList<>();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index aa7ed69..67336e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -450,6 +450,70 @@ public RegisterApplicationMasterResponse call() throws Exception {
   }
 
   @Test
+  public void testRecover() throws Exception {
+    ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
+    userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // Register the application
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(testAppId);
+        registerReq.setTrackingUrl("");
+
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+
+        Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+        // Allocate one batch of containers
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+        registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+        int numberOfContainers = 3;
+        List<Container> containers =
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Prepare for Federation Interceptor restart and recovery
+        Map<String, byte[]> recoveredDataMap =
+            recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+        // Preserve the mock RM instances
+        MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+        ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+            interceptor.getSecondaryRMs();
+
+        // Create a new interceptor instance and recover
+        interceptor = new TestableFederationInterceptor(homeRM, secondaries);
+        interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
+            getConf(), attemptId, "test-user", null, null, null, registry));
+        interceptor.recover(recoveredDataMap);
+
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Release all containers
+        releaseContainersAndAssert(containers);
+
+        // Finish the application
+        FinishApplicationMasterRequest finishReq =
+            Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setDiagnostics("");
+        finishReq.setTrackingUrl("");
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+        FinishApplicationMasterResponse finishResponse =
+            interceptor.finishApplicationMaster(finishReq);
+        Assert.assertNotNull(finishResponse);
+        Assert.assertTrue(finishResponse.getIsUnregistered());
+        return null;
+      }
+    });
+  }
+
+  @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =
         super.getAMRMProxyService().createRequestInterceptorChain();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 23c80ae..1088c69 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -59,16 +58,17 @@ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
     return new TestableUnmanagedAMPoolManager(threadPool);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  protected ApplicationMasterProtocol createHomeRMProxy(
-      AMRMProxyApplicationContext appContext) {
+  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
+      Class<T> protocol, UserGroupInformation user) {
     synchronized (this) {
       if (mockRm == null) {
         mockRm = new MockResourceManagerFacade(
             new YarnConfiguration(super.getConf()), 0);
       }
     }
-    return mockRm;
+    return (T) mockRm;
   }
 
   @SuppressWarnings("unchecked")
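Note: with the generic createHomeRMProxy refactor, call sites pass the
protocol class and the UGI explicitly. A sketch of the two usages this patch
introduces (variables such as appContext and appOwner assumed in scope):

    // AM-facing proxy, as created in init().
    ApplicationMasterProtocol homeAMRM = createHomeRMProxy(appContext,
        ApplicationMasterProtocol.class, this.appOwner);

    // Client proxy used by recover() to list running containers; the remote
    // user here is the application submitter.
    UserGroupInformation appSubmitter = UserGroupInformation
        .createRemoteUser(getApplicationContext().getUser());
    ApplicationClientProtocol rmClient = createHomeRMProxy(
        getApplicationContext(), ApplicationClientProtocol.class, appSubmitter);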