diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5193dbe..cf4b0d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2611,6 +2611,32 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
+ public static final String ROUTER_PREFIX = YARN_PREFIX + "federation.";
+
+ public static final String ROUTER_CLIENTRM_PROXY_PREFIX =
+ YARN_PREFIX + "clientrmproxy.";
+
+ public static final String ROUTER_CLIENTRM_PROXY_ADDRESS =
+ ROUTER_CLIENTRM_PROXY_PREFIX + ".address";
+ public static final int DEFAULT_ROUTER_CLIENTRM_PROXY_PORT = 8050;
+ public static final String DEFAULT_ROUTER_CLIENTRM_PROXY_ADDRESS =
+ "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PROXY_PORT;
+
+ public static final String ROUTER_CLIENTRM_PROXY_THREAD_COUNT =
+ ROUTER_CLIENTRM_PROXY_PREFIX + "client.thread-count";
+ public static final int DEFAULT_ROUTER_CLIENTRM_PROXY_THREAD_COUNT = 25;
+
+ public static final String ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ ROUTER_CLIENTRM_PROXY_PREFIX + "interceptor-class.pipeline";
+ public static final String DEFAULT_ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ "org.apache.hadoop.yarn.server.router.clientrmproxy."
+ + "DefaultClientRequestInterceptor";
+
+ /** Max cache size for user pipeline map */
+ public static final String CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE =
+ YARN_PREFIX + "clientrmproxy.cache-max-size";
+ public static final int DEFAULT_CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE = 25;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index 25afa5c..add856e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -52,6 +52,12 @@
org.apache.hadoop
hadoop-yarn-server-common
+
+
+ junit
+ junit
+ test
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 7be8a59..88159ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -18,6 +18,20 @@
package org.apache.hadoop.yarn.server.router;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.router.clientrmproxy.ClientRMProxyService;
+
/**
* The router is a stateless YARN component which is the entry point to the
* cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@@ -33,6 +47,88 @@
* This provides a placeholder for throttling mis-behaving clients (YARN-1546)
* and masks the access to multiple RMs (YARN-3659).
*/
-public class Router{
+public class Router extends CompositeService {
+
+ private static final Log LOG = LogFactory.getLog(Router.class);
+ private static CompositeServiceShutdownHook routerShutdownHook;
+ private Configuration conf;
+ private AtomicBoolean isStopping = new AtomicBoolean(false);
+ private ClientRMProxyService clientRMProxyService;
+
+ /**
+ * Priority of the Router shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ public Router() {
+ super(Router.class.getName());
+ }
+
+ protected void doSecureLogin() throws IOException {
+ // TODO
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.conf = conf;
+ clientRMProxyService = createClientRMProxyService();
+ addService(clientRMProxyService);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ try {
+ doSecureLogin();
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Failed Router login", e);
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (isStopping.getAndSet(true)) {
+ return;
+ }
+ super.serviceStop();
+ }
+
+ protected void shutDown() {
+ new Thread() {
+ @Override
+ public void run() {
+ Router.this.stop();
+ }
+ }.start();
+ }
+
+ protected ClientRMProxyService createClientRMProxyService() {
+ return new ClientRMProxyService();
+ }
+
+ public static void main(String argv[]) {
+ Configuration conf = new YarnConfiguration();
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(Router.class, argv, LOG);
+ Router router = new Router();
+ try {
+
+ // Remove the old hook if we are rebooting.
+ if (null != routerShutdownHook) {
+ ShutdownHookManager.get().removeShutdownHook(routerShutdownHook);
+ }
+
+ routerShutdownHook = new CompositeServiceShutdownHook(router);
+ ShutdownHookManager.get().addShutdownHook(routerShutdownHook,
+ SHUTDOWN_HOOK_PRIORITY);
+ router.init(conf);
+ router.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting Router", t);
+ System.exit(-1);
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java
new file mode 100644
index 0000000..5b75426
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can can be used and/or extended by other concrete intercepter classes.
+ *
+ */
+public abstract class AbstractClientRequestInterceptor
+ implements ClientRequestInterceptor {
+ private Configuration conf;
+ private ClientRequestInterceptor nextInterceptor;
+
+ /**
+ * Sets the {@link RequestInterceptor} in the chain.
+ */
+ @Override
+ public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
+ this.nextInterceptor = nextInterceptor;
+ }
+
+ /**
+ * Sets the {@link Configuration}.
+ */
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.setConf(conf);
+ }
+ }
+
+ /**
+ * Gets the {@link Configuration}.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Initializes the {@link ClientRequestInterceptor}.
+ */
+ @Override
+ public void init(String user) {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.init(user);
+ }
+ }
+
+ /**
+ * Disposes the {@link ClientRequestInterceptor}.
+ */
+ @Override
+ public void shutdown() {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.shutdown();
+ }
+ }
+
+ /**
+ * Gets the next {@link ClientRequestInterceptor} in the chain.
+ */
+ @Override
+ public ClientRequestInterceptor getNextInterceptor() {
+ return this.nextInterceptor;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java
new file mode 100644
index 0000000..ec0e61b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.router.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ClientRMProxyService is a service that runs on each router that can be used
+ * to intercept and inspect ApplicationClientProtocol messages from client to
+ * the cluster resource manager. It listens ApplicationClientProtocol messages
+ * from the client and creates a request intercepting pipeline instance for each
+ * client. The pipeline is a chain of intercepter instances that can inspect and
+ * modify the request/response as needed. The main difference with
+ * AMRMProxyService is the protocol they implement.
+ */
+public class ClientRMProxyService extends AbstractService
+ implements ApplicationClientProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ClientRMProxyService.class);
+
+ private Server server;
+ private InetSocketAddress listenerEndpoint;
+ private Map userPipelineMap;
+
+ public ClientRMProxyService() {
+ super(ClientRMProxyService.class.getName());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting Router ClientRMProxyService");
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ this.listenerEndpoint =
+ conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_PORT);
+
+ int maxCacheSize =
+ conf.getInt(YarnConfiguration.CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE,
+ YarnConfiguration.DEFAULT_CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE);
+ this.userPipelineMap = Collections.synchronizedMap(
+ new LRUCacheHashMap(
+ maxCacheSize, true));
+
+ Configuration serverConf = new Configuration(conf);
+
+ int numWorkerThreads =
+ serverConf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PROXY_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_THREAD_COUNT);
+
+ this.server = rpc.getServer(ApplicationClientProtocol.class, this,
+ listenerEndpoint, serverConf, null, numWorkerThreads);
+
+ this.server.start();
+ LOG.info("Router ClientRMProxyService listening on address: "
+ + this.server.getListenerAddress());
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping Router ClientRMProxyService");
+ if (this.server != null) {
+ this.server.stop();
+ }
+ userPipelineMap.clear();
+ super.serviceStop();
+ }
+
+ /**
+ * Returns the comma separated intercepter class names from the configuration.
+ *
+ * @param conf
+ * @return the intercepter class names as an instance of ArrayList
+ */
+ private List getInterceptorClassNames(Configuration conf) {
+ String configuredInterceptorClassNames = conf.get(
+ YarnConfiguration.ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+
+ List interceptorClassNames = new ArrayList();
+ Collection tempList =
+ StringUtils.getStringCollection(configuredInterceptorClassNames);
+ for (String item : tempList) {
+ interceptorClassNames.add(item.trim());
+ }
+
+ return interceptorClassNames;
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().updateApplicationTimeouts(request);
+ }
+
+ private RequestInterceptorChainWrapper getInterceptorChain()
+ throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ if (!userPipelineMap.containsKey(user)) {
+ initializePipeline(user);
+ }
+ return userPipelineMap.get(user);
+ }
+
+ /**
+ * Gets the Request intercepter chains for all the users.
+ *
+ * @return the request intercepter chains.
+ */
+ protected Map getPipelines() {
+ return this.userPipelineMap;
+ }
+
+ /**
+ * This method creates and returns reference of the first intercepter in the
+ * chain of request intercepter instances.
+ *
+ * @return the reference of the first intercepter in the chain
+ */
+ protected ClientRequestInterceptor createRequestInterceptorChain() {
+ Configuration conf = getConfig();
+
+ List interceptorClassNames = getInterceptorClassNames(conf);
+
+ ClientRequestInterceptor pipeline = null;
+ ClientRequestInterceptor current = null;
+ for (String interceptorClassName : interceptorClassNames) {
+ try {
+ Class> interceptorClass = conf.getClassByName(interceptorClassName);
+ if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+ ClientRequestInterceptor interceptorInstance =
+ (ClientRequestInterceptor) ReflectionUtils
+ .newInstance(interceptorClass, conf);
+ if (pipeline == null) {
+ pipeline = interceptorInstance;
+ current = interceptorInstance;
+ continue;
+ } else {
+ current.setNextInterceptor(interceptorInstance);
+ current = interceptorInstance;
+ }
+ } else {
+ throw new YarnRuntimeException(
+ "Class: " + interceptorClassName + " not instance of "
+ + ClientRequestInterceptor.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate ApplicationClientRequestInterceptor: "
+ + interceptorClassName,
+ e);
+ }
+ }
+
+ if (pipeline == null) {
+ throw new YarnRuntimeException(
+ "RequestInterceptor pipeline is not configured in the system");
+ }
+ return pipeline;
+ }
+
+ /**
+ * Initializes the request intercepter pipeline for the specified application.
+ *
+ * @param user
+ */
+ protected void initializePipeline(String user) {
+ RequestInterceptorChainWrapper chainWrapper = null;
+ synchronized (this.userPipelineMap) {
+ if (this.userPipelineMap.containsKey(user)) {
+ LOG.info(
+ "Request to start an already existing user: {} was received, so ignoring.",
+ user);
+ return;
+ }
+
+ chainWrapper = new RequestInterceptorChainWrapper();
+ this.userPipelineMap.put(user, chainWrapper);
+ }
+
+ // We register the pipeline instance in the map first and then initialize it
+ // later because chain initialization can be expensive and we would like to
+ // release the lock as soon as possible to prevent other applications from
+ // blocking when one application's chain is initializing
+ LOG.info(
+ "Initializing request processing pipeline for application for the user: {}",
+ user);
+
+ try {
+ ClientRequestInterceptor interceptorChain =
+ this.createRequestInterceptorChain();
+ interceptorChain.init(user);
+ chainWrapper.init(interceptorChain);
+ } catch (Exception e) {
+ synchronized (this.userPipelineMap) {
+ this.userPipelineMap.remove(user);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and user instances.
+ *
+ */
+ @Private
+ public static class RequestInterceptorChainWrapper {
+ private ClientRequestInterceptor rootInterceptor;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param rootInterceptor
+ */
+ public synchronized void init(ClientRequestInterceptor rootInterceptor) {
+ this.rootInterceptor = rootInterceptor;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized ClientRequestInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Shutdown the chain of interceptors when the object is destroyed
+ */
+ @Override
+ public void finalize() {
+ rootInterceptor.shutdown();
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java
new file mode 100644
index 0000000..1b6a859
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface ClientRequestInterceptor
+ extends ApplicationClientProtocol, Configurable {
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param user
+ */
+ void init(String user);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the resource manager service and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor
+ */
+ void setNextInterceptor(ClientRequestInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ ClientRequestInterceptor getNextInterceptor();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java
new file mode 100644
index 0000000..76f0b16
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the AbstractRequestInterceptorClient class and provides an
+ * implementation that simply forwards the client requests to the cluster
+ * resource manager.
+ *
+ */
+public final class DefaultClientRequestInterceptor
+ extends AbstractClientRequestInterceptor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
+ private ApplicationClientProtocol clientRM;
+ private UserGroupInformation user = null;
+
+ @Override
+ public void init(String userName) {
+ super.init(userName);
+ try {
+ // Do not create a proxy user if user name matches the user name on
+ // current UGI
+ if (userName.equalsIgnoreCase(
+ UserGroupInformation.getCurrentUser().getUserName())) {
+ user = UserGroupInformation.getCurrentUser();
+ } else {
+ user = UserGroupInformation.createProxyUser(userName,
+ UserGroupInformation.getCurrentUser());
+ }
+
+ final Configuration conf = this.getConf();
+
+ clientRM =
+ user.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ApplicationClientProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(conf,
+ ApplicationClientProtocol.class);
+ }
+ });
+ } catch (IOException e) {
+ String message = "Error while creating ClientRM Proxy Service for user:";
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setNextInterceptor(ClientRequestInterceptor next) {
+ throw new YarnRuntimeException(
+ "setNextInterceptor is being called on DefaultRequestInterceptor,"
+ + "which should be the last one in the chain "
+ + "Check if the interceptor pipeline configuration is correct");
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return clientRM.getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return clientRM.submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return clientRM.forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return clientRM.getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return clientRM.getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return clientRM.getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return clientRM.getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return clientRM.moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return clientRM.getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return clientRM.submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return clientRM.listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return clientRM.updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return clientRM.deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return clientRM.getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return clientRM.getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return clientRM.getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return clientRM.getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return clientRM.getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return clientRM.getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return clientRM.getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return clientRM.getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return clientRM.getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRM.getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRM.renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRM.cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return clientRM.failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return clientRM.updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return clientRM.signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return clientRM.updateApplicationTimeouts(request);
+ }
+
+ @VisibleForTesting
+ public void setRMClient(ApplicationClientProtocol clientRM) {
+ this.clientRM = clientRM;
+
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java
new file mode 100644
index 0000000..2d35d42
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router ClientRM Proxy Service package. **/
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java
new file mode 100644
index 0000000..409bfb7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java
@@ -0,0 +1,49 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.router.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/*
+ * LRU cache with a configurable maximum cache size and access order
+ */
+public class LRUCacheHashMap extends LinkedHashMap {
+
+ private static final long serialVersionUID = 1L;
+
+ // Maximum size of the cache
+ private int maxSize;
+
+ /**
+ * Constructor
+ *
+ * @param maxSize max size of the cache
+ * @param accessOrder true for access-order, false for insertion-order
+ */
+ public LRUCacheHashMap(int maxSize, boolean accessOrder) {
+ super(maxSize, 0.75f, accessOrder);
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > maxSize;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java
new file mode 100644
index 0000000..c35a8d0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the ClientRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes
+ *
+ */
+public abstract class BaseClientRMProxyTest {
+
+ /**
+ * The ClientRMProxyService instance that will be used by all the test cases
+ */
+ private MockClientRMProxyService clientrmProxyService;
+ /**
+ * Thread pool used for asynchronous operations
+ */
+ private static ExecutorService threadpool = Executors.newCachedThreadPool();
+ private Configuration conf;
+ private AsyncDispatcher dispatcher;
+
+ public final static int TEST_MAX_CACHE_SIZE = 10;
+
+ protected MockClientRMProxyService getClientRMProxyService() {
+ Assert.assertNotNull(this.clientrmProxyService);
+ return this.clientrmProxyService;
+ }
+
+ @Before
+ public void setUp() {
+ this.conf = new YarnConfiguration();
+ String mockPassThroughInterceptorClass =
+ PassThroughClientRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain will call the mock resource manager. The others in the chain will
+ // simply forward it to the next one in the chain
+ this.conf.set(
+ YarnConfiguration.ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + mockPassThroughInterceptorClass + ","
+ + MockClientRequestInterceptor.class.getName());
+
+ this.conf.setInt(YarnConfiguration.CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE,
+ TEST_MAX_CACHE_SIZE);
+
+ this.dispatcher = new AsyncDispatcher();
+ this.dispatcher.init(conf);
+ this.dispatcher.start();
+ this.clientrmProxyService = createAndStartClientRMProxyService();
+ }
+
+ @After
+ public void tearDown() {
+ if (clientrmProxyService != null) {
+ clientrmProxyService.stop();
+ clientrmProxyService = null;
+ }
+ if (this.dispatcher != null) {
+ this.dispatcher.stop();
+ }
+ }
+
+ protected ExecutorService getThreadPool() {
+ return threadpool;
+ }
+
+ protected MockClientRMProxyService createAndStartClientRMProxyService() {
+ MockClientRMProxyService svc = new MockClientRMProxyService();
+ svc.init(conf);
+ svc.start();
+ return svc;
+ }
+
+ protected static class MockClientRMProxyService extends ClientRMProxyService {
+ public MockClientRMProxyService() {
+ super();
+ }
+ }
+
+ protected GetNewApplicationResponse getNewApplication(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetNewApplicationResponse run() throws Exception {
+ GetNewApplicationRequest req =
+ GetNewApplicationRequest.newInstance();
+ GetNewApplicationResponse response =
+ getClientRMProxyService().getNewApplication(req);
+ return response;
+ }
+ });
+ }
+
+ protected SubmitApplicationResponse submitApplication(
+ final ApplicationId appId, String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public SubmitApplicationResponse run() throws Exception {
+ ApplicationSubmissionContext context =
+ ApplicationSubmissionContext.newInstance(appId, "", "", null,
+ null, false, false, -1, null, null);
+ SubmitApplicationRequest req =
+ SubmitApplicationRequest.newInstance(context);
+ SubmitApplicationResponse response =
+ getClientRMProxyService().submitApplication(req);
+ return response;
+ }
+ });
+ }
+
+ protected KillApplicationResponse forceKillApplication(
+ final ApplicationId appId, String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public KillApplicationResponse run() throws Exception {
+ KillApplicationRequest req =
+ KillApplicationRequest.newInstance(appId);
+ KillApplicationResponse response =
+ getClientRMProxyService().forceKillApplication(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetClusterMetricsResponse getClusterMetrics(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetClusterMetricsResponse run() throws Exception {
+ GetClusterMetricsRequest req =
+ GetClusterMetricsRequest.newInstance();
+ GetClusterMetricsResponse response =
+ getClientRMProxyService().getClusterMetrics(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetClusterNodesResponse getClusterNodes(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetClusterNodesResponse run() throws Exception {
+ GetClusterNodesRequest req = GetClusterNodesRequest.newInstance();
+ GetClusterNodesResponse response =
+ getClientRMProxyService().getClusterNodes(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetQueueInfoResponse getQueueInfo(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetQueueInfoResponse run() throws Exception {
+ GetQueueInfoRequest req =
+ GetQueueInfoRequest.newInstance("default", false, false, false);
+ GetQueueInfoResponse response =
+ getClientRMProxyService().getQueueInfo(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetQueueUserAclsInfoResponse run() throws Exception {
+ GetQueueUserAclsInfoRequest req =
+ GetQueueUserAclsInfoRequest.newInstance();
+ GetQueueUserAclsInfoResponse response =
+ getClientRMProxyService().getQueueUserAcls(req);
+ return response;
+ }
+ });
+ }
+
+ protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ String user, final ApplicationId appId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public MoveApplicationAcrossQueuesResponse run() throws Exception {
+
+ MoveApplicationAcrossQueuesRequest req =
+ MoveApplicationAcrossQueuesRequest.newInstance(appId,
+ "newQueue");
+ MoveApplicationAcrossQueuesResponse response =
+ getClientRMProxyService().moveApplicationAcrossQueues(req);
+ return response;
+ }
+ });
+ }
+
+ public GetNewReservationResponse getNewReservation(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetNewReservationResponse run() throws Exception {
+ GetNewReservationResponse response = getClientRMProxyService()
+ .getNewReservation(GetNewReservationRequest.newInstance());
+ return response;
+ }
+ });
+ }
+
+ protected ReservationSubmissionResponse submitReservation(String user,
+ final ReservationId reservationId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ReservationSubmissionResponse run() throws Exception {
+ Clock clock = new UTCClock();
+ long arrival = clock.getTime();
+ long duration = 60000;
+ long deadline = (long) (arrival + 1.05 * duration);
+
+ ReservationSubmissionRequest req = createSimpleReservationRequest(1,
+ arrival, deadline, duration, reservationId);
+ ReservationSubmissionResponse response =
+ getClientRMProxyService().submitReservation(req);
+ return response;
+ }
+ });
+ }
+
+ protected ReservationUpdateResponse updateReservation(String user,
+ final ReservationId reservationId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ReservationUpdateResponse run() throws Exception {
+ Clock clock = new UTCClock();
+ long arrival = clock.getTime();
+ long duration = 60000;
+ long deadline = (long) (arrival + 1.05 * duration);
+ ReservationDefinition rDef =
+ createSimpleReservationRequest(1, arrival, deadline, duration,
+ reservationId).getReservationDefinition();
+
+ ReservationUpdateRequest req =
+ ReservationUpdateRequest.newInstance(rDef, reservationId);
+ ReservationUpdateResponse response =
+ getClientRMProxyService().updateReservation(req);
+ return response;
+ }
+ });
+ }
+
+ protected ReservationDeleteResponse deleteReservation(String user,
+ final ReservationId reservationId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ReservationDeleteResponse run() throws Exception {
+ ReservationDeleteRequest req =
+ ReservationDeleteRequest.newInstance(reservationId);
+ ReservationDeleteResponse response =
+ getClientRMProxyService().deleteReservation(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetNodesToLabelsResponse getNodeToLabels(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetNodesToLabelsResponse run() throws Exception {
+ GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance();
+ GetNodesToLabelsResponse response =
+ getClientRMProxyService().getNodeToLabels(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetLabelsToNodesResponse getLabelsToNodes(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetLabelsToNodesResponse run() throws Exception {
+ GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance();
+ GetLabelsToNodesResponse response =
+ getClientRMProxyService().getLabelsToNodes(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetClusterNodeLabelsResponse run() throws Exception {
+ GetClusterNodeLabelsRequest req =
+ GetClusterNodeLabelsRequest.newInstance();
+ GetClusterNodeLabelsResponse response =
+ getClientRMProxyService().getClusterNodeLabels(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetApplicationReportResponse getApplicationReport(String user,
+ final ApplicationId appId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetApplicationReportResponse run() throws Exception {
+ GetApplicationReportRequest req =
+ GetApplicationReportRequest.newInstance(appId);
+ GetApplicationReportResponse response =
+ getClientRMProxyService().getApplicationReport(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetApplicationsResponse getApplications(String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetApplicationsResponse run() throws Exception {
+ GetApplicationsRequest req = GetApplicationsRequest.newInstance();
+ GetApplicationsResponse response =
+ getClientRMProxyService().getApplications(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ String user, final ApplicationAttemptId appAttemptId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public GetApplicationAttemptReportResponse run() throws Exception {
+ GetApplicationAttemptReportRequest req =
+ GetApplicationAttemptReportRequest.newInstance(appAttemptId);
+ GetApplicationAttemptReportResponse response =
+ getClientRMProxyService().getApplicationAttemptReport(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetApplicationAttemptsResponse getApplicationAttempts(String user,
+ final ApplicationId applicationId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetApplicationAttemptsResponse run() throws Exception {
+ GetApplicationAttemptsRequest req =
+ GetApplicationAttemptsRequest.newInstance(applicationId);
+ GetApplicationAttemptsResponse response =
+ getClientRMProxyService().getApplicationAttempts(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetContainerReportResponse getContainerReport(String user,
+ final ContainerId containerId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetContainerReportResponse run() throws Exception {
+ GetContainerReportRequest req =
+ GetContainerReportRequest.newInstance(containerId);
+ GetContainerReportResponse response =
+ getClientRMProxyService().getContainerReport(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetContainersResponse getContainers(String user,
+ final ApplicationAttemptId appAttemptId)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetContainersResponse run() throws Exception {
+ GetContainersRequest req =
+ GetContainersRequest.newInstance(appAttemptId);
+ GetContainersResponse response =
+ getClientRMProxyService().getContainers(req);
+ return response;
+ }
+ });
+ }
+
+ protected GetDelegationTokenResponse getDelegationToken(final String user)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public GetDelegationTokenResponse run() throws Exception {
+ GetDelegationTokenRequest req =
+ GetDelegationTokenRequest.newInstance(user);
+ GetDelegationTokenResponse response =
+ getClientRMProxyService().getDelegationToken(req);
+ return response;
+ }
+ });
+ }
+
+ protected RenewDelegationTokenResponse renewDelegationToken(String user,
+ final Token token)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RenewDelegationTokenResponse run() throws Exception {
+ RenewDelegationTokenRequest req =
+ RenewDelegationTokenRequest.newInstance(token);
+ RenewDelegationTokenResponse response =
+ getClientRMProxyService().renewDelegationToken(req);
+ return response;
+ }
+ });
+ }
+
+ protected CancelDelegationTokenResponse cancelDelegationToken(String user,
+ final Token token)
+ throws YarnException, IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public CancelDelegationTokenResponse run() throws Exception {
+ CancelDelegationTokenRequest req =
+ CancelDelegationTokenRequest.newInstance(token);
+ CancelDelegationTokenResponse response =
+ getClientRMProxyService().cancelDelegationToken(req);
+ return response;
+ }
+ });
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationRequest(
+ int numContainers, long arrival, long deadline, long duration,
+ ReservationId reservationId) {
+ // create a request with a single atomic ask
+ ReservationRequest r = ReservationRequest
+ .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration);
+ ReservationRequests reqs = ReservationRequests.newInstance(
+ Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+ deadline, reqs, "testClientRMProxyService#reservation");
+ ReservationSubmissionRequest request = ReservationSubmissionRequest
+ .newInstance(rDef, "dedicated", reservationId);
+ return request;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java
new file mode 100644
index 0000000..15a9a5f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class MockClientRequestInterceptor
+ extends AbstractClientRequestInterceptor {
+
+ private MockResourceManagerFacade mockRM;
+
+ public MockClientRequestInterceptor() {
+ }
+
+ public void init(String user) {
+ super.init(user);
+ mockRM = new MockResourceManagerFacade(
+ new YarnConfiguration(super.getConf()), 0);
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return mockRM.getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return mockRM.submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return mockRM.forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return mockRM.getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return mockRM.getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return mockRM.getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return mockRM.getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return mockRM.moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return mockRM.getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return mockRM.submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return mockRM.listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return mockRM.updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return mockRM.deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return mockRM.getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return mockRM.getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return mockRM.getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return mockRM.getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return mockRM.getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return mockRM.getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return mockRM.getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return mockRM.getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return mockRM.getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return mockRM.getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return mockRM.renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return mockRM.cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return mockRM.failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return mockRM.updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return mockRM.signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return mockRM.updateApplicationTimeouts(request);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java
new file mode 100644
index 0000000..5909282
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation is expected by the unit test cases. So please change the
+ * implementation with care.
+ */
+public class MockResourceManagerFacade implements ApplicationClientProtocol {
+
+ private AtomicInteger containerIndex = new AtomicInteger(0);
+
+ public MockResourceManagerFacade(Configuration conf,
+ int startContainerIndex) {
+ this.containerIndex.set(startContainerIndex);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+
+ GetApplicationReportResponse response =
+ Records.newRecord(GetApplicationReportResponse.class);
+ ApplicationReport report = Records.newRecord(ApplicationReport.class);
+ report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+ report.setApplicationId(request.getApplicationId());
+ report.setCurrentApplicationAttemptId(
+ ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
+ response.setApplicationReport(report);
+ return response;
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+
+ GetApplicationAttemptReportResponse response =
+ Records.newRecord(GetApplicationAttemptReportResponse.class);
+ ApplicationAttemptReport report =
+ Records.newRecord(ApplicationAttemptReport.class);
+ report.setApplicationAttemptId(request.getApplicationAttemptId());
+ report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+ response.setApplicationAttemptReport(report);
+ return response;
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return GetNewApplicationResponse.newInstance(null, null, null);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return SubmitApplicationResponse.newInstance();
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return KillApplicationResponse.newInstance(true);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return GetClusterMetricsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return GetApplicationsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return GetClusterNodesResponse.newInstance(null);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return GetQueueInfoResponse.newInstance(null);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return GetQueueUserAclsInfoResponse.newInstance(null);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return GetDelegationTokenResponse.newInstance(null);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return RenewDelegationTokenResponse.newInstance(0);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return CancelDelegationTokenResponse.newInstance();
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return MoveApplicationAcrossQueuesResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return GetApplicationAttemptsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return GetContainerReportResponse.newInstance(null);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return GetContainersResponse.newInstance(null);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return ReservationSubmissionResponse.newInstance();
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return ReservationListResponse
+ .newInstance(new ArrayList());
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return ReservationUpdateResponse.newInstance();
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return ReservationDeleteResponse.newInstance();
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return GetNodesToLabelsResponse
+ .newInstance(new HashMap>());
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return GetClusterNodeLabelsResponse.newInstance(new ArrayList());
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return GetLabelsToNodesResponse.newInstance(null);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return GetNewReservationResponse
+ .newInstance(ReservationId.newInstance(0, 0));
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return FailApplicationAttemptResponse.newInstance();
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return UpdateApplicationPriorityResponse.newInstance(null);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return new SignalContainerResponsePBImpl();
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return UpdateApplicationTimeoutsResponse.newInstance();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java
new file mode 100644
index 0000000..a5c3e7e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain
+ *
+ */
+public class PassThroughClientRequestInterceptor
+ extends AbstractClientRequestInterceptor {
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return getNextInterceptor().listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return getNextInterceptor().updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return getNextInterceptor().deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return getNextInterceptor().failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return getNextInterceptor().signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationTimeouts(request);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java
new file mode 100644
index 0000000..0cec953
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java
@@ -0,0 +1,199 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.router.clientrmproxy;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.router.clientrmproxy.ClientRMProxyService.RequestInterceptorChainWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestClientRMProxyService extends BaseClientRMProxyTest {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestClientRMProxyService.class);
+
+ /**
+ * Test if the pipeline is created properly.
+ */
+ @Test
+ public void testRequestInterceptorChainCreation() throws Exception {
+ ClientRequestInterceptor root =
+ super.getClientRMProxyService().createRequestInterceptorChain();
+ int index = 0;
+ while (root != null) {
+ switch (index) {
+ case 0:
+ case 1:
+ case 2:
+ Assert.assertEquals(PassThroughClientRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ case 3:
+ Assert.assertEquals(MockClientRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ }
+ root = root.getNextInterceptor();
+ index++;
+ }
+ Assert.assertEquals("The number of interceptors in chain does not match", 4,
+ index);
+ }
+
+ /**
+ * Test if the ClientRMProxy forwards all the requests to the MockRM and get
+ * back the responses.
+ */
+ @Test
+ public void testClientRMProxyServiceE2E() throws Exception {
+
+ String user = "test1";
+
+ LOG.info("testClientRMProxyServiceE2E - Get New Application");
+
+ GetNewApplicationResponse responseGetNewApp = getNewApplication(user);
+ Assert.assertNotNull(responseGetNewApp);
+
+ LOG.info("testClientRMProxyServiceE2E - Submit Application");
+
+ SubmitApplicationResponse responseSubmitApp =
+ submitApplication(responseGetNewApp.getApplicationId(), user);
+ Assert.assertNotNull(responseSubmitApp);
+
+ LOG.info("testClientRMProxyServiceE2E - Kill Application");
+
+ KillApplicationResponse responseKillApp =
+ forceKillApplication(responseGetNewApp.getApplicationId(), user);
+ Assert.assertNotNull(responseKillApp);
+
+ LOG.info("testClientRMProxyServiceE2E - Get Cluster Metrics");
+
+ GetClusterMetricsResponse responseGetClusterMetrics =
+ getClusterMetrics(user);
+ Assert.assertNotNull(responseGetClusterMetrics);
+
+ LOG.info("testClientRMProxyServiceE2E - Get Cluster Nodes");
+
+ GetClusterNodesResponse responseGetClusterNodes = getClusterNodes(user);
+ Assert.assertNotNull(responseGetClusterNodes);
+
+ LOG.info("testClientRMProxyServiceE2E - Get Queue Info");
+
+ GetQueueInfoResponse responseGetQueueInfo = getQueueInfo(user);
+ Assert.assertNotNull(responseGetQueueInfo);
+
+ LOG.info("testClientRMProxyServiceE2E - Get Queue User");
+
+ GetQueueUserAclsInfoResponse responseGetQueueUser = getQueueUserAcls(user);
+ Assert.assertNotNull(responseGetQueueUser);
+
+ LOG.info("testClientRMProxyServiceE2E - Get Cluster Node");
+
+ GetClusterNodeLabelsResponse responseGetClusterNode =
+ getClusterNodeLabels(user);
+ Assert.assertNotNull(responseGetClusterNode);
+
+ LOG.info("testClientRMProxyServiceE2E - Move Application Across Queues");
+
+ MoveApplicationAcrossQueuesResponse responseMoveApp =
+ moveApplicationAcrossQueues(user, responseGetNewApp.getApplicationId());
+ Assert.assertNotNull(responseMoveApp);
+
+ LOG.info("testClientRMProxyServiceE2E - Get New Reservation");
+
+ GetNewReservationResponse getNewReservationResponse =
+ getNewReservation(user);
+
+ LOG.info("testClientRMProxyServiceE2E - Submit Reservation");
+
+ ReservationSubmissionResponse responseSubmitReser =
+ submitReservation(user, getNewReservationResponse.getReservationId());
+ Assert.assertNotNull(responseSubmitReser);
+
+ LOG.info("testClientRMProxyServiceE2E - Update Reservation");
+
+ ReservationUpdateResponse responseUpdateReser =
+ updateReservation(user, getNewReservationResponse.getReservationId());
+ Assert.assertNotNull(responseUpdateReser);
+
+ LOG.info("testClientRMProxyServiceE2E - Delete Reservation");
+
+ ReservationDeleteResponse responseDeleteReser =
+ deleteReservation(user, getNewReservationResponse.getReservationId());
+ Assert.assertNotNull(responseDeleteReser);
+ }
+
+ /**
+ * Test if the different chains for users are generated, and LRU cache is
+ * working as expected
+ */
+ @Test
+ public void testUsersChainMapWithLRUCache()
+ throws YarnException, IOException, InterruptedException {
+
+ Map pipelines;
+ RequestInterceptorChainWrapper chain;
+
+ getNewApplication("test1");
+ getNewApplication("test2");
+ getNewApplication("test3");
+ getNewApplication("test4");
+ getNewApplication("test5");
+ getNewApplication("test6");
+ getNewApplication("test7");
+ getNewApplication("test8");
+
+ pipelines = super.getClientRMProxyService().getPipelines();
+ Assert.assertEquals(8, pipelines.size());
+
+ getNewApplication("test9");
+ getNewApplication("test10");
+ getNewApplication("test1");
+ getNewApplication("test11");
+
+ // The cache max size is defined in
+ // BaseClientRMProxyTest.TEST_MAX_CACHE_SIZE
+ Assert.assertEquals(10, pipelines.size());
+
+ chain = pipelines.get("test1");
+ Assert.assertNotNull("test1 should not be evicted", chain);
+
+ chain = pipelines.get("test2");
+ Assert.assertNull("test2 should have been evicted", chain);
+ }
+
+}