diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java new file mode 100644 index 0000000..d1a6b49 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; + +public class AMRMProxyServiceUtils { + public static void updateAMRMToken(Token token, + UserGroupInformation user, Configuration conf) throws IOException { + org.apache.hadoop.security.token.Token amrmToken = + new org.apache.hadoop.security.token.Token( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + // Preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. 
+ user.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf)); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 22fc8f6..c517040 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -134,7 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request) } AllocateResponse allocateResponse = rmClient.allocate(request); if (allocateResponse.getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAMRMToken()); + AMRMProxyServiceUtils.updateAMRMToken(allocateResponse.getAMRMToken(), + this.user, getConf()); } return allocateResponse; @@ -170,7 +171,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( ((DistributedSchedulingAMProtocol)rmClient) .allocateForDistributedScheduling(request); if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + AMRMProxyServiceUtils.updateAMRMToken( + allocateResponse.getAllocateResponse().getAMRMToken(), this.user, + getConf()); } return allocateResponse; } else { @@ -195,18 +198,6 @@ public void setNextInterceptor(RequestInterceptor next) { + "Check if the interceptor pipeline configuration is correct"); } - private void updateAMRMToken(Token token) throws IOException { - org.apache.hadoop.security.token.Token amrmToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - // Preserve the token service sent by the RM when adding the token - // to ensure we replace the previous token setup by the RM. - // Afterwards we can update the service address for the RPC layer. 
- user.addToken(amrmToken); - amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); - } - @VisibleForTesting public void setRMClient(final ApplicationMasterProtocol rmClient) { if (rmClient instanceof DistributedSchedulingAMProtocol) { @@ -257,19 +248,12 @@ private static void setAMRMTokenService(final Configuration conf) for (org.apache.hadoop.security.token.Token token : UserGroupInformation .getCurrentUser().getTokens()) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - token.setService(getAMRMTokenService(conf)); + token.setService(ClientRMProxy.getAMRMTokenService(conf)); } } } @InterfaceStability.Unstable - public static Text getAMRMTokenService(Configuration conf) { - return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - } - - @InterfaceStability.Unstable public static Text getTokenService(Configuration conf, String address, String defaultAddr, int defaultPort) { if (HAUtil.isHAEnabled(conf)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java new file mode 100644 index 0000000..bf1bc11 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyServiceUtils; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the AbstractRequestInterceptor and provides an implementation for + * federation of YARN RM and scaling an application across multiple YARN + * sub-clusters. All the federation specific implementation is encapsulated in + * this class. This is always the last intercepter in the chain. + */ +public class FederationInterceptor extends AbstractRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(FederationInterceptor.class); + /** + * The home sub-cluster resource manager proxy instance. + */ + private ApplicationMasterProtocol homeRM; + + /** + * Used to keep track of the container Id and the sub cluster RM that created + * the container. + */ + private Map containerIdToSubClusterIdMap; + + /** + * The original registration request that was sent by the AM. This instance is + * reused to register with all the sub-cluster RMs. + */ + private RegisterApplicationMasterRequest amRegistrationRequest; + + /** + * The original registration response from home RM. This instance is reused + * for duplicate register request from AM, triggered by timeout between AM and + * AMRMProxy. 
+ */ + private RegisterApplicationMasterResponse amRegistrationResponse; + + // Queue of the application + private String queue; + + private SubClusterId homeSubClusterId; + + // The proxy ugi used to talk to home RM + private UserGroupInformation appOwner; + + // Time out for calls into RM + long rmTimeoutMs; + + /** + * Creates an instance of the FederationInterceptor class. + */ + public FederationInterceptor() { + this.containerIdToSubClusterIdMap = + new ConcurrentHashMap(); + this.amRegistrationResponse = null; + } + + /** + * Helper method to create instances of Object using the class name specified + * in the configuration object. + * + * @param conf + * @param configuredClassName + * @param defaultValue + * @param type + * @return instance created + */ + @SuppressWarnings("unchecked") + protected static T createInstance(Configuration conf, + String configuredClassName, String defaultValue, Class type) { + + String className = conf.get(configuredClassName, defaultValue); + try { + Class clusterResolverClass = conf.getClassByName(className); + if (type.isAssignableFrom(clusterResolverClass)) { + return (T) ReflectionUtils.newInstance(clusterResolverClass, conf); + } else { + throw new YarnRuntimeException("Class: " + className + + " not instance of " + type.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate : " + className, e); + } + } + + /** + * Initializes the instance using the specified context. + */ + @Override + public void init(AMRMProxyApplicationContext appContext) { + LOG.info("Initializing Federation Interceptor"); + super.init(appContext); + + // Update the conf if available + Configuration conf = appContext.getConf(); + if (conf == null) { + conf = getConf(); + } else { + setConf(conf); + } + + try { + this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), + UserGroupInformation.getCurrentUser()); + } catch (Exception ex) { + throw new YarnRuntimeException(ex); + } + + this.homeSubClusterId = + SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); + this.homeRM = createHomeRMProxy(appContext, this.appOwner); + + this.rmTimeoutMs = conf.getLong( + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS); + } + + /** + * Sends the application master's registration request to the home RM. + * + * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior, + * so that when AM registers more than once, it returns the same register + * success response instead of throwing + * {@link InvalidApplicationMasterRequestException} + * + * We did this because FederationInterceptor can receive concurrent register + * requests from AM because of timeout between AM and AMRMProxy, which is + * shorter than the timeout + failOver between FederationInterceptor + * (AMRMProxy) and RM. + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + // If AM is calling with a different request, complain + if (this.amRegistrationRequest != null + && !this.amRegistrationRequest.equals(request)) { + throw new YarnException("A different request body received. AM should" + + " not call registerApplicationMaster with different request body"); + } + + // Save the registration request. 
This will be used for registering + // with the other UAMs later + this.amRegistrationRequest = request; + + // It is possible for AM to send duplicate register request because of + // timeout. If this is the case, simply return the success message. Out of + // all outstanding register threads, only the last one will still have an + // unbroken RPC connection and successfully return the response. + if (this.amRegistrationResponse != null) { + return this.amRegistrationResponse; + } + + // Send a registration request to the home resource manager. Note that here + // we don't register with other sub-cluster resource managers because that + // will prevent us from using new sub-clusters that get added while the AM + // is running and will break the elasticity feature. The registration with + // the other sub-cluster RM will be done lazily as needed later. + try { + this.amRegistrationResponse = + this.homeRM.registerApplicationMaster(request); + } catch (InvalidApplicationMasterRequestException e) { + if (e.getMessage().contains("Application Master is already registered")) { + // Some other register thread might have succeeded in the meantime + if (this.amRegistrationResponse != null) { + LOG.info("Other concurrent thread registered successfully, " + + "simply return the same success register response"); + return this.amRegistrationResponse; + } + } + // This is a real issue, throw back to AM + throw e; + } + + // the queue this application belongs to will be used for getting + // AMRMProxy policy from state store. + this.queue = this.amRegistrationResponse.getQueue(); + if (this.queue == null) { + LOG.warn("Received null queue for application " + + getApplicationContext().getApplicationAttemptId().getApplicationId() + + " from home subcluster. Will use default queue name " + + YarnConfiguration.DEFAULT_QUEUE_NAME + + " for getting AMRMProxyPolicy"); + } else { + LOG.info("Application " + + getApplicationContext().getApplicationAttemptId().getApplicationId() + + " belongs to queue " + this.queue); + } + + return this.amRegistrationResponse; + } + + /** + * Sends the heart beats to the home RM and the secondary sub-cluster RMs that + * are being used by the application. + */ + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException { + + try { + // Split the heart beat request into multiple requests, one for each + // sub-cluster RM that is used by this application. + Map requests = + splitAllocateRequest(request); + + // Send the request to the home RM and get the response + AllocateRequest homeRequest = requests.get(this.homeSubClusterId); + + AllocateResponse homeResponse = null; + try { + homeResponse = this.homeRM.allocate(homeRequest); + } catch (ApplicationMasterNotRegisteredException ex) { + LOG.info("AM not registered, most likely due to Home RM failover. 
" + + "Trying to re-register."); + try { + this.homeRM.registerApplicationMaster(amRegistrationRequest); + homeResponse = this.homeRM.allocate(homeRequest); + } catch (Exception e) { + LOG.error("Error trying to re-register AM", e); + throw new YarnException(e); + } + } + + // If the resource manager sent us a new token, add to the current user + if (homeResponse.getAMRMToken() != null) { + LOG.debug("Received new AMRMToken"); + AMRMProxyServiceUtils.updateAMRMToken(homeResponse.getAMRMToken(), + this.appOwner, getConf()); + } + + // Merge the responses from home and secondary sub-cluster RMs + homeResponse = mergeAllocateResponses(homeResponse); + + // return the final response to the application master. + return homeResponse; + } catch (IOException ex) { + LOG.error("Exception encountered while processing heart beat", ex); + throw new YarnException(ex); + } + } + + /** + * Sends the finish application master request to all the resource managers + * used by the application. + */ + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + + FinishApplicationMasterResponse homeResponse = + homeRM.finishApplicationMaster(request); + + return homeResponse; + } + + @Override + public void setNextInterceptor(RequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on FederationManager. " + + "This should always be used as the last interceptor in the chain"); + } + + /** + * This is called when the application pipeline is being destroyed. We will + * release all the resources that we are holding in this call. + */ + @Override + public void shutdown() { + super.shutdown(); + } + + /** + * Returns instance of the ApplicationMasterProtocol proxy class that is used + * to connect to the Home resource manager. + */ + protected ApplicationMasterProtocol createHomeRMProxy( + AMRMProxyApplicationContext appContext, UserGroupInformation appOwner) { + try { + return FederationProxyProviderUtil.createRMProxy(appContext.getConf(), + ApplicationMasterProtocol.class, this.homeSubClusterId, appOwner, + appContext.getAMRMToken()); + } catch (Exception ex) { + throw new YarnRuntimeException(ex); + } + } + + /** + * In federation, the heart beat request needs to be sent to all the sub + * clusters from which the AM has requested containers. This method splits the + * specified AllocateRequest from the AM and creates a new request for each + * sub-cluster RM. 
+ */ + private Map splitAllocateRequest( + AllocateRequest request) throws YarnException { + Map requestMap = + new HashMap(); + + // Create heart beat request for home sub-cluster resource manager + findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, + requestMap); + + if (!isNullOrEmpty(request.getAskList())) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getAskList().addAll(request.getAskList()); + } + + if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistAdditions())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistAdditions()) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistAdditions() + .add(resourceName); + } + } + + if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistRemovals())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistRemovals()) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistRemovals() + .add(resourceName); + } + } + + synchronized (this.containerIdToSubClusterIdMap) { + if (!isNullOrEmpty(request.getReleaseList())) { + for (ContainerId cid : request.getReleaseList()) { + if (warnIfNotExists(cid)) { + SubClusterId subClusterId = + this.containerIdToSubClusterIdMap.get(cid); + AllocateRequest newRequest = requestMap.get(subClusterId); + newRequest.getReleaseList().add(cid); + } + } + } + + if (!isNullOrEmpty(request.getUpdateRequests())) { + for (UpdateContainerRequest ucr : request.getUpdateRequests()) { + if (warnIfNotExists(ucr.getContainerId())) { + SubClusterId subClusterId = + this.containerIdToSubClusterIdMap.get(ucr.getContainerId()); + AllocateRequest newRequest = requestMap.get(subClusterId); + newRequest.getUpdateRequests().add(ucr); + } + } + } + } + + return requestMap; + } + + /** + * Merges the responses from other sub-clusters that we received + * asynchronously with the specified home cluster response and keeps track of + * the containers received from each sub-cluster resource managers. + */ + private AllocateResponse mergeAllocateResponses( + AllocateResponse homeResponse) { + // Timing issue, we need to remove the completed and then save the new ones. + if (LOG.isDebugEnabled()) { + LOG.debug("Remove containers: " + + homeResponse.getCompletedContainersStatuses()); + LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers()); + } + removeFinishedContainersFromCache( + homeResponse.getCompletedContainersStatuses()); + cacheAllocatedContainers(homeResponse.getAllocatedContainers(), + this.homeSubClusterId); + + return homeResponse; + } + + /** + * Removes the finished containers from the local cache. + */ + private void removeFinishedContainersFromCache( + List finishedContainers) { + synchronized (this.containerIdToSubClusterIdMap) { + for (ContainerStatus container : finishedContainers) { + if (containerIdToSubClusterIdMap + .containsKey(container.getContainerId())) { + containerIdToSubClusterIdMap.remove(container.getContainerId()); + } + } + } + } + + /** + * Add allocated containers to cache mapping. 
+ */ + private void cacheAllocatedContainers(List containers, + SubClusterId subClusterId) { + synchronized (this.containerIdToSubClusterIdMap) { + for (Container container : containers) { + if (containerIdToSubClusterIdMap.containsKey(container.getId())) { + SubClusterId existingSubClusterId = + containerIdToSubClusterIdMap.get(container.getId()); + + // Log both containerId strings just in case they are not exactly the + // same + ContainerId existingContainerId = null; + for (ContainerId containerId : containerIdToSubClusterIdMap + .keySet()) { + if (containerId.equals(container.getId())) { + existingContainerId = containerId; + break; + } + } + LOG.warn( + "Duplicate containerID found in the allocated" + + " containers: existing {}, new {}", + existingContainerId, container.getId()); + + if (existingSubClusterId.equals(subClusterId)) { + // When RM fails over, the new RM master might send out the same + // container allocation more than once. Just move on in this case. + LOG.warn( + "Duplicate containerID: {} found in the allocated containers" + + " from same subcluster: {}, so ignoring.", + container.getId(), subClusterId); + } else { + // The same container allocation from different subclusters, + // something is wrong. + throw new YarnRuntimeException( + "Duplicate containerID found in the allocated containers. This" + + " can happen if the RM epoch is not configured properly." + + " ContainerId: " + container.getId().toString() + + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId() + + " From RM: " + subClusterId + + " . Previous container was from subcluster: " + + existingSubClusterId); + } + } + + containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + } + } + } + + /** + * Check to see if an AllocateRequest exists in the Map for the specified sub + * cluster. If not found, create a new one, copy the value of responseId and + * progress from the originalAMRequest, save it in the specified Map and + * return the new instance. If found, just return the old instance. + */ + private static AllocateRequest findOrCreateAllocateRequestForSubCluster( + SubClusterId subClusterId, AllocateRequest originalAMRequest, + Map requestMap) { + AllocateRequest newRequest = null; + if (requestMap.containsKey(subClusterId)) { + newRequest = requestMap.get(subClusterId); + } else { + newRequest = createAllocateRequest(); + newRequest.setResponseId(originalAMRequest.getResponseId()); + newRequest.setProgress(originalAMRequest.getProgress()); + requestMap.put(subClusterId, newRequest); + } + + return newRequest; + } + + /** + * Create an empty AllocateRequest instance. + */ + private static AllocateRequest createAllocateRequest() { + AllocateRequest request = + AllocateRequest.newInstance(0, 0, null, null, null); + request.setAskList(new ArrayList()); + request.setReleaseList(new ArrayList()); + ResourceBlacklistRequest blackList = + ResourceBlacklistRequest.newInstance(null, null); + blackList.setBlacklistAdditions(new ArrayList()); + blackList.setBlacklistRemovals(new ArrayList()); + request.setResourceBlacklistRequest(blackList); + request.setUpdateRequests(new ArrayList()); + return request; + } + + /** + * Check to see if the specified containerId exists in the cache and log an + * error if not found. 
+ * + * @param containerId the container id + * @return true if the container exists in the map, false otherwise + */ + private boolean warnIfNotExists(ContainerId containerId) { + if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) { + LOG.error("AM is trying to use a container that does not exist. " + + "ContainerId: " + containerId.toString()); + return false; + } + return true; + } + + /** + * Utility method to check if the specified Collection is null or empty. + * + * @param c the collection object + * @param element type of the collection + * @return whether it is null or empty + */ + public static boolean isNullOrEmpty(Collection c) { + return (c == null || c.size() == 0); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 6f5009e..a4b6eb7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -104,22 +104,26 @@ protected MockAMRMProxyService getAMRMProxyService() { return this.amrmProxyService; } - @Before - public void setUp() { - this.conf = new YarnConfiguration(); - this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); String mockPassThroughInterceptorClass = PassThroughRequestInterceptor.class.getName(); // Create a request intercepter pipeline for testing. The last one in the // chain will call the mock resource manager. 
The others in the chain will // simply forward it to the next one in the chain - this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, - mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + MockRequestInterceptor.class.getName()); + return conf; + } + + @Before + public void setUp() { + this.conf = this.createConfiguration(); this.dispatcher = new AsyncDispatcher(); this.dispatcher.init(this.conf); this.dispatcher.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f584c94..964d534 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -112,11 +112,13 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; -import org.eclipse.jetty.util.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Mock Resource Manager facade implementation that exposes all the methods @@ -124,8 +126,10 @@ * implementation is expected by the unit test cases. So please change the * implementation with care. 
*/ -public class MockResourceManagerFacade implements - ApplicationMasterProtocol, ApplicationClientProtocol { +public class MockResourceManagerFacade + implements ApplicationMasterProtocol, ApplicationClientProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashMap> applicationContainerIdMap = new HashMap>(); @@ -159,16 +163,17 @@ public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { String amrmToken = getAppIdentifier(); - Log.getLog().info("Registering application attempt: " + amrmToken); + LOG.info("Registering application attempt: " + amrmToken); synchronized (applicationContainerIdMap) { - Assert.assertFalse("The application id is already registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, - new ArrayList()); + if (applicationContainerIdMap.containsKey(amrmToken)) { + throw new InvalidApplicationMasterRequestException( + "Application Master is already registered"); + } + // Keep track of the containers that are returned to this + // application + applicationContainerIdMap.put(amrmToken, new ArrayList()); } - return RegisterApplicationMasterResponse.newInstance(null, null, null, null, null, request.getHost(), null); } @@ -178,7 +183,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { String amrmToken = getAppIdentifier(); - Log.getLog().info("Finishing application attempt: " + amrmToken); + LOG.info("Finishing application attempt: " + amrmToken); synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application @@ -255,8 +260,7 @@ public AllocateResponse allocate(AllocateRequest request) if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { - Log.getLog().info("Releasing containers: " - + request.getReleaseList().size()); + LOG.info("Releasing containers: " + request.getReleaseList().size()); synchronized (applicationContainerIdMap) { Assert.assertTrue( "The application id is not registered before allocate(): " @@ -296,7 +300,7 @@ public AllocateResponse allocate(AllocateRequest request) } } - Log.getLog().info("Allocating containers: " + containerList.size() + LOG.info("Allocating containers: " + containerList.size() + " for application attempt: " + conf.get("AMRMTOKEN")); // Always issue a new AMRMToken as if RM rolled master key diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java new file mode 100644 index 0000000..d4afb24 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.BaseAMRMProxyTest; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.PassThroughRequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the TestAMRMProxyService and overrides methods in order to use the + * AMRMProxyService's pipeline test cases for testing the FederationInterceptor + * class. The tests for AMRMProxyService has been written cleverly so that it + * can be reused to validate different request intercepter chains. + */ +public class TestFederationInterceptor extends BaseAMRMProxyTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationInterceptor.class); + + public static final String HOME_SC_ID = "SC-home"; + + private FederationInterceptor interceptor; + + private int testAppId; + private ApplicationAttemptId attemptId; + + @Override + public void setUp() { + super.setUp(); + interceptor = new TestableFederationInterceptor(); + + testAppId = 1; + attemptId = getApplicationAttemptId(testAppId); + interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), + attemptId, "test-user", null, null)); + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain is the federation intercepter that calls the mock resource manager. 
+ // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationInterceptor.class.getName()); + + conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID); + + return conf; + } + + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RequestInterceptor root = + super.getAMRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 2: + Assert.assertEquals(TestableFederationInterceptor.class.getName(), + root.getClass().getName()); + break; + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", + Integer.toString(3), Integer.toString(index)); + } + + /** + * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior, + * so that when AM registers more than once, it returns the same register + * success response instead of throwing + * {@link InvalidApplicationMasterRequestException} + * + * We did this because FederationInterceptor can receive concurrent register + * requests from AM because of timeout between AM and AMRMProxy. This can + * possible since the timeout between FederationInterceptor and RM longer + * because of performFailover + timeout. + */ + @Test + public void testTwoIdenticalRegisterRequest() throws Exception { + // Register the application twice + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + for (int i = 0; i < 2; i++) { + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + } + } + + @Test + public void testTwoDifferentRegisterRequest() throws Exception { + // Register the application first time + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + // Register the application second time with a different request obj + registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl("different"); + try { + registerResponse = interceptor.registerApplicationMaster(registerReq); + Assert.fail("Should throw if a different request obj is used"); + } catch (YarnException e) { + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java new file mode 100644 index 0000000..04bd119 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.MockResourceManagerFacade; + +/** + * Extends the FederationInterceptor and overrides methods to provide a testable + * implementation of FederationInterceptor. + */ +public class TestableFederationInterceptor extends FederationInterceptor { + private ConcurrentHashMap secondaryResourceManagers = + new ConcurrentHashMap(); + private AtomicInteger runningIndex = new AtomicInteger(0); + private MockResourceManagerFacade mockRm; + + @Override + protected ApplicationMasterProtocol createHomeRMProxy( + final AMRMProxyApplicationContext appContext, + final UserGroupInformation appOwner) { + synchronized (this) { + if (mockRm == null) { + mockRm = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + } + } + return mockRm; + } + + @SuppressWarnings("unchecked") + protected T createSecondaryRMProxy(final Class proxyClass, + final Configuration conf, final String subClusterId) throws IOException { + // We create one instance of the mock resource manager per sub cluster. Keep + // track of the instances of the RMs in the map keyed by the sub cluster id + synchronized (this.secondaryResourceManagers) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { + return (T) this.secondaryResourceManagers.get(subClusterId); + } else { + // The running index here is used to simulate different RM_EPOCH to + // generate unique container identifiers in a federation environment + MockResourceManagerFacade rm = new MockResourceManagerFacade( + new Configuration(conf), runningIndex.addAndGet(10000)); + this.secondaryResourceManagers.put(subClusterId, rm); + return (T) rm; + } + } + } + +}
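
A note on the AMRMToken handling that both DefaultRequestInterceptor and FederationInterceptor now delegate to AMRMProxyServiceUtils.updateAMRMToken: the order of addToken() and setService() is deliberate. The standalone sketch below restates that sequence with comments; the class and method names of the sketch are illustrative only, while the Hadoop calls it makes (the Token constructor, UserGroupInformation.addToken, ClientRMProxy.getAMRMTokenService) are the same ones the patch uses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

/** Illustrative sketch of the token-update ordering used by the patch. */
public class AMRMTokenUpdateSketch {
  public static void update(org.apache.hadoop.yarn.api.records.Token rmToken,
      UserGroupInformation user, Configuration conf) {
    // Build a security token from the record, keeping the service name
    // exactly as the RM sent it.
    Token<AMRMTokenIdentifier> amrmToken = new Token<AMRMTokenIdentifier>(
        rmToken.getIdentifier().array(), rmToken.getPassword().array(),
        new Text(rmToken.getKind()), new Text(rmToken.getService()));
    // Add the token to the UGI first: credentials are keyed by service, so
    // reusing the RM-provided service replaces the previous AMRMToken rather
    // than leaving a stale one behind.
    user.addToken(amrmToken);
    // Only afterwards rewrite the service to the scheduler address that the
    // local RPC layer looks up when selecting a token for the connection.
    amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf));
  }
}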
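
For readers wondering how FederationInterceptor is expected to be wired into the AMRMProxy pipeline, the patch's own TestFederationInterceptor.createConfiguration() is the reference. A trimmed sketch of that wiring follows; the "SC-home" value comes from the test, and a real deployment would choose its own cluster id and would not include the pass-through test interceptors.

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor;

/** Illustrative wiring, modeled on the test configuration in this patch. */
public class FederationPipelineConfigSketch {
  public static YarnConfiguration create() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
    // FederationInterceptor must be the last entry in the pipeline: its
    // setNextInterceptor() throws a YarnRuntimeException if anything is
    // chained after it.
    conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
        FederationInterceptor.class.getName());
    // The home sub-cluster id is derived from the RM cluster id setting.
    conf.set(YarnConfiguration.RM_CLUSTER_ID, "SC-home");
    return conf;
  }
}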