diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ee51094..034f03c 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -594,4 +594,11 @@
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java
new file mode 100644
index 0000000..86d600c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyServiceUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * Static helper methods shared by the AMRMProxy request interceptors.
+ */
+public final class AMRMProxyServiceUtils {
+
+  private AMRMProxyServiceUtils() {
+    // Utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Adds the new AMRMToken issued by RM to the ugi, then sets the token
+   * service to {@code ClientRMProxy.getAMRMTokenService(conf)}.
+   * @param token the new AMRMToken
+   * @param user the ugi for RM connection
+   * @param conf configuration
+   */
+  public static void updateAMRMToken(Token token, UserGroupInformation user,
+      Configuration conf) {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+            token.getIdentifier().array(), token.getPassword().array(),
+            new Text(token.getKind()), new Text(token.getService()));
+    // Preserve the token service sent by the RM when adding the token
+    // to ensure we replace the previous token setup by the RM.
+    // Afterwards we can update the service address for the RPC layer.
+    user.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf));
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 22fc8f6..c517040 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -134,7 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request)
}
AllocateResponse allocateResponse = rmClient.allocate(request);
if (allocateResponse.getAMRMToken() != null) {
- updateAMRMToken(allocateResponse.getAMRMToken());
+ AMRMProxyServiceUtils.updateAMRMToken(allocateResponse.getAMRMToken(),
+ this.user, getConf());
}
return allocateResponse;
@@ -170,7 +171,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
((DistributedSchedulingAMProtocol)rmClient)
.allocateForDistributedScheduling(request);
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
- updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+ AMRMProxyServiceUtils.updateAMRMToken(
+ allocateResponse.getAllocateResponse().getAMRMToken(), this.user,
+ getConf());
}
return allocateResponse;
} else {
@@ -195,18 +198,6 @@ public void setNextInterceptor(RequestInterceptor next) {
+ "Check if the interceptor pipeline configuration is correct");
}
- private void updateAMRMToken(Token token) throws IOException {
- org.apache.hadoop.security.token.Token amrmToken =
- new org.apache.hadoop.security.token.Token(
- token.getIdentifier().array(), token.getPassword().array(),
- new Text(token.getKind()), new Text(token.getService()));
- // Preserve the token service sent by the RM when adding the token
- // to ensure we replace the previous token setup by the RM.
- // Afterwards we can update the service address for the RPC layer.
- user.addToken(amrmToken);
- amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
- }
-
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulingAMProtocol) {
@@ -257,19 +248,12 @@ private static void setAMRMTokenService(final Configuration conf)
for (org.apache.hadoop.security.token.Token extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
- token.setService(getAMRMTokenService(conf));
+ token.setService(ClientRMProxy.getAMRMTokenService(conf));
}
}
}
@InterfaceStability.Unstable
- public static Text getAMRMTokenService(Configuration conf) {
- return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
- }
-
- @InterfaceStability.Unstable
public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
new file mode 100644
index 0000000..782117c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor and provides an implementation for
+ * federation of YARN RM and scaling an application across multiple YARN
+ * sub-clusters. All the federation specific implementation is encapsulated in
+ * this class. This is always the last interceptor in the chain.
+ */
+public class FederationInterceptor extends AbstractRequestInterceptor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationInterceptor.class);
+  /**
+   * The home sub-cluster resource manager proxy instance.
+   */
+  private ApplicationMasterProtocol homeRM;
+
+  /**
+   * Used to keep track of the container Id and the sub cluster RM that created
+   * the container.
+   */
+  private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
+
+  /**
+   * The original registration request that was sent by the AM. This instance is
+   * reused to register with all the sub-cluster RMs.
+   */
+  private RegisterApplicationMasterRequest amRegistrationRequest;
+
+  /**
+   * The original registration response from home RM. This instance is reused
+   * for duplicate register request from AM, triggered by timeout between AM and
+   * AMRMProxy.
+   */
+  private RegisterApplicationMasterResponse amRegistrationResponse;
+
+  // Queue of the application, as reported by the home RM at registration
+  private String queue;
+
+  private SubClusterId homeSubClusterId;
+
+  // The proxy ugi used to talk to home RM
+  private UserGroupInformation appOwner;
+
+  // Time out for calls into RM
+  private long rmTimeoutMs;
+
+  /**
+   * Creates an instance of the FederationInterceptor class.
+   */
+  public FederationInterceptor() {
+    this.containerIdToSubClusterIdMap =
+        new ConcurrentHashMap<ContainerId, SubClusterId>();
+    this.amRegistrationResponse = null;
+  }
+
+  /**
+   * Sets up this interceptor from the given application context.
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    LOG.info("Initializing Federation Interceptor");
+    super.init(appContext);
+
+    // Prefer the context's configuration; otherwise keep the current one
+    Configuration config = appContext.getConf();
+    if (config != null) {
+      setConf(config);
+    } else {
+      config = getConf();
+    }
+
+    try {
+      String owner = appContext.getUser();
+      this.appOwner = UserGroupInformation.createProxyUser(owner,
+          UserGroupInformation.getCurrentUser());
+    } catch (Exception ex) {
+      throw new YarnRuntimeException(ex);
+    }
+
+    String clusterId = YarnConfiguration.getClusterId(config);
+    this.homeSubClusterId = SubClusterId.newInstance(clusterId);
+    this.homeRM = createHomeRMProxy(appContext);
+
+    long defaultMs = YarnConfiguration.
+        DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS;
+    this.rmTimeoutMs = config.getLong(YarnConfiguration.
+        YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, defaultMs);
+  }
+
+  /**
+   * Sends the application master's registration request to the home RM.
+   *
+   * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
+   * so that when AM registers more than once, it returns the same register
+   * success response instead of throwing
+   * {@link InvalidApplicationMasterRequestException}
+   *
+   * We did this because FederationInterceptor can receive concurrent register
+   * requests from AM because of timeout between AM and AMRMProxy, which is
+   * shorter than the timeout + failOver between FederationInterceptor
+   * (AMRMProxy) and RM.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    // If AM is calling with a different request, complain
+    if (this.amRegistrationRequest != null
+        && !this.amRegistrationRequest.equals(request)) {
+      throw new YarnException("A different request body recieved. AM should"
+          + " not call registerApplicationMaster with different request body");
+    }
+
+    // Save the registration request. This will be used for registering
+    // with the other UAMs later
+    this.amRegistrationRequest = request;
+
+    // It is possible for AM to send duplicate register request because of
+    // timeout. If this is the case, simply return the success message. Out of
+    // all outstanding register threads, only the last one will still have an
+    // unbroken RPC connection and successfully return the response.
+    if (this.amRegistrationResponse != null) {
+      return this.amRegistrationResponse;
+    }
+
+    // Send a registration request to the home resource manager. Note that here
+    // we don't register with other sub-cluster resource managers because that
+    // would prevent us from using new sub-clusters that get added while the AM
+    // is running and would break the elasticity feature. The registration with
+    // the other sub-cluster RM will be done lazily as needed later.
+    try {
+      this.amRegistrationResponse =
+          this.homeRM.registerApplicationMaster(request);
+    } catch (InvalidApplicationMasterRequestException e) {
+      String message = e.getMessage();
+      // Some other register thread might have succeeded in the meantime.
+      // The message may be null, so guard it before matching on its text.
+      if (message != null && this.amRegistrationResponse != null
+          && message.contains("Application Master is already registered")) {
+        LOG.info("Other concurrent thread registered successfully, "
+            + "simply return the same success register response");
+        return this.amRegistrationResponse;
+      }
+      throw e;
+    }
+
+    // the queue this application belongs will be used for getting
+    // AMRMProxy policy from state store.
+    this.queue = this.amRegistrationResponse.getQueue();
+    if (this.queue == null) {
+      LOG.warn("Received null queue for application "
+          + getApplicationContext().getApplicationAttemptId().getApplicationId()
+          + " from home subcluster. Will use default queue name "
+          + YarnConfiguration.DEFAULT_QUEUE_NAME
+          + " for getting AMRMProxyPolicy");
+    } else {
+      LOG.info("Application "
+          + getApplicationContext().getApplicationAttemptId().getApplicationId()
+          + " belongs to queue " + this.queue);
+    }
+
+    return this.amRegistrationResponse;
+  }
+
+  /**
+   * Sends the heart beat to the home RM, re-registering the AM first when the
+   * RM reports that it is not registered.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException {
+
+    try {
+      // Split the heart beat request into multiple requests, one for each
+      // sub-cluster RM that is used by this application.
+      Map<SubClusterId, AllocateRequest> requests =
+          splitAllocateRequest(request);
+
+      // Send the request to the home RM and get the response
+      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
+
+      AllocateResponse homeResponse = null;
+      try {
+        homeResponse = this.homeRM.allocate(homeRequest);
+      } catch (ApplicationMasterNotRegisteredException ex) {
+        LOG.info("AM not registered, most likely due to Home RM failover. "
+            + "Trying to re-register.");
+        try {
+          this.homeRM.registerApplicationMaster(amRegistrationRequest);
+          homeResponse = this.homeRM.allocate(homeRequest);
+        } catch (Exception e) {
+          LOG.error("Error trying to re-register AM", e);
+          throw new YarnException(e);
+        }
+      }
+
+      // If the resource manager sent us a new token, add to the current user
+      if (homeResponse.getAMRMToken() != null) {
+        LOG.debug("Received new AMRMToken");
+        AMRMProxyServiceUtils.updateAMRMToken(homeResponse.getAMRMToken(),
+            this.appOwner, getConf());
+      }
+
+      // Update the container cache from the home RM response
+      homeResponse = mergeAllocateResponses(homeResponse);
+
+      // return the final response to the application master.
+      return homeResponse;
+    } catch (IOException ex) {
+      LOG.error("Exception encountered while processing heart beat", ex);
+      throw new YarnException(ex);
+    }
+  }
+
+  /**
+   * Forwards the finish application master request to the home resource
+   * manager and returns its response unchanged.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
+    // Only the home RM is contacted by this implementation.
+    FinishApplicationMasterResponse response =
+        this.homeRM.finishApplicationMaster(request);
+
+    return response;
+  }
+
+  @Override
+  public void setNextInterceptor(RequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on FederationInterceptor. This "
+            + "should always be used as the last interceptor in the chain");
+  }
+
+ /**
+ * This is called when the application pipeline is being destroyed. This
+ * implementation only delegates to the superclass shutdown.
+ */
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+
+  /**
+   * Returns instance of the ApplicationMasterProtocol proxy class that is used
+   * to connect to the Home resource manager.
+   * @param appContext AMRMProxyApplicationContext
+   * @return the proxy created
+   */
+  protected ApplicationMasterProtocol createHomeRMProxy(
+      AMRMProxyApplicationContext appContext) {
+    try {
+      // Same fallback as init(): the context may not carry a configuration.
+      Configuration conf = appContext.getConf();
+      return FederationProxyProviderUtil.createRMProxy(
+          conf != null ? conf : getConf(), ApplicationMasterProtocol.class,
+          this.homeSubClusterId, this.appOwner, appContext.getAMRMToken());
+    } catch (Exception ex) {
+      throw new YarnRuntimeException(ex);
+    }
+  }
+
+  /**
+   * In federation, the heart beat request needs to be sent to all the sub
+   * clusters from which the AM has requested containers. This method splits the
+   * specified AllocateRequest from the AM and creates a new request for each
+   * sub-cluster RM.
+   */
+  private Map<SubClusterId, AllocateRequest> splitAllocateRequest(
+      AllocateRequest request) throws YarnException {
+    Map<SubClusterId, AllocateRequest> requestMap =
+        new HashMap<SubClusterId, AllocateRequest>();
+
+    // Create heart beat request for home sub-cluster resource manager
+    findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
+        requestMap);
+
+    if (!isNullOrEmpty(request.getAskList())) {
+      AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+          this.homeSubClusterId, request, requestMap);
+      newRequest.getAskList().addAll(request.getAskList());
+    }
+
+    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+        request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+      for (String resourceName : request.getResourceBlacklistRequest()
+          .getBlacklistAdditions()) {
+        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+            this.homeSubClusterId, request, requestMap);
+        newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+            .add(resourceName);
+      }
+    }
+
+    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+        request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+      for (String resourceName : request.getResourceBlacklistRequest()
+          .getBlacklistRemovals()) {
+        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+            this.homeSubClusterId, request, requestMap);
+        newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+            .add(resourceName);
+      }
+    }
+
+    synchronized (this.containerIdToSubClusterIdMap) {
+      if (!isNullOrEmpty(request.getReleaseList())) {
+        for (ContainerId cid : request.getReleaseList()) {
+          if (warnIfNotExists(cid)) {
+            SubClusterId subClusterId =
+                this.containerIdToSubClusterIdMap.get(cid);
+            AllocateRequest newRequest = requestMap.get(subClusterId);
+            newRequest.getReleaseList().add(cid);
+          }
+        }
+      }
+
+      if (!isNullOrEmpty(request.getUpdateRequests())) {
+        for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
+          if (warnIfNotExists(ucr.getContainerId())) {
+            SubClusterId subClusterId =
+                this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
+            AllocateRequest newRequest = requestMap.get(subClusterId);
+            newRequest.getUpdateRequests().add(ucr);
+          }
+        }
+      }
+    }
+
+    return requestMap;
+  }
+
+  /**
+   * Records the containers reported by the home RM response in the local
+   * cache and returns that same response. No responses from other
+   * sub-clusters are combined by the code visible here.
+   */
+  private AllocateResponse mergeAllocateResponses(
+      AllocateResponse homeResponse) {
+    List<ContainerStatus> done = homeResponse.getCompletedContainersStatuses();
+    List<Container> added = homeResponse.getAllocatedContainers();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Remove containers: " + done);
+      LOG.debug("Adding containers: " + added);
+    }
+    // Timing issue, we need to remove the completed and then save the new ones.
+    removeFinishedContainersFromCache(done);
+    cacheAllocatedContainers(added, this.homeSubClusterId);
+
+    return homeResponse;
+  }
+
+  /**
+   * Removes the finished containers from the local cache.
+   *
+   * @param finishedContainers statuses whose container ids are dropped
+   */
+  private void removeFinishedContainersFromCache(
+      List<ContainerStatus> finishedContainers) {
+    synchronized (this.containerIdToSubClusterIdMap) {
+      for (ContainerStatus container : finishedContainers) {
+        // remove() is a no-op for ids that were never cached
+        containerIdToSubClusterIdMap.remove(container.getContainerId());
+      }
+    }
+  }
+
+  /**
+   * Add allocated containers to cache mapping.
+   */
+  private void cacheAllocatedContainers(List<Container> containers,
+      SubClusterId subClusterId) {
+    synchronized (this.containerIdToSubClusterIdMap) {
+      for (Container container : containers) {
+        if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
+          SubClusterId existingSubClusterId =
+              containerIdToSubClusterIdMap.get(container.getId());
+
+          // Log both containerId strings just in case they are not exactly the
+          // same
+          ContainerId existingContainerId = null;
+          for (ContainerId containerId : containerIdToSubClusterIdMap
+              .keySet()) {
+            if (containerId.equals(container.getId())) {
+              existingContainerId = containerId;
+              break;
+            }
+          }
+          LOG.warn(
+              "Duplicate containerID found in the allocated"
+                  + " containers: existing {}, new {}",
+              existingContainerId, container.getId());
+
+          if (existingSubClusterId.equals(subClusterId)) {
+            // When RM fails over, the new RM master might send out the same
+            // container allocation more than once. Just move on in this case.
+            LOG.warn(
+                "Duplicate containerID: {} found in the allocated containers"
+                    + " from same subcluster: {}, so ignoring.",
+                container.getId(), subClusterId);
+          } else {
+            // The same container allocation from different subclusters,
+            // something is wrong.
+            throw new YarnRuntimeException(
+                "Duplicate containerID found in the allocated containers. This"
+                    + " can happen if the RM epoch is not configured properly."
+                    + " ContainerId: " + container.getId().toString()
+                    + " ApplicationId: "
+                    + getApplicationContext().getApplicationAttemptId()
+                    + " From RM: " + subClusterId
+                    + " . Previous container was from subcluster: "
+                    + existingSubClusterId);
+          }
+        }
+
+        containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+      }
+    }
+  }
+
+  /**
+   * Check to see if an AllocateRequest exists in the Map for the specified sub
+   * cluster. If not found, create a new one, copy the value of responseId and
+   * progress from the originalAMRequest, save it in the specified Map and
+   * return the new instance. If found, just return the old instance.
+   */
+  private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
+      SubClusterId subClusterId, AllocateRequest originalAMRequest,
+      Map<SubClusterId, AllocateRequest> requestMap) {
+    AllocateRequest newRequest = null;
+    if (requestMap.containsKey(subClusterId)) {
+      newRequest = requestMap.get(subClusterId);
+    } else {
+      newRequest = createAllocateRequest();
+      newRequest.setResponseId(originalAMRequest.getResponseId());
+      newRequest.setProgress(originalAMRequest.getProgress());
+      requestMap.put(subClusterId, newRequest);
+    }
+
+    return newRequest;
+  }
+
+  /**
+   * Create an AllocateRequest whose lists are all empty mutable ArrayLists.
+   */
+  private static AllocateRequest createAllocateRequest() {
+    AllocateRequest request =
+        AllocateRequest.newInstance(0, 0, null, null, null);
+    request.setAskList(new ArrayList<ResourceRequest>());
+    request.setReleaseList(new ArrayList<ContainerId>());
+    ResourceBlacklistRequest blackList =
+        ResourceBlacklistRequest.newInstance(null, null);
+    blackList.setBlacklistAdditions(new ArrayList<String>());
+    blackList.setBlacklistRemovals(new ArrayList<String>());
+    request.setResourceBlacklistRequest(blackList);
+    request.setUpdateRequests(new ArrayList<UpdateContainerRequest>());
+    return request;
+  }
+
+  /**
+   * Reports whether the given container id is present in the cache, logging
+   * an error when it is absent.
+   *
+   * @param containerId the container id
+   * @return true if the container exists in the map, false otherwise
+   */
+  private boolean warnIfNotExists(ContainerId containerId) {
+    if (this.containerIdToSubClusterIdMap.containsKey(containerId)) {
+      return true;
+    }
+    LOG.error("AM is trying to use a container that does not exist. "
+        + "ContainerId: " + containerId.toString());
+    return false;
+  }
+
+  /**
+   * Utility method to check if the specified Collection is null or empty.
+   *
+   * @param c the collection object
+   * @param <T> element type of the collection
+   * @return whether it is null or empty
+   */
+  public static <T> boolean isNullOrEmpty(Collection<T> c) {
+    return (c == null || c.isEmpty());
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 6f5009e..9e57c84 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -87,13 +87,10 @@
public abstract class BaseAMRMProxyTest {
private static final Log LOG = LogFactory
.getLog(BaseAMRMProxyTest.class);
- /**
- * The AMRMProxyService instance that will be used by all the test cases
- */
+
+ // The AMRMProxyService instance that will be used by all the test cases
private MockAMRMProxyService amrmProxyService;
- /**
- * Thread pool used for asynchronous operations
- */
+ // Thread pool used for asynchronous operations
private static ExecutorService threadpool = Executors
.newCachedThreadPool();
private Configuration conf;
@@ -104,22 +101,26 @@ protected MockAMRMProxyService getAMRMProxyService() {
return this.amrmProxyService;
}
- @Before
- public void setUp() {
- this.conf = new YarnConfiguration();
- this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration config = new YarnConfiguration();
+ config.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
- this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
- mockPassThroughInterceptorClass + ","
- + mockPassThroughInterceptorClass + ","
- + mockPassThroughInterceptorClass + ","
+ config.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + mockPassThroughInterceptorClass + ","
+ MockRequestInterceptor.class.getName());
+ return config;
+ }
+
+ @Before
+ public void setUp() {
+ this.conf = this.createConfiguration();
this.dispatcher = new AsyncDispatcher();
this.dispatcher.init(this.conf);
this.dispatcher.start();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index f584c94..964d534 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -112,11 +112,13 @@
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
-import org.eclipse.jetty.util.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Mock Resource Manager facade implementation that exposes all the methods
@@ -124,8 +126,10 @@
* implementation is expected by the unit test cases. So please change the
* implementation with care.
*/
-public class MockResourceManagerFacade implements
- ApplicationMasterProtocol, ApplicationClientProtocol {
+public class MockResourceManagerFacade
+ implements ApplicationMasterProtocol, ApplicationClientProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MockResourceManagerFacade.class);
private HashMap> applicationContainerIdMap =
new HashMap>();
@@ -159,16 +163,17 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
String amrmToken = getAppIdentifier();
- Log.getLog().info("Registering application attempt: " + amrmToken);
+ LOG.info("Registering application attempt: " + amrmToken);
synchronized (applicationContainerIdMap) {
- Assert.assertFalse("The application id is already registered: "
- + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken,
- new ArrayList());
+ if (applicationContainerIdMap.containsKey(amrmToken)) {
+ throw new InvalidApplicationMasterRequestException(
+ "Application Master is already registered");
+ }
+ // Keep track of the containers that are returned to this
+ // application
+ applicationContainerIdMap.put(amrmToken, new ArrayList());
}
-
return RegisterApplicationMasterResponse.newInstance(null, null, null,
null, null, request.getHost(), null);
}
@@ -178,7 +183,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
String amrmToken = getAppIdentifier();
- Log.getLog().info("Finishing application attempt: " + amrmToken);
+ LOG.info("Finishing application attempt: " + amrmToken);
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
@@ -255,8 +260,7 @@ public AllocateResponse allocate(AllocateRequest request)
if (request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
- Log.getLog().info("Releasing containers: "
- + request.getReleaseList().size());
+ LOG.info("Releasing containers: " + request.getReleaseList().size());
synchronized (applicationContainerIdMap) {
Assert.assertTrue(
"The application id is not registered before allocate(): "
@@ -296,7 +300,7 @@ public AllocateResponse allocate(AllocateRequest request)
}
}
- Log.getLog().info("Allocating containers: " + containerList.size()
+ LOG.info("Allocating containers: " + containerList.size()
+ " for application attempt: " + conf.get("AMRMTOKEN"));
// Always issue a new AMRMToken as if RM rolled master key
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
new file mode 100644
index 0000000..c2f1b65
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends BaseAMRMProxyTest and overrides methods in order to use the
+ * AMRMProxyService's pipeline test cases for testing the FederationInterceptor
+ * class. The tests for AMRMProxyService have been written so that they can be
+ * reused to validate different request interceptor chains.
+ */
+public class TestFederationInterceptor extends BaseAMRMProxyTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptor.class);
+
+  public static final String HOME_SC_ID = "SC-home";
+
+  private FederationInterceptor interceptor;
+
+  private int testAppId;
+  private ApplicationAttemptId attemptId;
+
+  /**
+   * Creates a standalone {@link TestableFederationInterceptor} and initializes
+   * it for the application attempt derived from test application id 1.
+   */
+  @Override
+  public void setUp() {
+    super.setUp();
+    interceptor = new TestableFederationInterceptor();
+
+    testAppId = 1;
+    attemptId = getApplicationAttemptId(testAppId);
+    interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
+        attemptId, "test-user", null, null));
+  }
+
+  /** Shuts the interceptor down before the base class tears down. */
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  /**
+   * Builds a configuration with AMRMProxy and federation enabled and an
+   * interceptor pipeline of two pass-through interceptors followed by the
+   * testable federation interceptor.
+   */
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughRequestInterceptor.class.getName();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain is the federation interceptor that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+            + "," + TestableFederationInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
+
+    return conf;
+  }
+
+  /**
+   * Verifies that the configured pipeline is instantiated in order: two
+   * pass-through interceptors followed by the testable federation interceptor.
+   */
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+        Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      case 2:
+        Assert.assertEquals(TestableFederationInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      default:
+        Assert.fail("Unexpected interceptor in chain at index " + index);
+      }
+      root = root.getNextInterceptor();
+      index++;
+    }
+    Assert.assertEquals("The number of interceptors in chain does not match",
+        3, index);
+  }
+
+  /**
+   * Builds a register request whose host and RPC port are derived from the
+   * test application id.
+   *
+   * @param trackingUrl tracking URL to set on the request
+   * @return the populated register request
+   */
+  private RegisterApplicationMasterRequest createRegisterRequest(
+      String trackingUrl) {
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl(trackingUrl);
+    return registerReq;
+  }
+
+  /**
+   * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
+   * so that when AM registers more than once, it returns the same register
+   * success response instead of throwing
+   * {@link InvalidApplicationMasterRequestException}.
+   *
+   * <p>We did this because FederationInterceptor can receive concurrent register
+   * requests from AM because of timeout between AM and AMRMProxy. This is
+   * possible since the timeout between FederationInterceptor and RM is longer
+   * because of performFailover + timeout.
+   */
+  @Test
+  public void testTwoIdenticalRegisterRequest() throws Exception {
+    // Register the application twice with the same request object
+    RegisterApplicationMasterRequest registerReq = createRegisterRequest("");
+
+    for (int i = 0; i < 2; i++) {
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+    }
+  }
+
+  /**
+   * Registering a second time with a request that differs from the first one
+   * (here: another tracking URL) is expected to fail with a YarnException.
+   */
+  @Test
+  public void testTwoDifferentRegisterRequest() throws Exception {
+    // Register the application first time
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(createRegisterRequest(""));
+    Assert.assertNotNull(registerResponse);
+
+    // Register the application second time with a different request obj
+    try {
+      interceptor.registerApplicationMaster(createRegisterRequest("different"));
+      Assert.fail("Should throw if a different request obj is used");
+    } catch (YarnException e) {
+      // Expected: a repeated register must match the original request.
+      LOG.info("Second register was rejected as expected", e);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
new file mode 100644
index 0000000..cbc9ed1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Extends the FederationInterceptor and overrides methods to provide a testable
+ * implementation of FederationInterceptor. Both the home RM and the secondary
+ * sub-cluster RMs are backed by {@link MockResourceManagerFacade} instances.
+ */
+public class TestableFederationInterceptor extends FederationInterceptor {
+  // Mock RMs for the secondary sub-clusters, keyed by sub-cluster id.
+  private final ConcurrentHashMap<String, MockResourceManagerFacade>
+      secondaryRMs =
+          new ConcurrentHashMap<String, MockResourceManagerFacade>();
+  // Value passed to each new secondary mock RM; advanced by 10000 per RM.
+  private final AtomicInteger runningIndex = new AtomicInteger(0);
+  // Lazily created mock for the home RM; guarded by synchronized (this).
+  private MockResourceManagerFacade mockRm;
+
+  /**
+   * Returns the mock RM that plays the role of the home RM, creating it on
+   * first use so that every call returns the same instance.
+   *
+   * @param appContext application context; not read by this implementation
+   * @return the single home {@link MockResourceManagerFacade}
+   */
+  @Override
+  protected ApplicationMasterProtocol createHomeRMProxy(
+      final AMRMProxyApplicationContext appContext) {
+    synchronized (this) {
+      if (mockRm == null) {
+        mockRm = new MockResourceManagerFacade(
+            new YarnConfiguration(super.getConf()), 0);
+      }
+      // Read the field under the same lock that guards its initialization.
+      return mockRm;
+    }
+  }
+
+  /**
+   * Returns the mock RM standing in for the given secondary sub-cluster. One
+   * instance is created per sub-cluster id and reused on later calls.
+   *
+   * @param <T> type the mock RM is (unchecked) cast to
+   * @param proxyClass requested proxy type; not read by this implementation
+   * @param conf configuration copied into a newly created mock RM
+   * @param subClusterId id of the secondary sub-cluster
+   * @return the {@link MockResourceManagerFacade} for {@code subClusterId}
+   * @throws IOException declared by the method signature
+   */
+  @SuppressWarnings("unchecked")
+  protected <T> T createSecondaryRMProxy(final Class<T> proxyClass,
+      final Configuration conf, final String subClusterId) throws IOException {
+    // We create one instance of the mock resource manager per sub cluster. Keep
+    // track of the instances of the RMs in the map keyed by the sub cluster id
+    synchronized (this.secondaryRMs) {
+      // Look the instance up by key. ConcurrentHashMap.contains(Object) is the
+      // legacy Hashtable-style method that tests values, so it must not be used.
+      MockResourceManagerFacade rm = this.secondaryRMs.get(subClusterId);
+      if (rm == null) {
+        // The running index here is used to simulate different RM_EPOCH to
+        // generate unique container identifiers in a federation environment
+        rm = new MockResourceManagerFacade(new Configuration(conf),
+            runningIndex.addAndGet(10000));
+        this.secondaryRMs.put(subClusterId, rm);
+      }
+      return (T) rm;
+    }
+  }
+}