diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a18ef7c..afdbb6d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1327,6 +1327,23 @@ private static void addDeprecatedKeys() {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
+ public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+ + "amrmproxy.enable";
+ public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+ public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ + "amrmproxy.address";
+ public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
+ public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ + DEFAULT_AMRM_PROXY_PORT;
+ public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ + "amrmproxy.client.thread-count";
+ public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+ public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
+ public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+ + "DefaultRequestInterceptor";
+
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
* comma-separated list of CLASSPATH entries. The parameter expansion marker
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index e89a90d..97fcfa1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -86,6 +86,8 @@ public void initializeMemberVariables() {
.add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 62ba599..88f0ba4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2250,4 +2250,38 @@
+
+
+ Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
+ calls from the application masters to the resource manager.
+
+ yarn.nodemanager.amrmproxy.enable
+ false
+
+
+
+
+ The address of the AMRMProxyService listener.
+
+ yarn.nodemanager.amrmproxy.address
+ 0.0.0.0:8048
+
+
+
+
+ The number of threads used to handle requests by the AMRMProxyService.
+
+ yarn.nodemanager.amrmproxy.client.thread-count
+ 25
+
+
+
+
+ The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
+ AMRMProxyService to create the request processing pipeline for applications.
+
+ yarn.nodemanager.amrmproxy.interceptor-class.pipeline
+ org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
new file mode 100644
index 0000000..9af556e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that contains commonly used server methods.
+ *
+ */
+@Private
+public final class YarnServerSecurityUtils {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(YarnServerSecurityUtils.class);
+
+ private YarnServerSecurityUtils() {
+ }
+
+ /**
+ * Authorizes the current request and returns the AMRMTokenIdentifier for the
+ * current application.
+ *
+ * @return the AMRMTokenIdentifier instance for the current user
+ * @throws YarnException
+ */
+ public static AMRMTokenIdentifier authorizeRequest()
+ throws YarnException {
+
+ UserGroupInformation remoteUgi;
+ try {
+ remoteUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ String msg =
+ "Cannot obtain the user-name for authorizing ApplicationMaster. "
+ + "Got exception: " + StringUtils.stringifyException(e);
+ LOG.warn(msg);
+ throw RPCUtil.getRemoteException(msg);
+ }
+
+ boolean tokenFound = false;
+ String message = "";
+ AMRMTokenIdentifier appTokenIdentifier = null;
+ try {
+ appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+ if (appTokenIdentifier == null) {
+ tokenFound = false;
+ message = "No AMRMToken found for user " + remoteUgi.getUserName();
+ } else {
+ tokenFound = true;
+ }
+ } catch (IOException e) {
+ tokenFound = false;
+ message =
+ "Got exception while looking for AMRMToken for user "
+ + remoteUgi.getUserName();
+ }
+
+ if (!tokenFound) {
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ return appTokenIdentifier;
+ }
+
+ // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+ // currently sets only the required id, but iterate through anyways just to be
+ // sure.
+ private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
+ UserGroupInformation remoteUgi) throws IOException {
+ AMRMTokenIdentifier result = null;
+ Set tokenIds = remoteUgi.getTokenIdentifiers();
+ for (TokenIdentifier tokenId : tokenIds) {
+ if (tokenId instanceof AMRMTokenIdentifier) {
+ result = (AMRMTokenIdentifier) tokenId;
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Parses the container launch context and returns a Credential instance that
+ * contains all the tokens from the launch context.
+ * @param launchContext
+ * @return the credential instance
+ * @throws IOException
+ */
+ public static Credentials parseCredentials(
+ ContainerLaunchContext launchContext) throws IOException {
+ Credentials credentials = new Credentials();
+ ByteBuffer tokens = launchContext.getTokens();
+
+ if (tokens != null) {
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ tokens.rewind();
+ buf.reset(tokens);
+ credentials.readTokenStorageStream(buf);
+ if (LOG.isDebugEnabled()) {
+ for (Token extends TokenIdentifier> tk : credentials
+ .getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
+ }
+ }
+ }
+
+ return credentials;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
new file mode 100644
index 0000000..c355a8b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Interface that can be used by the intercepter plugins to get the information
+ * about one application.
+ *
+ */
+public interface AMRMProxyApplicationContext {
+
+ /**
+ * Gets the configuration object instance.
+ * @return the configuration object.
+ */
+ Configuration getConf();
+
+ /**
+ * Gets the application attempt identifier.
+ * @return the application attempt identifier.
+ */
+ ApplicationAttemptId getApplicationAttemptId();
+
+ /**
+ * Gets the application submitter.
+ * @return the application submitter
+ */
+ String getUser();
+
+ /**
+ * Gets the application's AMRMToken that is issued by the RM.
+ * @return the application's AMRMToken that is issued by the RM.
+ */
+ Token getAMRMToken();
+
+ /**
+ * Gets the application's local AMRMToken issued by the proxy service.
+ * @return the application's local AMRMToken issued by the proxy service.
+ */
+ Token getLocalAMRMToken();
+
+ /**
+ * Gets the NMContext object.
+ * @return the NMContext.
+ */
+ Context getNMCotext();
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
new file mode 100644
index 0000000..2e5aa94
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Encapsulates the information about one application that is needed by the
+ * request intercepters.
+ *
+ */
+public class AMRMProxyApplicationContextImpl implements
+ AMRMProxyApplicationContext {
+ private final Configuration conf;
+ private final Context nmContext;
+ private final ApplicationAttemptId applicationAttemptId;
+ private final String user;
+ private Integer localTokenKeyId;
+ private Token amrmToken;
+ private Token localToken;
+
+ /**
+ * Create an instance of the AMRMProxyApplicationContext.
+ *
+ * @param nmContext
+ * @param conf
+ * @param applicationAttemptId
+ * @param user
+ * @param amrmToken
+ */
+ public AMRMProxyApplicationContextImpl(Context nmContext,
+ Configuration conf, ApplicationAttemptId applicationAttemptId,
+ String user, Token amrmToken,
+ Token localToken) {
+ this.nmContext = nmContext;
+ this.conf = conf;
+ this.applicationAttemptId = applicationAttemptId;
+ this.user = user;
+ this.amrmToken = amrmToken;
+ this.localToken = localToken;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public synchronized Token getAMRMToken() {
+ return amrmToken;
+ }
+
+ /**
+ * Sets the application's AMRMToken.
+ */
+ public synchronized void setAMRMToken(
+ Token amrmToken) {
+ this.amrmToken = amrmToken;
+ }
+
+ @Override
+ public synchronized Token getLocalAMRMToken() {
+ return this.localToken;
+ }
+
+ /**
+ * Sets the application's AMRMToken.
+ */
+ public synchronized void setLocalAMRMToken(
+ Token localToken) {
+ this.localToken = localToken;
+ this.localTokenKeyId = null;
+ }
+
+ @Private
+ public synchronized int getLocalAMRMTokenKeyId() {
+ Integer keyId = this.localTokenKeyId;
+ if (keyId == null) {
+ try {
+ if (this.localToken == null) {
+ throw new YarnRuntimeException("Missing AMRM token for "
+ + this.applicationAttemptId);
+ }
+ keyId = this.amrmToken.decodeIdentifier().getKeyId();
+ this.localTokenKeyId = keyId;
+ } catch (IOException e) {
+ throw new YarnRuntimeException("AMRM token decode error for "
+ + this.applicationAttemptId, e);
+ }
+ }
+ return keyId;
+ }
+
+ @Override
+ public Context getNMCotext() {
+ return nmContext;
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
new file mode 100644
index 0000000..ad357f9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AMRMProxyService is a service that runs on each node manager that can be used
+ * to intercept and inspect messages from application master to the cluster
+ * resource manager. It listens to messages from the application master and
+ * creates a request intercepting pipeline instance for each application. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed.
+ */
+public class AMRMProxyService extends AbstractService implements
+ ApplicationMasterProtocol {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AMRMProxyService.class);
+ private Server server;
+ private final Context nmContext;
+ private final AsyncDispatcher dispatcher;
+ private InetSocketAddress listenerEndpoint;
+ private AMRMProxyTokenSecretManager secretManager;
+ private Map applPipelineMap;
+
+ /**
+ * Creates an instance of the service.
+ *
+ * @param nmContext
+ * @param dispatcher
+ */
+ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
+ super(AMRMProxyService.class.getName());
+ Preconditions.checkArgument(nmContext != null, "nmContext is null");
+ Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
+ this.nmContext = nmContext;
+ this.dispatcher = dispatcher;
+ this.applPipelineMap =
+ new ConcurrentHashMap();
+
+ this.dispatcher.register(ApplicationEventType.class,
+ new ApplicationEventHandler());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting AMRMProxyService");
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ this.listenerEndpoint =
+ conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
+
+ Configuration serverConf = new Configuration(conf);
+ serverConf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
+
+ int numWorkerThreads =
+ serverConf.getInt(
+ YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
+
+ this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
+ this.secretManager.start();
+
+ this.server =
+ rpc.getServer(ApplicationMasterProtocol.class, this,
+ listenerEndpoint, serverConf, this.secretManager,
+ numWorkerThreads);
+
+ this.server.start();
+ LOG.info("AMRMProxyService listening on address: "
+ + this.server.getListenerAddress());
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping AMRMProxyService");
+ if (this.server != null) {
+ this.server.stop();
+ }
+
+ this.secretManager.stop();
+
+ super.serviceStop();
+ }
+
+ /**
+ * This is called by the AMs started on this node to register with the RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific intercepter chain.
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Registering application master." + " Host:"
+ + request.getHost() + " Port:" + request.getRpcPort()
+ + " Tracking Url:" + request.getTrackingUrl());
+ RequestInterceptorChainWrapper pipeline =
+ authorizeAndGetInterceptorChain();
+ return pipeline.getRootInterceptor()
+ .registerApplicationMaster(request);
+ }
+
+ /**
+ * This is called by the AMs started on this node to unregister from the RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific intercepter chain.
+ */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Finishing application master. Tracking Url:"
+ + request.getTrackingUrl());
+ RequestInterceptorChainWrapper pipeline =
+ authorizeAndGetInterceptorChain();
+ return pipeline.getRootInterceptor().finishApplicationMaster(request);
+ }
+
+ /**
+ * This is called by the AMs started on this node to send heart beat to RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific pipeline, which is a chain of request
+ * intercepter objects. One application request processing pipeline is created
+ * per AM instance.
+ */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
+ RequestInterceptorChainWrapper pipeline =
+ getInterceptorChain(amrmTokenIdentifier);
+ AllocateResponse allocateResponse =
+ pipeline.getRootInterceptor().allocate(request);
+
+ updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
+
+ return allocateResponse;
+ }
+
+ /**
+ * Callback from the ContainerManager implementation for initializing the
+ * application request processing pipeline.
+ *
+ * @param request - encapsulates information for starting an AM
+ * @throws IOException
+ * @throws YarnException
+ */
+ public void processApplicationStartRequest(StartContainerRequest request)
+ throws IOException, YarnException {
+ LOG.info("Callback received for initializing request "
+ + "processing pipeline for an AM");
+ ContainerTokenIdentifier containerTokenIdentifierForKey =
+ BuilderUtils.newContainerTokenIdentifier(request
+ .getContainerToken());
+ ApplicationAttemptId appAttemptId =
+ containerTokenIdentifierForKey.getContainerID()
+ .getApplicationAttemptId();
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(request
+ .getContainerLaunchContext());
+
+ Token amrmToken =
+ getFirstAMRMToken(credentials.getAllTokens());
+ if (amrmToken == null) {
+ throw new YarnRuntimeException(
+ "AMRMToken not found in the start container request for application:"
+ + appAttemptId.toString());
+ }
+
+ // Substitute the existing AMRM Token with a local one. Keep the rest of the
+ // tokens in the credentials intact.
+ Token localToken =
+ this.secretManager.createAndGetAMRMToken(appAttemptId);
+ credentials.addToken(localToken.getService(), localToken);
+
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ request.getContainerLaunchContext().setTokens(
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+ initializePipeline(containerTokenIdentifierForKey.getContainerID()
+ .getApplicationAttemptId(),
+ containerTokenIdentifierForKey.getApplicationSubmitter(),
+ amrmToken, localToken);
+ }
+
+ /**
+ * Initializes the request intercepter pipeline for the specified application.
+ *
+ * @param applicationAttemptId
+ * @param user
+ * @param amrmToken
+ */
+ protected void initializePipeline(
+ ApplicationAttemptId applicationAttemptId, String user,
+ Token amrmToken,
+ Token localToken) {
+ RequestInterceptorChainWrapper chainWrapper = null;
+ synchronized (applPipelineMap) {
+ if (applPipelineMap.containsKey(applicationAttemptId)) {
+ LOG.warn("Request to start an already existing appId was received. "
+ + " This can happen if an application failed and a new attempt "
+ + "was created on this machine. ApplicationId: "
+ + applicationAttemptId.toString());
+ return;
+ }
+
+ chainWrapper = new RequestInterceptorChainWrapper();
+ this.applPipelineMap.put(applicationAttemptId.getApplicationId(),
+ chainWrapper);
+ }
+
+ // We register the pipeline instance in the map first and then initialize it
+ // later because chain initialization can be expensive and we would like to
+ // release the lock as soon as possible to prevent other applications from
+ // blocking when one application's chain is initializing
+ LOG.info("Initializing request processing pipeline for application. "
+ + " ApplicationId:" + applicationAttemptId + " for the user: "
+ + user);
+
+ RequestInterceptor interceptorChain =
+ this.createRequestInterceptorChain();
+ interceptorChain.init(createApplicationMasterContext(
+ applicationAttemptId, user, amrmToken, localToken));
+ chainWrapper.init(interceptorChain, applicationAttemptId);
+ }
+
+ /**
+ * Shuts down the request processing pipeline for the specified application
+ * attempt id.
+ *
+ * @param applicationId
+ */
+ protected void stopApplication(ApplicationId applicationId) {
+ Preconditions.checkArgument(applicationId != null,
+ "applicationId is null");
+ RequestInterceptorChainWrapper pipeline =
+ this.applPipelineMap.remove(applicationId);
+
+ if (pipeline == null) {
+ LOG.info("Request to stop an application that does not exist. Id:"
+ + applicationId);
+ } else {
+ LOG.info("Stopping the request processing pipeline for application: "
+ + applicationId);
+ try {
+ pipeline.getRootInterceptor().shutdown();
+ } catch (Throwable ex) {
+ LOG.warn(
+ "Failed to shutdown the request processing pipeline for app:"
+ + applicationId, ex);
+ }
+ }
+ }
+
+ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+ RequestInterceptorChainWrapper pipeline,
+ AllocateResponse allocateResponse) {
+ AMRMProxyApplicationContextImpl context =
+ (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+ .getApplicationContext();
+
+ // check to see if the RM has issued a new AMRMToken & accordingly update
+ // the real ARMRMToken in the current context
+ if (allocateResponse.getAMRMToken() != null) {
+ org.apache.hadoop.yarn.api.records.Token token =
+ allocateResponse.getAMRMToken();
+
+ org.apache.hadoop.security.token.Token newTokenId =
+ new org.apache.hadoop.security.token.Token(
+ token.getIdentifier().array(), token.getPassword().array(),
+ new Text(token.getKind()), new Text(token.getService()));
+
+ context.setAMRMToken(newTokenId);
+ }
+
+ // Check if the local AMRMToken is rolled up and update the context and
+ // response accordingly
+ MasterKeyData nextMasterKey =
+ this.secretManager.getNextMasterKeyData();
+
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token localToken = context.getLocalAMRMToken();
+ if (nextMasterKey.getMasterKey().getKeyId() != context
+ .getLocalAMRMTokenKeyId()) {
+ LOG.info("The local AMRMToken has been rolled-over."
+ + " Send new local AMRMToken back to application: "
+ + pipeline.getApplicationId());
+ localToken =
+ this.secretManager.createAndGetAMRMToken(pipeline
+ .getApplicationAttemptId());
+ context.setLocalAMRMToken(localToken);
+ }
+
+ allocateResponse
+ .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(localToken.getIdentifier(), localToken
+ .getKind().toString(), localToken.getPassword(),
+ localToken.getService().toString()));
+ }
+ }
+
+ private AMRMProxyApplicationContext createApplicationMasterContext(
+ ApplicationAttemptId applicationAttemptId, String user,
+ Token amrmToken,
+ Token localToken) {
+ AMRMProxyApplicationContextImpl appContext =
+ new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+ applicationAttemptId, user, amrmToken, localToken);
+ return appContext;
+ }
+
+ /**
+ * Gets the Request intercepter chains for all the applications.
+ *
+ * @return the request intercepter chains.
+ */
+ protected Map getPipelines() {
+ return this.applPipelineMap;
+ }
+
+ /**
+ * This method creates and returns reference of the first intercepter in the
+ * chain of request intercepter instances.
+ *
+ * @return the reference of the first intercepter in the chain
+ */
+ protected RequestInterceptor createRequestInterceptorChain() {
+ Configuration conf = getConfig();
+
+ List interceptorClassNames = getInterceptorClassNames(conf);
+
+ RequestInterceptor pipeline = null;
+ RequestInterceptor current = null;
+ for (String interceptorClassName : interceptorClassNames) {
+ try {
+ Class> interceptorClass =
+ conf.getClassByName(interceptorClassName);
+ if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+ RequestInterceptor interceptorInstance =
+ (RequestInterceptor) ReflectionUtils.newInstance(
+ interceptorClass, conf);
+ if (pipeline == null) {
+ pipeline = interceptorInstance;
+ current = interceptorInstance;
+ continue;
+ } else {
+ current.setNextInterceptor(interceptorInstance);
+ current = interceptorInstance;
+ }
+ } else {
+ throw new YarnRuntimeException("Class: " + interceptorClassName
+ + " not instance of "
+ + RequestInterceptor.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate ApplicationMasterRequestInterceptor: "
+ + interceptorClassName, e);
+ }
+ }
+
+ if (pipeline == null) {
+ throw new YarnRuntimeException(
+ "RequestInterceptor pipeline is not configured in the system");
+ }
+ return pipeline;
+ }
+
+ /**
+ * Returns the comma separated intercepter class names from the configuration.
+ *
+ * @param conf
+ * @return the intercepter class names as an instance of ArrayList
+ */
+ private List getInterceptorClassNames(Configuration conf) {
+ String configuredInterceptorClassNames =
+ conf.get(
+ YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+
+ List interceptorClassNames = new ArrayList();
+ Collection tempList =
+ StringUtils.getStringCollection(configuredInterceptorClassNames);
+ for (String item : tempList) {
+ interceptorClassNames.add(item.trim());
+ }
+
+ return interceptorClassNames;
+ }
+
+ /**
+ * Authorizes the request and returns the application specific request
+ * processing pipeline.
+ *
+ * @return the the intercepter wrapper instance
+ * @throws YarnException
+ */
+ private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+ throws YarnException {
+ AMRMTokenIdentifier tokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
+ return getInterceptorChain(tokenIdentifier);
+ }
+
+ private RequestInterceptorChainWrapper getInterceptorChain(
+ AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+ ApplicationAttemptId appAttemptId =
+ tokenIdentifier.getApplicationAttemptId();
+
+ synchronized (this.applPipelineMap) {
+ if (!this.applPipelineMap.containsKey(appAttemptId
+ .getApplicationId())) {
+ throw new YarnException(
+ "The AM request processing pipeline is not initialized for app: "
+ + appAttemptId.getApplicationId().toString());
+ }
+
+ return this.applPipelineMap.get(appAttemptId.getApplicationId());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Token getFirstAMRMToken(
+ Collection> allTokens) {
+ Iterator> iter = allTokens.iterator();
+ while (iter.hasNext()) {
+ Token extends TokenIdentifier> token = iter.next();
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ return (Token) token;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Private class for handling application stop events.
+ *
+ */
+ class ApplicationEventHandler implements EventHandler {
+
+ @Override
+ public void handle(ApplicationEvent event) {
+ Application app =
+ AMRMProxyService.this.nmContext.getApplications().get(
+ event.getApplicationID());
+ if (app != null) {
+ switch (event.getType()) {
+ case FINISH_APPLICATION:
+ LOG.info("Application stop event received for stopping AppId:"
+ + event.getApplicationID().toString());
+ AMRMProxyService.this.stopApplication(event.getApplicationID());
+ break;
+ default:
+ LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+ break;
+ }
+ } else {
+ LOG.warn("Event " + event + " sent to absent application "
+ + event.getApplicationID());
+ }
+ }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and
+ * ApplicationAttemptId instances.
+ *
+ */
+ private static class RequestInterceptorChainWrapper {
+ private RequestInterceptor rootInterceptor;
+ private ApplicationAttemptId applicationAttemptId;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param rootInterceptor
+ * @param applicationAttemptId
+ */
+ public synchronized void init(RequestInterceptor rootInterceptor,
+ ApplicationAttemptId applicationAttemptId) {
+ this.rootInterceptor = rootInterceptor;
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized RequestInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Gets the application attempt identifier.
+ *
+ * @return the application attempt identifier
+ */
+ public synchronized ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ /**
+ * Gets the application identifier.
+ *
+ * @return the application identifier
+ */
+ public synchronized ApplicationId getApplicationId() {
+ return applicationAttemptId.getApplicationId();
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
new file mode 100644
index 0000000..d09ce41
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This secret manager instance is used by the AMRMProxyService to generate and
+ * manage tokens.
+ */
+public class AMRMProxyTokenSecretManager extends
+ SecretManager {
+
+ private static final Log LOG = LogFactory
+ .getLog(AMRMProxyTokenSecretManager.class);
+
+ private int serialNo = new SecureRandom().nextInt();
+ private MasterKeyData nextMasterKey;
+ private MasterKeyData currentMasterKey;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ private final Timer timer;
+ private final long rollingInterval;
+ private final long activationDelay;
+
+ private final Set appAttemptSet =
+ new HashSet();
+
+ /**
+ * Create an {@link AMRMProxyTokenSecretManager}.
+ */
+ public AMRMProxyTokenSecretManager(Configuration conf) {
+ this.timer = new Timer();
+ this.rollingInterval =
+ conf.getLong(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+ // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+ // the updated shared-key.
+ this.activationDelay =
+ (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+ LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+ + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
+ + " ms");
+ if (rollingInterval <= activationDelay * 2) {
+ throw new IllegalArgumentException(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+ + " should be more than 3 X "
+ + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+ }
+ }
+
+ public void start() {
+ if (this.currentMasterKey == null) {
+ this.currentMasterKey = createNewMasterKey();
+ }
+ this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+ rollingInterval);
+ }
+
+ public void stop() {
+ this.timer.cancel();
+ }
+
+ public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Application finished, removing password for "
+ + appAttemptId);
+ this.appAttemptSet.remove(appAttemptId);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private class MasterKeyRoller extends TimerTask {
+ @Override
+ public void run() {
+ rollMasterKey();
+ }
+ }
+
+ @Private
+ void rollMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Rolling master-key for amrm-tokens");
+ this.nextMasterKey = createNewMasterKey();
+ this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private class NextKeyActivator extends TimerTask {
+ @Override
+ public void run() {
+ activateNextMasterKey();
+ }
+ }
+
+ public void activateNextMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Activating next master key with id: "
+ + this.nextMasterKey.getMasterKey().getKeyId());
+ this.currentMasterKey = this.nextMasterKey;
+ this.nextMasterKey = null;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData createNewMasterKey() {
+ this.writeLock.lock();
+ try {
+ return new MasterKeyData(serialNo++, generateSecret());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public Token createAndGetAMRMToken(
+ ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+ AMRMTokenIdentifier identifier =
+ new AMRMTokenIdentifier(appAttemptId, getMasterKey()
+ .getMasterKey().getKeyId());
+ byte[] password = this.createPassword(identifier);
+ appAttemptSet.add(appAttemptId);
+ return new Token(identifier.getBytes(),
+ password, identifier.getKind(), new Text());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ // If nextMasterKey is not Null, then return nextMasterKey
+ // otherwise return currentMasterKey.
+ @VisibleForTesting
+ public MasterKeyData getMasterKey() {
+ this.readLock.lock();
+ try {
+ return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
+ * RPC layer to validate a remote {@link AMRMTokenIdentifier}.
+ */
+ @Override
+ public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+ throws InvalidToken {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to retrieve password for "
+ + applicationAttemptId);
+ }
+ if (!appAttemptSet.contains(applicationAttemptId)) {
+ throw new InvalidToken(applicationAttemptId
+ + " not found in AMRMProxyTokenSecretManager.");
+ }
+ if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.currentMasterKey.getSecretKey());
+ } else if (nextMasterKey != null
+ && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.nextMasterKey.getSecretKey());
+ }
+ throw new InvalidToken("Invalid AMRMToken from "
+ + applicationAttemptId);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Creates an empty TokenId to be used for de-serializing an
+ * {@link AMRMTokenIdentifier} by the RPC layer.
+ */
+ @Override
+ public AMRMTokenIdentifier createIdentifier() {
+ return new AMRMTokenIdentifier();
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getNextMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ @Private
+ protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ LOG.info("Creating password for " + applicationAttemptId);
+ return createPassword(identifier.getBytes(), getMasterKey()
+ .getSecretKey());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
new file mode 100644
index 0000000..810dfa8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can can be used and/or extended by other concrete intercepter classes.
+ *
+ */
+public abstract class AbstractRequestInterceptor implements
+ RequestInterceptor {
+ private Configuration conf;
+ private AMRMProxyApplicationContext appContext;
+ private RequestInterceptor nextInterceptor;
+
+ /**
+ * Sets the {@link RequestInterceptor} in the chain.
+ */
+ @Override
+ public void setNextInterceptor(RequestInterceptor nextInterceptor) {
+ this.nextInterceptor = nextInterceptor;
+ }
+
+ /**
+ * Sets the {@link Configuration}.
+ */
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.setConf(conf);
+ }
+ }
+
+ /**
+ * Gets the {@link Configuration}.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Initializes the {@link RequestInterceptor}.
+ */
+ @Override
+ public void init(AMRMProxyApplicationContext appContext) {
+ Preconditions.checkState(this.appContext == null,
+ "init is called multiple times on this interceptor: "
+ + this.getClass().getName());
+ this.appContext = appContext;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.init(appContext);
+ }
+ }
+
+ /**
+ * Disposes the {@link RequestInterceptor}.
+ */
+ @Override
+ public void shutdown() {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.shutdown();
+ }
+ }
+
+ /**
+ * Gets the next {@link RequestInterceptor} in the chain.
+ */
+ @Override
+ public RequestInterceptor getNextInterceptor() {
+ return this.nextInterceptor;
+ }
+
+ /**
+ * Gets the {@link AMRMProxyApplicationContext}.
+ */
+ public AMRMProxyApplicationContext getApplicationContext() {
+ return this.appContext;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
new file mode 100644
index 0000000..2c7939b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor class and provides an implementation
+ * that simply forwards the AM requests to the cluster resource manager.
+ *
+ */
+public final class DefaultRequestInterceptor extends
+ AbstractRequestInterceptor {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DefaultRequestInterceptor.class);
+ private ApplicationMasterProtocol rmClient;
+ private UserGroupInformation user = null;
+
+ @Override
+ public void init(AMRMProxyApplicationContext appContext) {
+ super.init(appContext);
+ try {
+ user =
+ UserGroupInformation.createProxyUser(appContext
+ .getApplicationAttemptId().toString(), UserGroupInformation
+ .getCurrentUser());
+ user.addToken(appContext.getAMRMToken());
+ final Configuration conf = this.getConf();
+
+ rmClient =
+ user.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ApplicationMasterProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(conf,
+ ApplicationMasterProtocol.class);
+ }
+ });
+ } catch (IOException e) {
+ String message =
+ "Error while creating of RM app master service proxy for attemptId:"
+ + appContext.getApplicationAttemptId().toString();
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ final RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ LOG.info("Forwarding registration request to the real YARN RM");
+ return rmClient.registerApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(final AllocateRequest request)
+ throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Forwarding allocate request to the real YARN RM");
+ }
+ AllocateResponse allocateResponse = rmClient.allocate(request);
+ if (allocateResponse.getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAMRMToken());
+ }
+
+ return allocateResponse;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ final FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Forwarding finish application request to "
+ + "the real YARN Resource Manager");
+ return rmClient.finishApplicationMaster(request);
+ }
+
+ @Override
+ public void setNextInterceptor(RequestInterceptor next) {
+ throw new YarnRuntimeException(
+ "setNextInterceptor is being called on DefaultRequestInterceptor,"
+ + "which should be the last one in the chain "
+ + "Check if the interceptor pipeline configuration is correct");
+ }
+
+ private void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token amrmToken =
+ new org.apache.hadoop.security.token.Token(
+ token.getIdentifier().array(), token.getPassword().array(),
+ new Text(token.getKind()), new Text(token.getService()));
+ // Preserve the token service sent by the RM when adding the token
+ // to ensure we replace the previous token setup by the RM.
+ // Afterwards we can update the service address for the RPC layer.
+ user.addToken(amrmToken);
+ amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
new file mode 100644
index 0000000..c74c88f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the application
+ * master to the resource manager.
+ */
+public interface RequestInterceptor extends ApplicationMasterProtocol,
+ Configurable {
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param ctx
+ */
+ void init(AMRMProxyApplicationContext ctx);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the resource manager service and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor
+ */
+ void setNextInterceptor(RequestInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ RequestInterceptor getNextInterceptor();
+
+ /**
+ * Returns the context.
+ *
+ * @return the context
+ */
+ AMRMProxyApplicationContext getApplicationContext();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 68c7f2c..a658e53 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -42,7 +42,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@@ -51,7 +50,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
@@ -92,6 +90,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -103,6 +102,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -135,6 +135,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
@@ -172,6 +173,8 @@
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
+ private AMRMProxyService amrmProxyService;
+ private boolean amrmProxyEnabled = false;
private long waitForContainersOnShutdownMillis;
@@ -235,6 +238,20 @@ public void serviceInit(Configuration conf) throws Exception {
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+ amrmProxyEnabled =
+ conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+ if (amrmProxyEnabled) {
+ LOG.info("AMRMProxyService is enabled. "
+ + "All the AM->RM requests will be intercepted by the proxy");
+ this.amrmProxyService =
+ new AMRMProxyService(this.context, this.dispatcher);
+ addService(this.amrmProxyService);
+ } else {
+ LOG.info("AMRMProxyService is disabled");
+ }
+
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -246,6 +263,10 @@ public void serviceInit(Configuration conf) throws Exception {
recover();
}
+ public boolean isARMRMProxyEnabled() {
+ return amrmProxyEnabled;
+ }
+
@SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore();
@@ -314,7 +335,8 @@ private void recoverContainer(RecoveredContainerState rcs)
+ " with exit code " + rcs.getExitCode());
if (context.getApplications().containsKey(appId)) {
- Credentials credentials = parseCredentials(launchContext);
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
@@ -737,8 +759,17 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
containerTokenIdentifier);
containerId = containerTokenIdentifier.getContainerID();
- startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
- request);
+
+ // Initialize the AMRMProxy service instance only if the container is of
+ // type AM and if the AMRMProxy service is enabled
+ if (isARMRMProxyEnabled()
+ && containerTokenIdentifier.getContainerType().equals(
+ ContainerType.APPLICATION_MASTER)) {
+ this.amrmProxyService.processApplicationStartRequest(request);
+ }
+
+ startContainerInternal(nmTokenIdentifier,
+ containerTokenIdentifier, request);
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -751,7 +782,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
}
return StartContainersResponse.newInstance(getAuxServiceMetaData(),
- succeededContainers, failedContainers);
+ succeededContainers, failedContainers);
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -844,7 +875,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
}
}
- Credentials credentials = parseCredentials(launchContext);
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(launchContext);
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
@@ -928,27 +960,6 @@ protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier)
nmTokenIdentifier);
}
- private Credentials parseCredentials(ContainerLaunchContext launchContext)
- throws IOException {
- Credentials credentials = new Credentials();
- // //////////// Parse credentials
- ByteBuffer tokens = launchContext.getTokens();
-
- if (tokens != null) {
- DataInputByteBuffer buf = new DataInputByteBuffer();
- tokens.rewind();
- buf.reset(tokens);
- credentials.readTokenStorageStream(buf);
- if (LOG.isDebugEnabled()) {
- for (Token extends TokenIdentifier> tk : credentials.getAllTokens()) {
- LOG.debug(tk.getService() + " = " + tk.toString());
- }
- }
- }
- // //////////// End of parsing credentials
- return credentials;
- }
-
/**
* Stop a list of containers running on this NodeManager.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
new file mode 100644
index 0000000..964379a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the AMRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes
+ *
+ */
+public abstract class BaseAMRMProxyTest {
+ private static final Log LOG = LogFactory
+ .getLog(BaseAMRMProxyTest.class);
+ /**
+ * The AMRMProxyService instance that will be used by all the test cases
+ */
+ private MockAMRMProxyService amrmProxyService;
+ /**
+ * Thread pool used for asynchronous operations
+ */
+ private static ExecutorService threadpool = Executors
+ .newCachedThreadPool();
+ private Configuration conf;
+ private AsyncDispatcher dispatcher;
+
+ protected MockAMRMProxyService getAMRMProxyService() {
+ Assert.assertNotNull(this.amrmProxyService);
+ return this.amrmProxyService;
+ }
+
+ @Before
+ public void setUp() {
+ this.conf = new YarnConfiguration();
+ this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+ String mockPassThroughInterceptorClass =
+ PassThroughRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain will call the mock resource manager. The others in the chain will
+ // simply forward it to the next one in the chain
+ this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + ","
+ + mockPassThroughInterceptorClass + ","
+ + mockPassThroughInterceptorClass + ","
+ + MockRequestInterceptor.class.getName());
+
+ this.dispatcher = new AsyncDispatcher();
+ this.dispatcher.init(conf);
+ this.dispatcher.start();
+ this.amrmProxyService = createAndStartAMRMProxyService();
+ }
+
+ @After
+ public void tearDown() {
+ amrmProxyService.stop();
+ amrmProxyService = null;
+ this.dispatcher.stop();
+ }
+
+ protected ExecutorService getThreadPool() {
+ return threadpool;
+ }
+
+ protected MockAMRMProxyService createAndStartAMRMProxyService() {
+ MockAMRMProxyService svc =
+ new MockAMRMProxyService(new NullContext(), dispatcher);
+ svc.init(conf);
+ svc.start();
+ return svc;
+ }
+
+ /**
+ * This helper method will invoke the specified function in parallel for each
+ * end point in the specified list using a thread pool and return the
+ * responses received from the function. It implements the logic required for
+ * dispatching requests in parallel and waiting for the responses. If any of
+ * the function call fails or times out, it will ignore and proceed with the
+ * rest. So the responses returned can be less than the number of end points
+ * specified
+ *
+ * @param testContext
+ * @param func
+ * @return
+ */
+ protected List runInParallel(List testContexts,
+ final Function func) {
+ ExecutorCompletionService completionService =
+ new ExecutorCompletionService(this.getThreadPool());
+ LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+ + testContexts.size());
+ for (int index = 0; index < testContexts.size(); index++) {
+ final T testContext = testContexts.get(index);
+
+ LOG.info("Adding request to threadpool for test context: "
+ + testContext.toString());
+
+ completionService.submit(new Callable() {
+ @Override
+ public R call() throws Exception {
+ LOG.info("Sending request. Test context:"
+ + testContext.toString());
+
+ R response = null;
+ try {
+ response = func.invoke(testContext);
+ LOG.info("Successfully sent request for context: "
+ + testContext.toString());
+ } catch (Throwable ex) {
+ LOG.error("Failed to process request for context: "
+ + testContext);
+ response = null;
+ }
+
+ return response;
+ }
+ });
+ }
+
+ ArrayList responseList = new ArrayList();
+ LOG.info("Waiting for responses from endpoints. Number of contexts="
+ + testContexts.size());
+ for (int i = 0; i < testContexts.size(); ++i) {
+ try {
+ final Future future = completionService.take();
+ final R response = future.get(3000, TimeUnit.MILLISECONDS);
+ responseList.add(response);
+ } catch (Throwable e) {
+ LOG.error("Failed to process request " + e.getMessage());
+ }
+ }
+
+ return responseList;
+ }
+
+ /**
+ * Helper method to register an application master using specified testAppId
+ * as the application identifier and return the response
+ *
+ * @param testAppId
+ * @return
+ * @throws Exception
+ * @throws YarnException
+ * @throws IOException
+ */
+ protected RegisterApplicationMasterResponse registerApplicationMaster(
+ final int testAppId) throws Exception, YarnException, IOException {
+ final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+ return ugi
+ .getUser()
+ .doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public RegisterApplicationMasterResponse run()
+ throws Exception {
+ getAMRMProxyService().initApp(
+ ugi.getAppAttemptId(),
+ ugi.getUser().getUserName());
+
+ final RegisterApplicationMasterRequest req =
+ Records
+ .newRecord(RegisterApplicationMasterRequest.class);
+ req.setHost(Integer.toString(testAppId));
+ req.setRpcPort(testAppId);
+ req.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse response =
+ getAMRMProxyService().registerApplicationMaster(req);
+ return response;
+ }
+ });
+ }
+
+ /**
+ * Helper method that can be used to register multiple application masters in
+ * parallel to the specified RM end points
+ *
+ * @param testContexts - used to identify the requests
+ * @return
+ */
+ protected List> registerApplicationMastersInParallel(
+ final ArrayList testContexts) {
+ List> responses =
+ runInParallel(testContexts,
+ new Function>() {
+ @Override
+ public RegisterApplicationMasterResponseInfo invoke(
+ T testContext) {
+ RegisterApplicationMasterResponseInfo response = null;
+ try {
+ int index = testContexts.indexOf(testContext);
+ response =
+ new RegisterApplicationMasterResponseInfo(
+ registerApplicationMaster(index), testContext);
+ Assert.assertNotNull(response.getResponse());
+ Assert.assertEquals(Integer.toString(index), response
+ .getResponse().getQueue());
+
+ LOG.info("Sucessfully registered application master with test context: "
+ + testContext);
+ } catch (Throwable ex) {
+ response = null;
+ LOG.error("Failed to register application master with test context: "
+ + testContext);
+ }
+
+ return response;
+ }
+ });
+
+ Assert.assertEquals(
+ "Number of responses received does not match with request",
+ testContexts.size(), responses.size());
+
+ Set contextResponses = new TreeSet();
+ for (RegisterApplicationMasterResponseInfo item : responses) {
+ contextResponses.add(item.getTestContext());
+ }
+
+ for (T ep : testContexts) {
+ Assert.assertTrue(contextResponses.contains(ep));
+ }
+
+ return responses;
+ }
+
+ /**
+ * Unregisters the application master for specified application id
+ *
+ * @param appId
+ * @param status
+ * @return
+ * @throws Exception
+ * @throws YarnException
+ * @throws IOException
+ */
+ protected FinishApplicationMasterResponse finishApplicationMaster(
+ final int appId, final FinalApplicationStatus status)
+ throws Exception, YarnException, IOException {
+
+ final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
+
+ return ugi.getUser().doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public FinishApplicationMasterResponse run() throws Exception {
+ final FinishApplicationMasterRequest req =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ req.setDiagnostics("");
+ req.setTrackingUrl("");
+ req.setFinalApplicationStatus(status);
+
+ FinishApplicationMasterResponse response =
+ getAMRMProxyService().finishApplicationMaster(req);
+
+ getAMRMProxyService().stopApp(
+ ugi.getAppAttemptId().getApplicationId());
+
+ return response;
+ }
+ });
+ }
+
+ protected List> finishApplicationMastersInParallel(
+ final ArrayList testContexts) {
+ List> responses =
+ runInParallel(testContexts,
+ new Function>() {
+ @Override
+ public FinishApplicationMasterResponseInfo invoke(
+ T testContext) {
+ FinishApplicationMasterResponseInfo response = null;
+ try {
+ response =
+ new FinishApplicationMasterResponseInfo(
+ finishApplicationMaster(
+ testContexts.indexOf(testContext),
+ FinalApplicationStatus.SUCCEEDED),
+ testContext);
+ Assert.assertNotNull(response.getResponse());
+
+ LOG.info("Sucessfully finished application master with test contexts: "
+ + testContext);
+ } catch (Throwable ex) {
+ response = null;
+ LOG.error("Failed to finish application master with test context: "
+ + testContext);
+ }
+
+ return response;
+ }
+ });
+
+ Assert.assertEquals(
+ "Number of responses received does not match with request",
+ testContexts.size(), responses.size());
+
+ Set contextResponses = new TreeSet();
+ for (FinishApplicationMasterResponseInfo item : responses) {
+ Assert.assertNotNull(item);
+ Assert.assertNotNull(item.getResponse());
+ contextResponses.add(item.getTestContext());
+ }
+
+ for (T ep : testContexts) {
+ Assert.assertTrue(contextResponses.contains(ep));
+ }
+
+ return responses;
+ }
+
+ protected AllocateResponse allocate(final int testAppId)
+ throws Exception, YarnException, IOException {
+ final AllocateRequest req = Records.newRecord(AllocateRequest.class);
+ req.setResponseId(testAppId);
+ return allocate(testAppId, req);
+ }
+
+ protected AllocateResponse allocate(final int testAppId,
+ final AllocateRequest request) throws Exception, YarnException,
+ IOException {
+
+ final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+ return ugi.getUser().doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public AllocateResponse run() throws Exception {
+ AllocateResponse response =
+ getAMRMProxyService().allocate(request);
+ return response;
+ }
+ });
+ }
+
+ protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
+ final ApplicationAttemptId attemptId =
+ getApplicationAttemptId(testAppId);
+
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(attemptId.toString());
+ AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
+ ugi.addTokenIdentifier(token);
+ return new ApplicationUserInfo(ugi, attemptId);
+ }
+
+ protected List createResourceRequests(String[] hosts,
+ int memory, int vCores, int priority, int containers)
+ throws Exception {
+ return createResourceRequests(hosts, memory, vCores, priority,
+ containers, null);
+ }
+
+ protected List createResourceRequests(String[] hosts,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression) throws Exception {
+ List reqs = new ArrayList();
+ for (String host : hosts) {
+ ResourceRequest hostReq =
+ createResourceRequest(host, memory, vCores, priority,
+ containers, labelExpression);
+ reqs.add(hostReq);
+ ResourceRequest rackReq =
+ createResourceRequest("/default-rack", memory, vCores, priority,
+ containers, labelExpression);
+ reqs.add(rackReq);
+ }
+
+ ResourceRequest offRackReq =
+ createResourceRequest(ResourceRequest.ANY, memory, vCores,
+ priority, containers, labelExpression);
+ reqs.add(offRackReq);
+ return reqs;
+ }
+
+ protected ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers)
+ throws Exception {
+ return createResourceRequest(resource, memory, vCores, priority,
+ containers, null);
+ }
+
+ protected ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression) throws Exception {
+ ResourceRequest req = Records.newRecord(ResourceRequest.class);
+ req.setResourceName(resource);
+ req.setNumContainers(containers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ req.setPriority(pri);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memory);
+ capability.setVirtualCores(vCores);
+ req.setCapability(capability);
+ if (labelExpression != null) {
+ req.setNodeLabelExpression(labelExpression);
+ }
+ return req;
+ }
+
+ /**
+ * Returns an ApplicationId with the specified identifier
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationId getApplicationId(int testAppId) {
+ return ApplicationId.newInstance(123456, testAppId);
+ }
+
+ /**
+ * Return an instance of ApplicationAttemptId using specified identifier. This
+ * identifier will be used for the ApplicationId too.
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
+ return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
+ testAppId);
+ }
+
+ /**
+ * Return an instance of ApplicationAttemptId using specified identifier and
+ * application id
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
+ ApplicationId appId) {
+ return ApplicationAttemptId.newInstance(appId, testAppId);
+ }
+
+ protected static class RegisterApplicationMasterResponseInfo {
+ private RegisterApplicationMasterResponse response;
+ private T testContext;
+
+ RegisterApplicationMasterResponseInfo(
+ RegisterApplicationMasterResponse response, T testContext) {
+ this.response = response;
+ this.testContext = testContext;
+ }
+
+ public RegisterApplicationMasterResponse getResponse() {
+ return response;
+ }
+
+ public T getTestContext() {
+ return testContext;
+ }
+ }
+
+ protected static class FinishApplicationMasterResponseInfo {
+ private FinishApplicationMasterResponse response;
+ private T testContext;
+
+ FinishApplicationMasterResponseInfo(
+ FinishApplicationMasterResponse response, T testContext) {
+ this.response = response;
+ this.testContext = testContext;
+ }
+
+ public FinishApplicationMasterResponse getResponse() {
+ return response;
+ }
+
+ public T getTestContext() {
+ return testContext;
+ }
+ }
+
+ protected static class ApplicationUserInfo {
+ private UserGroupInformation user;
+ private ApplicationAttemptId attemptId;
+
+ ApplicationUserInfo(UserGroupInformation user,
+ ApplicationAttemptId attemptId) {
+ this.user = user;
+ this.attemptId = attemptId;
+ }
+
+ public UserGroupInformation getUser() {
+ return this.user;
+ }
+
+ public ApplicationAttemptId getAppAttemptId() {
+ return this.attemptId;
+ }
+ }
+
+ protected static class MockAMRMProxyService extends AMRMProxyService {
+ public MockAMRMProxyService(Context nmContext,
+ AsyncDispatcher dispatcher) {
+ super(nmContext, dispatcher);
+ }
+
+ /**
+ * This method is used by the test code to initialize the pipeline. In the
+ * actual service, the initialization is called by the
+ * ContainerManagerImpl::StartContainers method
+ *
+ * @param applicationId
+ * @param user
+ */
+ public void initApp(ApplicationAttemptId applicationId, String user) {
+ super.initializePipeline(applicationId, user, null, null);
+ }
+
+ public void stopApp(ApplicationId applicationId) {
+ super.stopApplication(applicationId);
+ }
+ }
+
+ /**
+ * The Function interface is used for passing method pointers that can be
+ * invoked asynchronously at a later point.
+ */
+ protected interface Function {
+ public R invoke(T input);
+ }
+
+ protected class NullContext implements Context {
+
+ @Override
+ public NodeId getNodeId() {
+ return null;
+ }
+
+ @Override
+ public int getHttpPort() {
+ return 0;
+ }
+
+ @Override
+ public ConcurrentMap getApplications() {
+ return null;
+ }
+
+ @Override
+ public Map getSystemCredentialsForApps() {
+ return null;
+ }
+
+ @Override
+ public ConcurrentMap getContainers() {
+ return null;
+ }
+
+ @Override
+ public NMContainerTokenSecretManager getContainerTokenSecretManager() {
+ return null;
+ }
+
+ @Override
+ public NMTokenSecretManagerInNM getNMTokenSecretManager() {
+ return null;
+ }
+
+ @Override
+ public NodeHealthStatus getNodeHealthStatus() {
+ return null;
+ }
+
+ @Override
+ public ContainerManagementProtocol getContainerManager() {
+ return null;
+ }
+
+ @Override
+ public LocalDirsHandlerService getLocalDirsHandler() {
+ return null;
+ }
+
+ @Override
+ public ApplicationACLsManager getApplicationACLsManager() {
+ return null;
+ }
+
+ @Override
+ public NMStateStoreService getNMStateStore() {
+ return null;
+ }
+
+ @Override
+ public boolean getDecommissioned() {
+ return false;
+ }
+
+ @Override
+ public void setDecommissioned(boolean isDecommissioned) {
+ }
+
+ @Override
+ public ConcurrentLinkedQueue getLogAggregationStatusForApps() {
+ return null;
+ }
+
+ @Override
+ public NodeResourceMonitor getNodeResourceMonitor() {
+ return null;
+ }
+
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
new file mode 100644
index 0000000..c962f97
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class MockRequestInterceptor extends AbstractRequestInterceptor {
+
+ private MockResourceManagerFacade mockRM;
+
+ public MockRequestInterceptor() {
+ }
+
+ public void init(AMRMProxyApplicationContext appContext) {
+ super.init(appContext);
+ mockRM =
+ new MockResourceManagerFacade(new YarnConfiguration(
+ super.getConf()), 0);
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return mockRM.registerApplicationMaster(request);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return mockRM.finishApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ return mockRM.allocate(request);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
new file mode 100644
index 0000000..7573a7a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.directory.api.util.exception.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.mortbay.log.Log;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation is expected by the unit test cases. So please change the
+ * implementation with care.
+ */
+public class MockResourceManagerFacade implements
+ ApplicationMasterProtocol, ApplicationClientProtocol {
+
+ private HashMap> applicationContainerIdMap =
+ new HashMap>();
+ private HashMap allocatedContainerMap =
+ new HashMap();
+ private AtomicInteger containerIndex = new AtomicInteger(0);
+ private Configuration conf;
+
+ public MockResourceManagerFacade(Configuration conf,
+ int startContainerIndex) {
+ this.conf = conf;
+ this.containerIndex.set(startContainerIndex);
+ }
+
+ private static String getAppIdentifier() throws IOException {
+ AMRMTokenIdentifier result = null;
+ UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+ Set tokenIds = remoteUgi.getTokenIdentifiers();
+ for (TokenIdentifier tokenId : tokenIds) {
+ if (tokenId instanceof AMRMTokenIdentifier) {
+ result = (AMRMTokenIdentifier) tokenId;
+ break;
+ }
+ }
+ return result != null ? result.getApplicationAttemptId().toString()
+ : "";
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ String amrmToken = getAppIdentifier();
+ Log.info("Registering application attempt: " + amrmToken);
+
+ synchronized (applicationContainerIdMap) {
+ Assert.assertFalse("The application id is already registered: "
+ + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(amrmToken,
+ new ArrayList());
+ }
+
+ return RegisterApplicationMasterResponse.newInstance(null, null, null,
+ null, null, request.getHost(), null);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ String amrmToken = getAppIdentifier();
+ Log.info("Finishing application attempt: " + amrmToken);
+
+ synchronized (applicationContainerIdMap) {
+ // Remove the containers that were being tracked for this application
+ Assert.assertTrue("The application id is NOT registered: "
+ + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+ List ids = applicationContainerIdMap.remove(amrmToken);
+ for (ContainerId c : ids) {
+ allocatedContainerMap.remove(c);
+ }
+ }
+
+ return FinishApplicationMasterResponse
+ .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
+ : false);
+ }
+
+ protected ApplicationId getApplicationId(int id) {
+ return ApplicationId.newInstance(12345, id);
+ }
+
+ protected ApplicationAttemptId getApplicationAttemptId(int id) {
+ return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ if (request.getAskList() != null && request.getAskList().size() > 0
+ && request.getReleaseList() != null
+ && request.getReleaseList().size() > 0) {
+ Assert.fail("The mock RM implementation does not support receiving "
+ + "askList and releaseList in the same heartbeat");
+ }
+
+ String amrmToken = getAppIdentifier();
+
+ ArrayList containerList = new ArrayList();
+ if (request.getAskList() != null) {
+ for (ResourceRequest rr : request.getAskList()) {
+ for (int i = 0; i < rr.getNumContainers(); i++) {
+ ContainerId containerId =
+ ContainerId.newInstance(getApplicationAttemptId(1),
+ containerIndex.incrementAndGet());
+ Container container = Records.newRecord(Container.class);
+ container.setId(containerId);
+ container.setPriority(rr.getPriority());
+
+ // We don't use the node for running containers in the test cases. So
+ // it is OK to hard code it to some dummy value
+ NodeId nodeId =
+ NodeId.newInstance(
+ !Strings.isEmpty(rr.getResourceName()) ? rr
+ .getResourceName() : "dummy", 1000);
+ container.setNodeId(nodeId);
+ container.setResource(rr.getCapability());
+ containerList.add(container);
+
+ synchronized (applicationContainerIdMap) {
+ // Keep track of the containers returned to this application. We
+ // will need it in future
+ Assert.assertTrue(
+ "The application id is Not registered before allocate(): "
+ + amrmToken,
+ applicationContainerIdMap.containsKey(amrmToken));
+ List ids =
+ applicationContainerIdMap.get(amrmToken);
+ ids.add(containerId);
+ this.allocatedContainerMap.put(containerId, container);
+ }
+ }
+ }
+ }
+
+ if (request.getReleaseList() != null
+ && request.getReleaseList().size() > 0) {
+ Log.info("Releasing containers: " + request.getReleaseList().size());
+ synchronized (applicationContainerIdMap) {
+ Assert.assertTrue(
+ "The application id is not registered before allocate(): "
+ + amrmToken,
+ applicationContainerIdMap.containsKey(amrmToken));
+ List ids = applicationContainerIdMap.get(amrmToken);
+
+ for (ContainerId id : request.getReleaseList()) {
+ boolean found = false;
+ for (ContainerId c : ids) {
+ if (c.equals(id)) {
+ found = true;
+ break;
+ }
+ }
+
+ Assert.assertTrue(
+ "ContainerId " + id
+ + " being released is not valid for application: "
+ + conf.get("AMRMTOKEN"), found);
+
+ ids.remove(id);
+
+ // Return the released container back to the AM with new fake Ids. The
+ // test case does not care about the IDs. The IDs are faked because
+ // otherwise the LRM will throw duplication identifier exception. This
+ // returning of fake containers is ONLY done for testing purpose - for
+ // the test code to get confirmation that the sub-cluster resource
+ // managers received the release request
+ ContainerId fakeContainerId =
+ ContainerId.newInstance(getApplicationAttemptId(1),
+ containerIndex.incrementAndGet());
+ Container fakeContainer = allocatedContainerMap.get(id);
+ fakeContainer.setId(fakeContainerId);
+ containerList.add(fakeContainer);
+ }
+ }
+ }
+
+ Log.info("Allocating containers: " + containerList.size()
+ + " for application attempt: " + conf.get("AMRMTOKEN"));
+ return AllocateResponse.newInstance(0,
+ new ArrayList(), containerList,
+ new ArrayList(), null, AMCommand.AM_RESYNC, 1, null,
+ new ArrayList(),
+ new ArrayList(),
+ new ArrayList());
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException,
+ IOException {
+
+ GetApplicationReportResponse response =
+ Records.newRecord(GetApplicationReportResponse.class);
+ ApplicationReport report = Records.newRecord(ApplicationReport.class);
+ report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+ report.setApplicationId(request.getApplicationId());
+ report.setCurrentApplicationAttemptId(ApplicationAttemptId
+ .newInstance(request.getApplicationId(), 1));
+ response.setApplicationReport(report);
+ return response;
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request) throws YarnException,
+ IOException {
+ GetApplicationAttemptReportResponse response =
+ Records.newRecord(GetApplicationAttemptReportResponse.class);
+ ApplicationAttemptReport report =
+ Records.newRecord(ApplicationAttemptReport.class);
+ report.setApplicationAttemptId(request.getApplicationAttemptId());
+ report
+ .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+ response.setApplicationAttemptReport(report);
+ return response;
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(
+ GetApplicationsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(
+ GetClusterNodesRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
new file mode 100644
index 0000000..97a844e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain
+ *
+ */
+public class PassThroughRequestInterceptor extends
+ AbstractRequestInterceptor {
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return getNextInterceptor().registerApplicationMaster(request);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return getNextInterceptor().finishApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().allocate(request);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
new file mode 100644
index 0000000..69b913a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxyService extends BaseAMRMProxyTest {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestAMRMProxyService.class);
+
+ /**
+ * Test if the pipeline is created properly.
+ */
+ @Test
+ public void testRequestInterceptorChainCreation() throws Exception {
+ RequestInterceptor root =
+ super.getAMRMProxyService().createRequestInterceptorChain();
+ int index = 0;
+ while (root != null) {
+ switch (index) {
+ case 0:
+ case 1:
+ case 2:
+ Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ case 3:
+ Assert.assertEquals(MockRequestInterceptor.class.getName(), root
+ .getClass().getName());
+ break;
+ }
+
+ root = root.getNextInterceptor();
+ index++;
+ }
+
+ Assert.assertEquals(
+ "The number of interceptors in chain does not match",
+ Integer.toString(4), Integer.toString(index));
+
+ }
+
+ /**
+ * Tests registration of a single application master.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterOneApplicationMaster() throws Exception {
+ // The testAppId identifier is used as host name and the mock resource
+ // manager return it as the queue name. Assert that we received the queue
+ // name
+ int testAppId = 1;
+ RegisterApplicationMasterResponse response1 =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(response1);
+ Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
+ }
+
+ /**
+ * Tests the registration of multiple application master serially one at a
+ * time.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterMulitpleApplicationMasters() throws Exception {
+ for (int testAppId = 0; testAppId < 3; testAppId++) {
+ RegisterApplicationMasterResponse response =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(response);
+ Assert
+ .assertEquals(Integer.toString(testAppId), response.getQueue());
+ }
+ }
+
+ /**
+ * Tests the registration of multiple application masters using multiple
+ * threads in parallel.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterMulitpleApplicationMastersInParallel()
+ throws Exception {
+ int numberOfRequests = 5;
+ ArrayList testContexts =
+ CreateTestRequestIdentifiers(numberOfRequests);
+ super.registerApplicationMastersInParallel(testContexts);
+ }
+
+ private ArrayList CreateTestRequestIdentifiers(
+ int numberOfRequests) {
+ ArrayList testContexts = new ArrayList();
+ LOG.info("Creating " + numberOfRequests + " contexts for testing");
+ for (int ep = 0; ep < numberOfRequests; ep++) {
+ testContexts.add("test-endpoint-" + Integer.toString(ep));
+ LOG.info("Created test context: " + testContexts.get(ep));
+ }
+ return testContexts;
+ }
+
+ @Test
+ public void testFinishOneApplicationMasterWithSuccess() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ Assert.assertEquals(Integer.toString(testAppId),
+ registerResponse.getQueue());
+
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(testAppId,
+ FinalApplicationStatus.SUCCEEDED);
+
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+ }
+
+ @Test
+ public void testFinishOneApplicationMasterWithFailure() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ Assert.assertEquals(Integer.toString(testAppId),
+ registerResponse.getQueue());
+
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(false, finshResponse.getIsUnregistered());
+
+ try {
+ // Try to finish an application master that is already finished.
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not registered");
+ }
+ }
+
+ @Test
+ public void testFinishInvalidApplicationMaster() throws Exception {
+ try {
+ // Try to finish an application master that was not registered.
+ finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not registered");
+ }
+ }
+
+ @Test
+ public void testFinishMulitpleApplicationMasters() throws Exception {
+ int numberOfRequests = 3;
+ for (int index = 0; index < numberOfRequests; index++) {
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(index);
+ Assert.assertNotNull(registerResponse);
+ Assert.assertEquals(Integer.toString(index),
+ registerResponse.getQueue());
+ }
+
+ // Finish in reverse sequence
+ for (int index = numberOfRequests - 1; index >= 0; index--) {
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
+
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+ // Assert that the application has been removed from the collection
+ Assert.assertTrue(this.getAMRMProxyService()
+ .getPipelines().size() == index);
+ }
+
+ try {
+ // Try to finish an application master that is already finished.
+ finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not registered");
+ }
+
+ try {
+ // Try to finish an application master that was not registered.
+ finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not registered");
+ }
+ }
+
+ @Test
+ public void testFinishMulitpleApplicationMastersInParallel()
+ throws Exception {
+ int numberOfRequests = 5;
+ ArrayList testContexts = new ArrayList();
+ LOG.info("Creating " + numberOfRequests + " contexts for testing");
+ for (int i = 0; i < numberOfRequests; i++) {
+ testContexts.add("test-endpoint-" + Integer.toString(i));
+ LOG.info("Created test context: " + testContexts.get(i));
+
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(i);
+ Assert.assertNotNull(registerResponse);
+ Assert
+ .assertEquals(Integer.toString(i), registerResponse.getQueue());
+ }
+
+ finishApplicationMastersInParallel(testContexts);
+ }
+
+ @Test
+ public void testAllocateRequestWithNullValues() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ Assert.assertEquals(Integer.toString(testAppId),
+ registerResponse.getQueue());
+
+ AllocateResponse allocateResponse = allocate(testAppId);
+ Assert.assertNotNull(allocateResponse);
+
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(testAppId,
+ FinalApplicationStatus.SUCCEEDED);
+
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+ }
+
+ @Test
+ public void testAllocateRequestWithoutRegistering() throws Exception {
+
+ try {
+ // Try to allocate an application master without registering.
+ allocate(1);
+ Assert
+ .fail("The request to allocate application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("AllocateRequest failed as expected because AM was not registered");
+ }
+ }
+
+ @Test
+ public void testAllocateWithOneResourceRequest() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ getContainersAndAssert(testAppId, 1);
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ @Test
+ public void testAllocateWithMultipleResourceRequest() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ getContainersAndAssert(testAppId, 10);
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ @Test
+ public void testAllocateAndReleaseContainers() throws Exception {
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ List containers = getContainersAndAssert(testAppId, 10);
+ releaseContainersAndAssert(testAppId, containers);
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ @Test
+ public void testAllocateAndReleaseContainersForMultipleAM()
+ throws Exception {
+ int numberOfApps = 5;
+ for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ List containers = getContainersAndAssert(testAppId, 10);
+ releaseContainersAndAssert(testAppId, containers);
+ }
+ for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ }
+ }
+
+ @Test
+ public void testAllocateAndReleaseContainersForMultipleAMInParallel()
+ throws Exception {
+ int numberOfApps = 6;
+ ArrayList tempAppIds = new ArrayList();
+ for (int i = 0; i < numberOfApps; i++) {
+ tempAppIds.add(new Integer(i));
+ }
+
+ final ArrayList appIds = tempAppIds;
+ List responses =
+ runInParallel(appIds, new Function() {
+ @Override
+ public Integer invoke(Integer testAppId) {
+ try {
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull("response is null", registerResponse);
+ List containers =
+ getContainersAndAssert(testAppId, 10);
+ releaseContainersAndAssert(testAppId, containers);
+
+ LOG.info("Sucessfully registered application master with appId: "
+ + testAppId);
+ } catch (Throwable ex) {
+ LOG.error(
+ "Failed to register application master with appId: "
+ + testAppId, ex);
+ testAppId = null;
+ }
+
+ return testAppId;
+ }
+ });
+
+ Assert.assertEquals(
+ "Number of responses received does not match with request",
+ appIds.size(), responses.size());
+
+ for (Integer testAppId : responses) {
+ Assert.assertNotNull(testAppId);
+ finishApplicationMaster(testAppId.intValue(),
+ FinalApplicationStatus.SUCCEEDED);
+ }
+ }
+
+ private List getContainersAndAssert(int appId,
+ int numberOfResourceRequests) throws Exception {
+ AllocateRequest allocateRequest =
+ Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(1);
+
+ List containers =
+ new ArrayList(numberOfResourceRequests);
+ List askList =
+ new ArrayList(numberOfResourceRequests);
+ for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
+ askList.add(createResourceRequest(
+ "test-node-" + Integer.toString(testAppId), 6000, 2,
+ testAppId % 5, 1));
+ }
+
+ allocateRequest.setAskList(askList);
+
+ AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+ Assert.assertNotNull("allocate() returned null response",
+ allocateResponse);
+
+ containers.addAll(allocateResponse.getAllocatedContainers());
+
+ // Send max 10 heart beats to receive all the containers. If not, we will
+ // fail the test
+ int numHeartbeat = 0;
+ while (containers.size() < askList.size() && numHeartbeat++ < 10) {
+ allocateResponse =
+ allocate(appId, Records.newRecord(AllocateRequest.class));
+ Assert.assertNotNull("allocate() returned null response",
+ allocateResponse);
+
+ containers.addAll(allocateResponse.getAllocatedContainers());
+
+ LOG.info("Number of allocated containers in this request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers()
+ .size()));
+ LOG.info("Total number of allocated containers: "
+ + Integer.toString(containers.size()));
+ Thread.sleep(10);
+ }
+
+ // We broadcast the request, the number of containers we received will be
+ // higher than we ask
+ Assert.assertTrue("The asklist count is not same as response",
+ askList.size() <= containers.size());
+ return containers;
+ }
+
+ private void releaseContainersAndAssert(int appId,
+ List containers) throws Exception {
+ Assert.assertTrue(containers.size() > 0);
+ AllocateRequest allocateRequest =
+ Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(1);
+
+ List relList =
+ new ArrayList(containers.size());
+ for (Container container : containers) {
+ relList.add(container.getId());
+ }
+
+ allocateRequest.setReleaseList(relList);
+
+ AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+ Assert.assertNotNull(allocateResponse);
+
+ // The way the mock resource manager is setup, it will return the containers
+ // that were released in the response. This is done because the UAMs run
+ // asynchronously and we need to if all the resource managers received the
+ // release it. The containers sent by the mock resource managers will be
+ // aggregated and returned back to us and we can assert if all the release
+ // lists reached the sub-clusters
+ List containersForReleasedContainerIds =
+ new ArrayList();
+ containersForReleasedContainerIds.addAll(allocateResponse
+ .getAllocatedContainers());
+
+ // Send max 10 heart beats to receive all the containers. If not, we will
+ // fail the test
+ int numHeartbeat = 0;
+ while (containersForReleasedContainerIds.size() < relList.size()
+ && numHeartbeat++ < 10) {
+ allocateResponse =
+ allocate(appId, Records.newRecord(AllocateRequest.class));
+ Assert.assertNotNull(allocateResponse);
+ containersForReleasedContainerIds.addAll(allocateResponse
+ .getAllocatedContainers());
+
+ LOG.info("Number of containers received in this request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers()
+ .size()));
+ LOG.info("Total number of containers received: "
+ + Integer.toString(containersForReleasedContainerIds.size()));
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(relList.size(),
+ containersForReleasedContainerIds.size());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index c8b985d..14142de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -40,9 +40,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -76,7 +74,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -95,6 +92,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -175,69 +173,13 @@ public InetSocketAddress getBindAddress() {
return this.masterServiceAddress;
}
- // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
- // currently sets only the required id, but iterate through anyways just to be
- // sure.
- private AMRMTokenIdentifier selectAMRMTokenIdentifier(
- UserGroupInformation remoteUgi) throws IOException {
- AMRMTokenIdentifier result = null;
- Set tokenIds = remoteUgi.getTokenIdentifiers();
- for (TokenIdentifier tokenId : tokenIds) {
- if (tokenId instanceof AMRMTokenIdentifier) {
- result = (AMRMTokenIdentifier) tokenId;
- break;
- }
- }
-
- return result;
- }
-
- private AMRMTokenIdentifier authorizeRequest()
- throws YarnException {
-
- UserGroupInformation remoteUgi;
- try {
- remoteUgi = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- String msg =
- "Cannot obtain the user-name for authorizing ApplicationMaster. "
- + "Got exception: " + StringUtils.stringifyException(e);
- LOG.warn(msg);
- throw RPCUtil.getRemoteException(msg);
- }
-
- boolean tokenFound = false;
- String message = "";
- AMRMTokenIdentifier appTokenIdentifier = null;
- try {
- appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
- if (appTokenIdentifier == null) {
- tokenFound = false;
- message = "No AMRMToken found for user " + remoteUgi.getUserName();
- } else {
- tokenFound = true;
- }
- } catch (IOException e) {
- tokenFound = false;
- message =
- "Got exception while looking for AMRMToken for user "
- + remoteUgi.getUserName();
- }
-
- if (!tokenFound) {
- LOG.warn(message);
- throw RPCUtil.getRemoteException(message);
- }
-
- return appTokenIdentifier;
- }
-
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId applicationAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
@@ -346,7 +288,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
IOException {
ApplicationAttemptId applicationAttemptId =
- authorizeRequest().getApplicationAttemptId();
+ YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
ApplicationId appId = applicationAttemptId.getApplicationId();
RMApp rmApp =
@@ -430,7 +372,8 @@ public boolean hasApplicationMasterRegistered(
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();