diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5193dbe..cf4b0d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2611,6 +2611,32 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + public static final String ROUTER_PREFIX = YARN_PREFIX + "federation."; + + public static final String ROUTER_CLIENTRM_PROXY_PREFIX = + YARN_PREFIX + "clientrmproxy."; + + public static final String ROUTER_CLIENTRM_PROXY_ADDRESS = + ROUTER_CLIENTRM_PROXY_PREFIX + ".address"; + public static final int DEFAULT_ROUTER_CLIENTRM_PROXY_PORT = 8050; + public static final String DEFAULT_ROUTER_CLIENTRM_PROXY_ADDRESS = + "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PROXY_PORT; + + public static final String ROUTER_CLIENTRM_PROXY_THREAD_COUNT = + ROUTER_CLIENTRM_PROXY_PREFIX + "client.thread-count"; + public static final int DEFAULT_ROUTER_CLIENTRM_PROXY_THREAD_COUNT = 25; + + public static final String ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + ROUTER_CLIENTRM_PROXY_PREFIX + "interceptor-class.pipeline"; + public static final String DEFAULT_ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + "org.apache.hadoop.yarn.server.router.clientrmproxy." + + "DefaultClientRequestInterceptor"; + + /** Max cache size for user pipeline map */ + public static final String CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE = + YARN_PREFIX + "clientrmproxy.cache-max-size"; + public static final int DEFAULT_CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE = 25; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index 25afa5c..add856e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -52,6 +52,12 @@ org.apache.hadoop hadoop-yarn-server-common + + + junit + junit + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 7be8a59..88159ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -18,6 +18,20 @@ package org.apache.hadoop.yarn.server.router; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.router.clientrmproxy.ClientRMProxyService; + /** * The router is a stateless YARN component which is the entry point to the * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with @@ -33,6 +47,88 @@ * This provides a placeholder for throttling mis-behaving clients (YARN-1546) * and masks the access to multiple RMs (YARN-3659). */ -public class Router{ +public class Router extends CompositeService { + + private static final Log LOG = LogFactory.getLog(Router.class); + private static CompositeServiceShutdownHook routerShutdownHook; + private Configuration conf; + private AtomicBoolean isStopping = new AtomicBoolean(false); + private ClientRMProxyService clientRMProxyService; + + /** + * Priority of the Router shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + public Router() { + super(Router.class.getName()); + } + + protected void doSecureLogin() throws IOException { + // TODO + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + clientRMProxyService = createClientRMProxyService(); + addService(clientRMProxyService); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + try { + doSecureLogin(); + } catch (IOException e) { + throw new YarnRuntimeException("Failed Router login", e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (isStopping.getAndSet(true)) { + return; + } + super.serviceStop(); + } + + protected void shutDown() { + new Thread() { + @Override + public void run() { + Router.this.stop(); + } + }.start(); + } + + protected ClientRMProxyService createClientRMProxyService() { + return new ClientRMProxyService(); + } + + public static void main(String argv[]) { + Configuration conf = new YarnConfiguration(); + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(Router.class, argv, LOG); + Router router = new Router(); + try { + + // Remove the old hook if we are rebooting. + if (null != routerShutdownHook) { + ShutdownHookManager.get().removeShutdownHook(routerShutdownHook); + } + + routerShutdownHook = new CompositeServiceShutdownHook(router); + ShutdownHookManager.get().addShutdownHook(routerShutdownHook, + SHUTDOWN_HOOK_PRIORITY); + router.init(conf); + router.start(); + } catch (Throwable t) { + LOG.fatal("Error starting Router", t); + System.exit(-1); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java new file mode 100644 index 0000000..5b75426 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/AbstractClientRequestInterceptor.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import org.apache.hadoop.conf.Configuration; + +/** + * Implements the RequestInterceptor interface and provides common functionality + * which can can be used and/or extended by other concrete intercepter classes. + * + */ +public abstract class AbstractClientRequestInterceptor + implements ClientRequestInterceptor { + private Configuration conf; + private ClientRequestInterceptor nextInterceptor; + + /** + * Sets the {@link RequestInterceptor} in the chain. + */ + @Override + public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration}. + */ + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration}. + */ + @Override + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@link ClientRequestInterceptor}. + */ + @Override + public void init(String user) { + if (this.nextInterceptor != null) { + this.nextInterceptor.init(user); + } + } + + /** + * Disposes the {@link ClientRequestInterceptor}. + */ + @Override + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link ClientRequestInterceptor} in the chain. + */ + @Override + public ClientRequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java new file mode 100644 index 0000000..ec0e61b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRMProxyService.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.router.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ClientRMProxyService is a service that runs on each router that can be used + * to intercept and inspect ApplicationClientProtocol messages from client to + * the cluster resource manager. It listens ApplicationClientProtocol messages + * from the client and creates a request intercepting pipeline instance for each + * client. The pipeline is a chain of intercepter instances that can inspect and + * modify the request/response as needed. The main difference with + * AMRMProxyService is the protocol they implement. + */ +public class ClientRMProxyService extends AbstractService + implements ApplicationClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(ClientRMProxyService.class); + + private Server server; + private InetSocketAddress listenerEndpoint; + private Map userPipelineMap; + + public ClientRMProxyService() { + super(ClientRMProxyService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting Router ClientRMProxyService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_PORT); + + int maxCacheSize = + conf.getInt(YarnConfiguration.CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap( + maxCacheSize, true)); + + Configuration serverConf = new Configuration(conf); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PROXY_THREAD_COUNT, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_THREAD_COUNT); + + this.server = rpc.getServer(ApplicationClientProtocol.class, this, + listenerEndpoint, serverConf, null, numWorkerThreads); + + this.server.start(); + LOG.info("Router ClientRMProxyService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping Router ClientRMProxyService"); + if (this.server != null) { + this.server.stop(); + } + userPipelineMap.clear(); + super.serviceStop(); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = conf.get( + YarnConfiguration.ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationTimeouts(request); + } + + private RequestInterceptorChainWrapper getInterceptorChain() + throws IOException { + String user = UserGroupInformation.getCurrentUser().getUserName(); + if (!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + return userPipelineMap.get(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + protected Map getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + protected ClientRequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + ClientRequestInterceptor pipeline = null; + ClientRequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) { + ClientRequestInterceptor interceptorInstance = + (ClientRequestInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + ClientRequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationClientRequestInterceptor: " + + interceptorClassName, + e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified application. + * + * @param user + */ + protected void initializePipeline(String user) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info( + "Request to start an already existing user: {} was received, so ignoring.", + user); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.userPipelineMap.put(user, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info( + "Initializing request processing pipeline for application for the user: {}", + user); + + try { + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + synchronized (this.userPipelineMap) { + this.userPipelineMap.remove(user); + } + throw e; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + * + */ + @Private + public static class RequestInterceptorChainWrapper { + private ClientRequestInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param rootInterceptor + */ + public synchronized void init(ClientRequestInterceptor rootInterceptor) { + this.rootInterceptor = rootInterceptor; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized ClientRequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed + */ + @Override + public void finalize() { + rootInterceptor.shutdown(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java new file mode 100644 index 0000000..1b6a859 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/ClientRequestInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the client to + * the resource manager. + */ +public interface ClientRequestInterceptor + extends ApplicationClientProtocol, Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param user + */ + void init(String user); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor + */ + void setNextInterceptor(ClientRequestInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + ClientRequestInterceptor getNextInterceptor(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java new file mode 100644 index 0000000..76f0b16 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/DefaultClientRequestInterceptor.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the AbstractRequestInterceptorClient class and provides an + * implementation that simply forwards the client requests to the cluster + * resource manager. + * + */ +public final class DefaultClientRequestInterceptor + extends AbstractClientRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); + private ApplicationClientProtocol clientRM; + private UserGroupInformation user = null; + + @Override + public void init(String userName) { + super.init(userName); + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + + final Configuration conf = this.getConf(); + + clientRM = + user.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationClientProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } + }); + } catch (IOException e) { + String message = "Error while creating ClientRM Proxy Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public void setNextInterceptor(ClientRequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return clientRM.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return clientRM.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return clientRM.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return clientRM.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return clientRM.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return clientRM.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return clientRM.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return clientRM.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return clientRM.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return clientRM.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return clientRM.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return clientRM.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return clientRM.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return clientRM.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return clientRM.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return clientRM.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return clientRM.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return clientRM.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return clientRM.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return clientRM.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return clientRM.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return clientRM.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return clientRM.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return clientRM.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return clientRM.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return clientRM.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return clientRM.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return clientRM.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return clientRM.updateApplicationTimeouts(request); + } + + @VisibleForTesting + public void setRMClient(ApplicationClientProtocol clientRM) { + this.clientRM = clientRM; + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java new file mode 100644 index 0000000..2d35d42 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrmproxy/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router ClientRM Proxy Service package. **/ +package org.apache.hadoop.yarn.server.router.clientrmproxy; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java new file mode 100644 index 0000000..409bfb7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/util/LRUCacheHashMap.java @@ -0,0 +1,49 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.router.util; + +import java.util.LinkedHashMap; +import java.util.Map; + +/* + * LRU cache with a configurable maximum cache size and access order + */ +public class LRUCacheHashMap extends LinkedHashMap { + + private static final long serialVersionUID = 1L; + + // Maximum size of the cache + private int maxSize; + + /** + * Constructor + * + * @param maxSize max size of the cache + * @param accessOrder true for access-order, false for insertion-order + */ + public LRUCacheHashMap(int maxSize, boolean accessOrder) { + super(maxSize, 0.75f, accessOrder); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java new file mode 100644 index 0000000..c35a8d0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/BaseClientRMProxyTest.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the ClientRMProxyService test cases. It provides utility + * methods that can be used by the concrete test case classes + * + */ +public abstract class BaseClientRMProxyTest { + + /** + * The ClientRMProxyService instance that will be used by all the test cases + */ + private MockClientRMProxyService clientrmProxyService; + /** + * Thread pool used for asynchronous operations + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + public final static int TEST_MAX_CACHE_SIZE = 10; + + protected MockClientRMProxyService getClientRMProxyService() { + Assert.assertNotNull(this.clientrmProxyService); + return this.clientrmProxyService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set( + YarnConfiguration.ROUTER_CLIENTRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockClientRequestInterceptor.class.getName()); + + this.conf.setInt(YarnConfiguration.CLIENTRM_PROXY_PIPELINE_CACHE_MAX_SIZE, + TEST_MAX_CACHE_SIZE); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.clientrmProxyService = createAndStartClientRMProxyService(); + } + + @After + public void tearDown() { + if (clientrmProxyService != null) { + clientrmProxyService.stop(); + clientrmProxyService = null; + } + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockClientRMProxyService createAndStartClientRMProxyService() { + MockClientRMProxyService svc = new MockClientRMProxyService(); + svc.init(conf); + svc.start(); + return svc; + } + + protected static class MockClientRMProxyService extends ClientRMProxyService { + public MockClientRMProxyService() { + super(); + } + } + + protected GetNewApplicationResponse getNewApplication(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewApplicationResponse run() throws Exception { + GetNewApplicationRequest req = + GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = + getClientRMProxyService().getNewApplication(req); + return response; + } + }); + } + + protected SubmitApplicationResponse submitApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public SubmitApplicationResponse run() throws Exception { + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "", "", null, + null, false, false, -1, null, null); + SubmitApplicationRequest req = + SubmitApplicationRequest.newInstance(context); + SubmitApplicationResponse response = + getClientRMProxyService().submitApplication(req); + return response; + } + }); + } + + protected KillApplicationResponse forceKillApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public KillApplicationResponse run() throws Exception { + KillApplicationRequest req = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse response = + getClientRMProxyService().forceKillApplication(req); + return response; + } + }); + } + + protected GetClusterMetricsResponse getClusterMetrics(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterMetricsResponse run() throws Exception { + GetClusterMetricsRequest req = + GetClusterMetricsRequest.newInstance(); + GetClusterMetricsResponse response = + getClientRMProxyService().getClusterMetrics(req); + return response; + } + }); + } + + protected GetClusterNodesResponse getClusterNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodesResponse run() throws Exception { + GetClusterNodesRequest req = GetClusterNodesRequest.newInstance(); + GetClusterNodesResponse response = + getClientRMProxyService().getClusterNodes(req); + return response; + } + }); + } + + protected GetQueueInfoResponse getQueueInfo(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueInfoResponse run() throws Exception { + GetQueueInfoRequest req = + GetQueueInfoRequest.newInstance("default", false, false, false); + GetQueueInfoResponse response = + getClientRMProxyService().getQueueInfo(req); + return response; + } + }); + } + + protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueUserAclsInfoResponse run() throws Exception { + GetQueueUserAclsInfoRequest req = + GetQueueUserAclsInfoRequest.newInstance(); + GetQueueUserAclsInfoResponse response = + getClientRMProxyService().getQueueUserAcls(req); + return response; + } + }); + } + + protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + String user, final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public MoveApplicationAcrossQueuesResponse run() throws Exception { + + MoveApplicationAcrossQueuesRequest req = + MoveApplicationAcrossQueuesRequest.newInstance(appId, + "newQueue"); + MoveApplicationAcrossQueuesResponse response = + getClientRMProxyService().moveApplicationAcrossQueues(req); + return response; + } + }); + } + + public GetNewReservationResponse getNewReservation(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewReservationResponse run() throws Exception { + GetNewReservationResponse response = getClientRMProxyService() + .getNewReservation(GetNewReservationRequest.newInstance()); + return response; + } + }); + } + + protected ReservationSubmissionResponse submitReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationSubmissionResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ReservationSubmissionRequest req = createSimpleReservationRequest(1, + arrival, deadline, duration, reservationId); + ReservationSubmissionResponse response = + getClientRMProxyService().submitReservation(req); + return response; + } + }); + } + + protected ReservationUpdateResponse updateReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationUpdateResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationDefinition rDef = + createSimpleReservationRequest(1, arrival, deadline, duration, + reservationId).getReservationDefinition(); + + ReservationUpdateRequest req = + ReservationUpdateRequest.newInstance(rDef, reservationId); + ReservationUpdateResponse response = + getClientRMProxyService().updateReservation(req); + return response; + } + }); + } + + protected ReservationDeleteResponse deleteReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationDeleteResponse run() throws Exception { + ReservationDeleteRequest req = + ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse response = + getClientRMProxyService().deleteReservation(req); + return response; + } + }); + } + + protected GetNodesToLabelsResponse getNodeToLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNodesToLabelsResponse run() throws Exception { + GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance(); + GetNodesToLabelsResponse response = + getClientRMProxyService().getNodeToLabels(req); + return response; + } + }); + } + + protected GetLabelsToNodesResponse getLabelsToNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetLabelsToNodesResponse run() throws Exception { + GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance(); + GetLabelsToNodesResponse response = + getClientRMProxyService().getLabelsToNodes(req); + return response; + } + }); + } + + protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodeLabelsResponse run() throws Exception { + GetClusterNodeLabelsRequest req = + GetClusterNodeLabelsRequest.newInstance(); + GetClusterNodeLabelsResponse response = + getClientRMProxyService().getClusterNodeLabels(req); + return response; + } + }); + } + + protected GetApplicationReportResponse getApplicationReport(String user, + final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationReportResponse run() throws Exception { + GetApplicationReportRequest req = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + getClientRMProxyService().getApplicationReport(req); + return response; + } + }); + } + + protected GetApplicationsResponse getApplications(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationsResponse run() throws Exception { + GetApplicationsRequest req = GetApplicationsRequest.newInstance(); + GetApplicationsResponse response = + getClientRMProxyService().getApplications(req); + return response; + } + }); + } + + protected GetApplicationAttemptReportResponse getApplicationAttemptReport( + String user, final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptReportResponse run() throws Exception { + GetApplicationAttemptReportRequest req = + GetApplicationAttemptReportRequest.newInstance(appAttemptId); + GetApplicationAttemptReportResponse response = + getClientRMProxyService().getApplicationAttemptReport(req); + return response; + } + }); + } + + protected GetApplicationAttemptsResponse getApplicationAttempts(String user, + final ApplicationId applicationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptsResponse run() throws Exception { + GetApplicationAttemptsRequest req = + GetApplicationAttemptsRequest.newInstance(applicationId); + GetApplicationAttemptsResponse response = + getClientRMProxyService().getApplicationAttempts(req); + return response; + } + }); + } + + protected GetContainerReportResponse getContainerReport(String user, + final ContainerId containerId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainerReportResponse run() throws Exception { + GetContainerReportRequest req = + GetContainerReportRequest.newInstance(containerId); + GetContainerReportResponse response = + getClientRMProxyService().getContainerReport(req); + return response; + } + }); + } + + protected GetContainersResponse getContainers(String user, + final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainersResponse run() throws Exception { + GetContainersRequest req = + GetContainersRequest.newInstance(appAttemptId); + GetContainersResponse response = + getClientRMProxyService().getContainers(req); + return response; + } + }); + } + + protected GetDelegationTokenResponse getDelegationToken(final String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest req = + GetDelegationTokenRequest.newInstance(user); + GetDelegationTokenResponse response = + getClientRMProxyService().getDelegationToken(req); + return response; + } + }); + } + + protected RenewDelegationTokenResponse renewDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest req = + RenewDelegationTokenRequest.newInstance(token); + RenewDelegationTokenResponse response = + getClientRMProxyService().renewDelegationToken(req); + return response; + } + }); + } + + protected CancelDelegationTokenResponse cancelDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest req = + CancelDelegationTokenRequest.newInstance(token); + CancelDelegationTokenResponse response = + getClientRMProxyService().cancelDelegationToken(req); + return response; + } + }); + } + + private ReservationSubmissionRequest createSimpleReservationRequest( + int numContainers, long arrival, long deadline, long duration, + ReservationId reservationId) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testClientRMProxyService#reservation"); + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(rDef, "dedicated", reservationId); + return request; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java new file mode 100644 index 0000000..15a9a5f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockClientRequestInterceptor.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class MockClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + private MockResourceManagerFacade mockRM; + + public MockClientRequestInterceptor() { + } + + public void init(String user) { + super.init(user); + mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return mockRM.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return mockRM.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return mockRM.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return mockRM.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return mockRM.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return mockRM.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return mockRM.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return mockRM.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return mockRM.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return mockRM.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return mockRM.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return mockRM.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return mockRM.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return mockRM.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return mockRM.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return mockRM.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return mockRM.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return mockRM.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return mockRM.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return mockRM.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return mockRM.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return mockRM.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return mockRM.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return mockRM.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return mockRM.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return mockRM.updateApplicationTimeouts(request); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java new file mode 100644 index 0000000..5909282 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/MockResourceManagerFacade.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade implements ApplicationClientProtocol { + + private AtomicInteger containerIndex = new AtomicInteger(0); + + public MockResourceManagerFacade(Configuration conf, + int startContainerIndex) { + this.containerIndex.set(startContainerIndex); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId( + ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return GetNewApplicationResponse.newInstance(null, null, null); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return SubmitApplicationResponse.newInstance(); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return KillApplicationResponse.newInstance(true); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return GetClusterMetricsResponse.newInstance(null); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(null); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return GetClusterNodesResponse.newInstance(null); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return GetQueueInfoResponse.newInstance(null); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return GetQueueUserAclsInfoResponse.newInstance(null); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return GetDelegationTokenResponse.newInstance(null); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return RenewDelegationTokenResponse.newInstance(0); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return CancelDelegationTokenResponse.newInstance(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return MoveApplicationAcrossQueuesResponse.newInstance(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return GetApplicationAttemptsResponse.newInstance(null); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return GetContainerReportResponse.newInstance(null); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return GetContainersResponse.newInstance(null); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return ReservationSubmissionResponse.newInstance(); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return ReservationListResponse + .newInstance(new ArrayList()); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return ReservationUpdateResponse.newInstance(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return ReservationDeleteResponse.newInstance(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return GetNodesToLabelsResponse + .newInstance(new HashMap>()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return GetClusterNodeLabelsResponse.newInstance(new ArrayList()); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return GetLabelsToNodesResponse.newInstance(null); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return GetNewReservationResponse + .newInstance(ReservationId.newInstance(0, 0)); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return FailApplicationAttemptResponse.newInstance(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return UpdateApplicationPriorityResponse.newInstance(null); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return new SignalContainerResponsePBImpl(); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return UpdateApplicationTimeoutsResponse.newInstance(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java new file mode 100644 index 0000000..a5c3e7e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/PassThroughClientRequestInterceptor.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain + * + */ +public class PassThroughClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return getNextInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return getNextInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return getNextInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return getNextInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return getNextInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return getNextInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return getNextInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return getNextInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return getNextInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return getNextInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return getNextInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return getNextInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationTimeouts(request); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java new file mode 100644 index 0000000..0cec953 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrmproxy/TestClientRMProxyService.java @@ -0,0 +1,199 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.router.clientrmproxy; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.router.clientrmproxy.ClientRMProxyService.RequestInterceptorChainWrapper; +import org.junit.Assert; +import org.junit.Test; + +public class TestClientRMProxyService extends BaseClientRMProxyTest { + + private static final Log LOG = + LogFactory.getLog(TestClientRMProxyService.class); + + /** + * Test if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + ClientRequestInterceptor root = + super.getClientRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + case 2: + Assert.assertEquals(PassThroughClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", 4, + index); + } + + /** + * Test if the ClientRMProxy forwards all the requests to the MockRM and get + * back the responses. + */ + @Test + public void testClientRMProxyServiceE2E() throws Exception { + + String user = "test1"; + + LOG.info("testClientRMProxyServiceE2E - Get New Application"); + + GetNewApplicationResponse responseGetNewApp = getNewApplication(user); + Assert.assertNotNull(responseGetNewApp); + + LOG.info("testClientRMProxyServiceE2E - Submit Application"); + + SubmitApplicationResponse responseSubmitApp = + submitApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseSubmitApp); + + LOG.info("testClientRMProxyServiceE2E - Kill Application"); + + KillApplicationResponse responseKillApp = + forceKillApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseKillApp); + + LOG.info("testClientRMProxyServiceE2E - Get Cluster Metrics"); + + GetClusterMetricsResponse responseGetClusterMetrics = + getClusterMetrics(user); + Assert.assertNotNull(responseGetClusterMetrics); + + LOG.info("testClientRMProxyServiceE2E - Get Cluster Nodes"); + + GetClusterNodesResponse responseGetClusterNodes = getClusterNodes(user); + Assert.assertNotNull(responseGetClusterNodes); + + LOG.info("testClientRMProxyServiceE2E - Get Queue Info"); + + GetQueueInfoResponse responseGetQueueInfo = getQueueInfo(user); + Assert.assertNotNull(responseGetQueueInfo); + + LOG.info("testClientRMProxyServiceE2E - Get Queue User"); + + GetQueueUserAclsInfoResponse responseGetQueueUser = getQueueUserAcls(user); + Assert.assertNotNull(responseGetQueueUser); + + LOG.info("testClientRMProxyServiceE2E - Get Cluster Node"); + + GetClusterNodeLabelsResponse responseGetClusterNode = + getClusterNodeLabels(user); + Assert.assertNotNull(responseGetClusterNode); + + LOG.info("testClientRMProxyServiceE2E - Move Application Across Queues"); + + MoveApplicationAcrossQueuesResponse responseMoveApp = + moveApplicationAcrossQueues(user, responseGetNewApp.getApplicationId()); + Assert.assertNotNull(responseMoveApp); + + LOG.info("testClientRMProxyServiceE2E - Get New Reservation"); + + GetNewReservationResponse getNewReservationResponse = + getNewReservation(user); + + LOG.info("testClientRMProxyServiceE2E - Submit Reservation"); + + ReservationSubmissionResponse responseSubmitReser = + submitReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseSubmitReser); + + LOG.info("testClientRMProxyServiceE2E - Update Reservation"); + + ReservationUpdateResponse responseUpdateReser = + updateReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseUpdateReser); + + LOG.info("testClientRMProxyServiceE2E - Delete Reservation"); + + ReservationDeleteResponse responseDeleteReser = + deleteReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseDeleteReser); + } + + /** + * Test if the different chains for users are generated, and LRU cache is + * working as expected + */ + @Test + public void testUsersChainMapWithLRUCache() + throws YarnException, IOException, InterruptedException { + + Map pipelines; + RequestInterceptorChainWrapper chain; + + getNewApplication("test1"); + getNewApplication("test2"); + getNewApplication("test3"); + getNewApplication("test4"); + getNewApplication("test5"); + getNewApplication("test6"); + getNewApplication("test7"); + getNewApplication("test8"); + + pipelines = super.getClientRMProxyService().getPipelines(); + Assert.assertEquals(8, pipelines.size()); + + getNewApplication("test9"); + getNewApplication("test10"); + getNewApplication("test1"); + getNewApplication("test11"); + + // The cache max size is defined in + // BaseClientRMProxyTest.TEST_MAX_CACHE_SIZE + Assert.assertEquals(10, pipelines.size()); + + chain = pipelines.get("test1"); + Assert.assertNotNull("test1 should not be evicted", chain); + + chain = pipelines.get("test2"); + Assert.assertNull("test2 should have been evicted", chain); + } + +}