diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9832729..459f773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1932,6 +1932,31 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = CENTALIZED_NODELABEL_CONFIGURATION_TYPE; + + +//- Begin: Configurations for AMRMPRoxy Service + public static final String AMRM_PROXY_ENABLED = NM_PREFIX + + "amrmproxy.enable"; + + public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; + + public static final String AMRM_PROXY_ADDRESS = NM_PREFIX + "amrmproxy.address"; + + public static final int DEFAULT_AMRM_PROXY_PORT = 8046; + + public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:" + + DEFAULT_AMRM_PROXY_PORT; + + public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX + + "amrmproxy.client.thread-count"; + public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 50; + + public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = NM_PREFIX + + "amrmproxy.interceptor-class.pipeline"; + + public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + "org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor"; + //- End: Configurations for AMRMPRoxy Service public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY = YARN_PREFIX + "cluster.max-application-priority"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java new file mode 100644 index 0000000..db420af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +/** + * Generic interface that can be used for calling back when a corresponding + * asynchronous operation completes. + * + * @param + */ +public interface AsyncCallback { + /** + * This method is called back when the corresponding asynchronous operation + * completes + * + * @param response + */ + public void callback(T response); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java new file mode 100644 index 0000000..eb943d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Encapsulates the information about one application that is needed by the + * request intercepters + * + */ +public class AMRMProxyApplicationContext { + private final Configuration conf; + private final Context nmContext; + private final ApplicationId applicationId; + private final String user; + private Token amrmToken; + + public AMRMProxyApplicationContext(Context nmContext, Configuration conf, + ApplicationId applicationId, String user, + Token amrmToken) { + this.nmContext = nmContext; + this.conf = conf; + this.applicationId = applicationId; + this.user = user; + this.amrmToken = amrmToken; + } + + public Configuration getConf() { + return conf; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public String getUser() { + return user; + } + + public Token getAMRMToken() { + return amrmToken; + } + + public Context getNMCotext() { + return nmContext; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java new file mode 100644 index 0000000..386836c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -0,0 +1,550 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + +import com.google.common.base.Preconditions; + +/** + * AMRMProxyService is a service that runs on each node manager that can be used + * to intercept and inspect messages from application master to the cluster + * resource manager. It listens to messages from the application master and + * creates a request intercepting pipeline instance for each application. The + * pipeline is a chain of intercepter instances that can inspect and modify the + * request/response as needed + */ +public class AMRMProxyService extends AbstractService implements + ApplicationMasterProtocol { + private static final Log LOG = LogFactory.getLog(AMRMProxyService.class); + private Server server; + private final Context nmContext; + private final AsyncDispatcher dispatcher; + private InetSocketAddress listenerEndpoint; + private AMRMProxyTokenSecretManager secretManager; + private Map applicationPipelineMap; + + public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { + super(AMRMProxyService.class.getName()); + Preconditions.checkArgument(nmContext != null, "nmContext is null"); + Preconditions.checkArgument(dispatcher != null, "dispatcher is null"); + this.nmContext = nmContext; + this.dispatcher = dispatcher; + this.applicationPipelineMap = + new ConcurrentHashMap(); + + this.dispatcher.register(ContainerEventType.class, + new ContainerEventHandler()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting AMRMProxyService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_PORT); + + Configuration serverConf = new Configuration(conf); + serverConf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT); + + this.secretManager = new AMRMProxyTokenSecretManager(serverConf); + + this.server = + rpc.getServer(ApplicationMasterProtocol.class, this, listenerEndpoint, + serverConf, this.secretManager, numWorkerThreads); + + this.server.start(); + LOG.info("AMRMProxyService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping AMRMProxyService"); + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + /** + * This is called by the AMs started on this node to register with the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain. + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Registering application master." + " Host:" + request.getHost() + + " Port:" + request.getRpcPort() + " Tracking Url:" + + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetRequestInterceptorChain(); + return pipeline.getRootInterceptor().registerApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to unregister from the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain + */ + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, IOException { + LOG.info("Finishing application master. Tracking Url:" + + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetRequestInterceptorChain(); + return pipeline.getRootInterceptor().finishApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to send heart beat to RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific pipeline, which is a chain of request + * intercepter objects. One application request processing pipeline is created + * per AM instance + */ + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = + authorizeAndGetRequestInterceptorChain(); + return pipeline.getRootInterceptor().allocate(request); + } + + /** + * Callback from the ContainerManager implementation for initializing the + * application request processing pipeline. + * + * @param request - encapsulates information for starting an AM + * @throws IOException + * @throws YarnException + */ + public void processApplicationStartRequest(StartContainerRequest request) + throws IOException, YarnException { + LOG.info("Callback received for initializing request processing pipeline for an AM"); + ContainerTokenIdentifier containerTokenIdentifierForKey = + BuilderUtils.newContainerTokenIdentifier(request.getContainerToken()); + ApplicationAttemptId appAttemptId = + containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId(); + Credentials credentials = + parseCredentials(request.getContainerLaunchContext()); + + Token amrmToken = + getAMRMToken(credentials.getAllTokens()); + if (amrmToken == null) { + throw new YarnRuntimeException( + "AMRMToken not found in the start container request for application:" + + appAttemptId.toString()); + } + + // Substitute the existing AMRM Token with a local one. Keep the rest of the + // tokens in the credentials intact. + int masterKeyId = containerTokenIdentifierForKey.getMasterKeyId(); + AMRMTokenIdentifier tokenId = + new AMRMTokenIdentifier(appAttemptId, masterKeyId); + Token newToken = + new Token(tokenId, this.secretManager); + credentials.addToken(newToken.getService(), newToken); + + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + request.getContainerLaunchContext().setTokens( + ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + + initializePipeline(containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId().getApplicationId(), + containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken); + } + + protected void initializePipeline(ApplicationId applicationId, String user, + Token amrmToken) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (applicationPipelineMap) { + if (applicationPipelineMap.containsKey(applicationId)) { + LOG.warn("Request to start an already existing applicationId was received. " + + " This can happen if an application failed and a new attempt was created on this machine. ApplicationId: " + + applicationId.toString()); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.applicationPipelineMap.put(applicationId, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for application. " + + " ApplicationId:" + applicationId + " for the user: " + user); + + RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); + interceptorChain.init(createApplicationMasterContext(applicationId, user, + amrmToken)); + chainWrapper.init(interceptorChain, applicationId); + } + + /** + * Shuts down the request processing pipeline for the specified application + * attempt id + * + * @param applicationId + */ + protected void stopApplication(ApplicationId applicationId) { + Preconditions.checkArgument(applicationId != null, "applicationId is null"); + RequestInterceptorChainWrapper pipeline = + this.applicationPipelineMap.remove(applicationId); + + if (pipeline == null) { + LOG.info("Request to stop an application that does not exist. ApplicationId: " + + applicationId); + } else { + LOG.info("Stopping the request processing pipeline for application. ApplicationId: " + + applicationId); + try { + pipeline.getRootInterceptor().shutdown(); + } catch (Throwable ex) { + LOG.warn( + "Failed to gracefully shutdown the request processing pipeline for application." + + applicationId, ex); + } + } + } + + private AMRMProxyApplicationContext createApplicationMasterContext( + ApplicationId applicationId, String user, + Token amrmToken) { + AMRMProxyApplicationContext appContext = + new AMRMProxyApplicationContext(this.nmContext, getConfig(), + applicationId, user, amrmToken); + return appContext; + } + + protected Map GetApplicationPipelines() { + return this.applicationPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances + * + * @return + */ + protected RequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + RequestInterceptor pipeline = null; + RequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) { + RequestInterceptor interceptorInstance = + (RequestInterceptor) ReflectionUtils.newInstance( + interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException("Class: " + interceptorClassName + + " not instance of " + + RequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationMasterRequestInterceptor: " + + interceptorClassName, e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Returns the comma separated intercepter class names from the configuration + * + * @param conf + * @return + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = + conf.get(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + /** + * Authorizes the request and returns the application specific request + * processing pipeline + * + * @return + * @throws YarnException + */ + private RequestInterceptorChainWrapper authorizeAndGetRequestInterceptorChain() + throws YarnException { + AMRMTokenIdentifier tokenIdentifier = authorizeRequest(); + ApplicationAttemptId appAttemptId = + tokenIdentifier.getApplicationAttemptId(); + + synchronized (this.applicationPipelineMap) { + if (!this.applicationPipelineMap.containsKey(appAttemptId + .getApplicationId())) { + throw new YarnException( + "The AM request processing pipeline is not initialized for application: " + + appAttemptId.getApplicationId().toString()); + } + + return this.applicationPipelineMap.get(appAttemptId.getApplicationId()); + } + } + + /** + * Authorizes the current request and returns the AMRMTokenIdentifier for the + * current application + * + * @return + * @throws YarnException + */ + private AMRMTokenIdentifier authorizeRequest() throws YarnException { + UserGroupInformation remoteUgi; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = + "Cannot obtain the user-name for authorizing ApplicationMaster. " + + "Got exception: " + StringUtils.stringifyException(e); + LOG.warn(msg); + throw RPCUtil.getRemoteException(msg); + } + + boolean tokenFound = false; + String message = ""; + AMRMTokenIdentifier appTokenIdentifier = null; + try { + appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); + if (appTokenIdentifier == null) { + tokenFound = false; + message = "No AMRMToken found for user " + remoteUgi.getUserName(); + } else { + tokenFound = true; + } + } catch (IOException e) { + tokenFound = false; + message = + "Got exception while looking for AMRMToken for user " + + remoteUgi.getUserName(); + } + + if (!tokenFound) { + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + return appTokenIdentifier; + } + + private static AMRMTokenIdentifier selectAMRMTokenIdentifier( + UserGroupInformation remoteUgi) throws IOException { + AMRMTokenIdentifier result = null; + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result; + } + + @SuppressWarnings("unchecked") + private Token getAMRMToken( + Collection> allTokens) { + Iterator> iter = allTokens.iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + return (Token) token; + } + } + + return null; + } + + private Credentials parseCredentials(ContainerLaunchContext launchContext) { + Credentials credentials = new Credentials(); + ByteBuffer tokens = launchContext.getTokens(); + + if (tokens != null) { + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokens.rewind(); + buf.reset(tokens); + try { + credentials.readTokenStorageStream(buf); + } catch (IOException ex) { + throw new YarnRuntimeException(ex); + } + if (LOG.isDebugEnabled()) { + for (Token tk : credentials.getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); + } + } + } + + return credentials; + } + + class ContainerEventHandler implements EventHandler { + + @Override + public void handle(ContainerEvent event) { + Container container = + AMRMProxyService.this.nmContext.getContainers().get( + event.getContainerID()); + if (container != null + && container.getContainerTokenIdentifier().getContainerType() + .equals(ContainerType.APPLICATION_MASTER)) { + switch (event.getType()) { + case CONTAINER_EXITED_WITH_SUCCESS: + case CONTAINER_EXITED_WITH_FAILURE: + case CONTAINER_KILLED_ON_REQUEST: + case CONTAINER_DONE: + case KILL_CONTAINER: { + ApplicationAttemptId attemptId = + container.getContainerId().getApplicationAttemptId(); + LOG.info("Container stop event received for stopping application master. ApplicationAttemptId: " + + attemptId.toString()); + AMRMProxyService.this.stopApplication(attemptId.getApplicationId()); + break; + } + default: + LOG.debug("AMRMProxy is ignoring container event: " + event.getType()); + break; + } + } + } + } + + /** + * Private structure for encapsulating RequestInterceptor and + * ApplicationAttemptId instances + * + */ + private static class RequestInterceptorChainWrapper { + private RequestInterceptor rootInterceptor; + private ApplicationId applicationId; + + public synchronized void init(RequestInterceptor rootInterceptor, + ApplicationId applicationId) { + this.rootInterceptor = rootInterceptor; + this.applicationId = applicationId; + } + + public synchronized RequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + public synchronized ApplicationId getApplicationId() { + return applicationId; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java new file mode 100644 index 0000000..92bdc9f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import javax.crypto.SecretKey; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; + +/** + * AMRMLocalTokenSecretManager class is used by the {@link AMRMProxyService} + * service of the {@link NodeManager} to save the tokens locally in memory till + * application finishes and to a store for restart, so no need to remember + * master-keys even after rolling them. + */ +public class AMRMProxyTokenSecretManager extends + SecretManager { + + private static final Log LOG = LogFactory + .getLog(AMRMProxyTokenSecretManager.class); + + private SecretKey masterKey; + private final Timer timer; + private final long rollingInterval; + + private final Map passwords = + new HashMap(); + + /** + * Create an {@link AMRMProxyTokenSecretManager} + */ + public AMRMProxyTokenSecretManager(Configuration conf) { + rollMasterKey(); + this.timer = new Timer(); + this.rollingInterval = + conf.getLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + } + + public void start() { + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval); + } + + public void stop() { + this.timer.cancel(); + } + + public synchronized void applicationMasterFinished( + ApplicationAttemptId appAttemptId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Application finished, removing password for " + appAttemptId); + } + this.passwords.remove(appAttemptId); + } + + private class MasterKeyRoller extends TimerTask { + @Override + public void run() { + rollMasterKey(); + } + } + + @Private + public synchronized void setMasterKey(SecretKey masterKey) { + this.masterKey = masterKey; + } + + @Private + public synchronized SecretKey getMasterKey() { + return this.masterKey; + } + + @Private + synchronized void rollMasterKey() { + LOG.info("Rolling master-key for amrmlocal-tokens"); + this.masterKey = generateSecret(); + } + + /** + * Create a password for a given {@link AMRMTokenIdentifier}. Used to send to + * the AppicationAttempt which can give it back during authentication. + */ + @Override + public synchronized byte[] createPassword(AMRMTokenIdentifier identifier) { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating password for " + applicationAttemptId); + } + byte[] password = createPassword(identifier.getBytes(), masterKey); + this.passwords.put(applicationAttemptId, password); + return password; + } + + /** + * Populate persisted password of AMRMToken back to {@link AMRMProxyTokenSecretManager}. + * + * @param token + * @throws IOException + */ + public synchronized void addPersistedPassword(Token token) + throws IOException { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + } + this.passwords.put(identifier.getApplicationAttemptId(), + token.getPassword()); + } + + /** + * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by + * RPC layer to validate a remote {@link AMRMTokenIdentifier}. + */ + @Override + public synchronized byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + applicationAttemptId); + } + byte[] password = this.passwords.get(applicationAttemptId); + if (password == null) { + throw new InvalidToken("Password not found for ApplicationAttempt " + + applicationAttemptId); + } + return password; + } + + /** + * Creates an empty TokenId to be used for de-serializing an + * {@link AMRMTokenIdentifier} by the RPC layer. + */ + @Override + public AMRMTokenIdentifier createIdentifier() { + return new AMRMTokenIdentifier(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java new file mode 100644 index 0000000..18ce129 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +/** + * Implements the RequestInterceptor interface and provides common functionality + * which can can be used and/or extended by other concrete intercepter classes. + * + */ +public abstract class AbstractRequestInterceptor implements RequestInterceptor { + private Configuration conf; + private AMRMProxyApplicationContext appContext; + private RequestInterceptor nextInterceptor; + + /** + * Sets the {@link RequestInterceptor} in the chain + */ + public void setNextInterceptor(RequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration} + */ + + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration} + */ + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@link RequestInterceptor} + */ + public void init(AMRMProxyApplicationContext appContext) { + Preconditions.checkState(this.appContext == null, + "init is called multiple times on this interceptor: " + + this.getClass().getName()); + this.appContext = appContext; + if (this.nextInterceptor != null) { + this.nextInterceptor.init(appContext); + } + } + + /** + * Disposes the {@link RequestInterceptor} + */ + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link RequestInterceptor} in the chain + */ + public RequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + + /** + * Gets the {@link AMRMProxyApplicationContext} + */ + protected AMRMProxyApplicationContext getApplicationContext() { + return this.appContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java new file mode 100644 index 0000000..ed92d9b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * Extends the AbstractRequestInterceptor class and provides an implementation + * that simply forwards the AM requests to the cluster resource manager. + * + */ +public final class DefaultRequestInterceptor extends AbstractRequestInterceptor { + private static final Log LOG = LogFactory + .getLog(DefaultRequestInterceptor.class); + private ApplicationMasterProtocol rmClient; + + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + UserGroupInformation user = null; + try { + user = + UserGroupInformation.createProxyUser(appContext.getApplicationId() + .toString(), UserGroupInformation.getCurrentUser()); + user.addToken(appContext.getAMRMToken()); + final Configuration conf = this.getConf(); + + rmClient = + user.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + + } catch (IOException e) { + String message = + "Error while creating of RM application master service proxy for appId: " + + appContext.getApplicationId().toString(); + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Forwarding registration request to the real YARN Resource Manager"); + return rmClient.registerApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the real YARN Resource Manager"); + } + return rmClient.allocate(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, IOException { + LOG.info("Forwarding finish application request to the real YARN Resource Manager"); + return rmClient.finishApplicationMaster(request); + } + + @Override + public void setNextInterceptor(RequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor, which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java new file mode 100644 index 0000000..b370418 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the application + * master to the resource manager. + */ +public interface RequestInterceptor extends ApplicationMasterProtocol, + Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param ctx + */ + public void init(AMRMProxyApplicationContext ctx); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any + */ + public void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call + * + * @param nextInterceptor + */ + public void setNextInterceptor(RequestInterceptor nextInterceptor); + + + /** + * Returns the next intercepter in the chain + * @return + */ + public RequestInterceptor getNextInterceptor(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 494fa8f..242b687 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.nodemanager.containermanager; @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; @@ -103,6 +104,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -152,8 +154,7 @@ private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class); static final String INVALID_NMTOKEN_MSG = "Invalid NMToken"; - static final String INVALID_CONTAINERTOKEN_MSG = - "Invalid ContainerToken"; + static final String INVALID_CONTAINERTOKEN_MSG = "Invalid ContainerToken"; final Context context; private final ContainersMonitor containersMonitor; @@ -165,6 +166,7 @@ private final NodeStatusUpdater nodeStatusUpdater; + private AMRMProxyService amrmProxyService; protected LocalDirsHandlerService dirsHandler; protected final AsyncDispatcher dispatcher; private final ApplicationACLsManager aclsManager; @@ -217,7 +219,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); - + addService(dispatcher); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -228,10 +230,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, @Override public void serviceInit(Configuration conf) throws Exception { LogHandler logHandler = - createLogHandler(conf, this.context, this.deletionService); + createLogHandler(conf, this.context, this.deletionService); addIfService(logHandler); dispatcher.register(LogHandlerEventType.class, logHandler); - + // add the shared cache upload service (it will do nothing if the shared // cache is disabled) SharedCacheUploadService sharedCacheUploader = @@ -239,12 +241,25 @@ public void serviceInit(Configuration conf) throws Exception { addService(sharedCacheUploader); dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); + Boolean amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + if (amrmProxyEnabled) { + LOG.info("AMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + this.amrmProxyService = + new AMRMProxyService(this.context, this.dispatcher); + addService(this.amrmProxyService); + } else { + LOG.info("AMRMProxyService is disabled"); + } + waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, - YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + - conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, - YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) + - SHUTDOWN_CLEANUP_SLOP_MS; + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + + conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, + YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) + + SHUTDOWN_CLEANUP_SLOP_MS; super.serviceInit(conf); recover(); @@ -254,12 +269,11 @@ public void serviceInit(Configuration conf) throws Exception { private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); if (stateStore.canRecover()) { - rsrcLocalizationSrvc.recoverLocalizedResources( - stateStore.loadLocalizationState()); + rsrcLocalizationSrvc.recoverLocalizedResources(stateStore + .loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { + for (ContainerManagerApplicationProto proto : appsState.getApplications()) { recoverApplication(proto); } @@ -279,8 +293,8 @@ private void recoverApplication(ContainerManagerApplicationProto p) throws IOException { ApplicationId appId = new ApplicationIdPBImpl(p.getId()); Credentials creds = new Credentials(); - creds.readTokenStorageStream( - new DataInputStream(p.getCredentials().newInput())); + creds.readTokenStorageStream(new DataInputStream(p.getCredentials() + .newInput())); List aclProtoList = p.getAclsList(); Map acls = @@ -297,15 +311,14 @@ private void recoverApplication(ContainerManagerApplicationProto p) } LOG.info("Recovering application " + appId); - ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context); + ApplicationImpl app = + new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @SuppressWarnings("unchecked") - private void recoverContainer(RecoveredContainerState rcs) - throws IOException { + private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); ContainerLaunchContext launchContext = req.getContainerLaunchContext(); ContainerTokenIdentifier token = @@ -319,10 +332,11 @@ private void recoverContainer(RecoveredContainerState rcs) if (context.getApplications().containsKey(appId)) { Credentials credentials = parseCredentials(launchContext); - Container container = new ContainerImpl(getConfig(), dispatcher, - context.getNMStateStore(), req.getContainerLaunchContext(), - credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled()); + Container container = + new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), + req.getContainerLaunchContext(), credentials, metrics, token, + rcs.getStatus(), rcs.getExitCode(), rcs.getDiagnostics(), + rcs.getKilled()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -365,8 +379,7 @@ protected LogHandler createLogHandler(Configuration conf, Context context, deletionService, dirsHandler); } else { return new NonAggregatingLogHandler(this.dispatcher, deletionService, - dirsHandler, - context.getNMStateStore()); + dirsHandler, context.getNMStateStore()); } } @@ -386,7 +399,8 @@ protected SharedCacheUploadService createSharedCacheUploaderService() { protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { - return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); + return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, + this); } @Override @@ -395,11 +409,10 @@ protected void serviceStart() throws Exception { // Enqueue user dirs in deletion context Configuration conf = getConfig(); - final InetSocketAddress initialAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_PORT); + final InetSocketAddress initialAddress = + conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_PORT); boolean usingEphemeralPort = (initialAddress.getPort() == 0); if (context.getNMStateStore().canRecover() && usingEphemeralPort) { throw new IllegalArgumentException("Cannot support recovery with an " @@ -416,37 +429,36 @@ protected void serviceStart() throws Exception { // always enforce it to be token-based. serverConf.set( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - SaslRpcServer.AuthMethod.TOKEN.toString()); - + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + YarnRPC rpc = YarnRPC.create(conf); server = - rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, - serverConf, this.context.getNMTokenSecretManager(), - conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, + rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, + serverConf, this.context.getNMTokenSecretManager(), conf.getInt( + YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT)); - + // Enable service authorization? if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new NMPolicyProvider()); } - - LOG.info("Blocking new container-requests as container manager rpc" + - " server is still starting."); + + LOG.info("Blocking new container-requests as container manager rpc" + + " server is still starting."); this.setBlockNewContainerRequests(true); String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST); String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS); String hostOverride = null; - if (bindHost != null && !bindHost.isEmpty() - && nmAddress != null && !nmAddress.isEmpty()) { - //a bind-host case with an address, to support overriding the first - //hostname found when querying for our hostname with the specified - //address, combine the specified address with the actual port listened - //on by the server + if (bindHost != null && !bindHost.isEmpty() && nmAddress != null + && !nmAddress.isEmpty()) { + // a bind-host case with an address, to support overriding the first + // hostname found when querying for our hostname with the specified + // address, combine the specified address with the actual port listened + // on by the server hostOverride = nmAddress.split(":")[0]; } @@ -459,7 +471,7 @@ protected void serviceStart() throws Exception { connectAddress = NetUtils.getConnectAddress(server); } NodeId nodeId = buildNodeId(connectAddress, hostOverride); - ((NodeManager.NMContext)context).setNodeId(nodeId); + ((NodeManager.NMContext) context).setNodeId(nodeId); this.context.getNMTokenSecretManager().setNodeId(nodeId); this.context.getContainerTokenSecretManager().setNodeId(nodeId); @@ -486,15 +498,15 @@ protected void serviceStart() throws Exception { private NodeId buildNodeId(InetSocketAddress connectAddress, String hostOverride) { if (hostOverride != null) { - connectAddress = NetUtils.getConnectAddress( - new InetSocketAddress(hostOverride, connectAddress.getPort())); + connectAddress = + NetUtils.getConnectAddress(new InetSocketAddress(hostOverride, + connectAddress.getPort())); } - return NodeId.newInstance( - connectAddress.getAddress().getCanonicalHostName(), - connectAddress.getPort()); + return NodeId.newInstance(connectAddress.getAddress() + .getCanonicalHostName(), connectAddress.getPort()); } - void refreshServiceAcls(Configuration configuration, + void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) { this.server.refreshServiceAcl(configuration, policyProvider); } @@ -539,9 +551,8 @@ public void cleanUpApplicationsOnNMShutDown() { List appIds = new ArrayList(applications.keySet()); - this.handle( - new CMgrCompletedAppsEvent(appIds, - CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); + this.handle(new CMgrCompletedAppsEvent(appIds, + CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); LOG.info("Waiting for Applications to be Finished"); @@ -552,7 +563,7 @@ public void cleanUpApplicationsOnNMShutDown() { Thread.sleep(1000); } catch (InterruptedException ex) { LOG.warn( - "Interrupted while sleeping on applications finish on shutdown", ex); + "Interrupted while sleeping on applications finish on shutdown", ex); } } @@ -560,8 +571,8 @@ public void cleanUpApplicationsOnNMShutDown() { if (applications.isEmpty()) { LOG.info("All applications in FINISHED state"); } else { - LOG.info("Done waiting for Applications to be Finished. Still alive: " + - applications.keySet()); + LOG.info("Done waiting for Applications to be Finished. Still alive: " + + applications.keySet()); } } @@ -575,30 +586,29 @@ public void cleanupContainersOnNMResync() { + containers.keySet()); List containerIds = - new ArrayList(containers.keySet()); + new ArrayList(containers.keySet()); LOG.info("Waiting for containers to be killed"); this.handle(new CMgrCompletedContainersEvent(containerIds, - CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); + CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); /* * We will wait till all the containers change their state to COMPLETE. We - * will not remove the container statuses from nm context because these - * are used while re-registering node manager with resource manager. + * will not remove the container statuses from nm context because these are + * used while re-registering node manager with resource manager. */ boolean allContainersCompleted = false; while (!containers.isEmpty() && !allContainersCompleted) { allContainersCompleted = true; for (Entry container : containers.entrySet()) { - if (((ContainerImpl) container.getValue()).getCurrentState() - != ContainerState.COMPLETE) { + if (((ContainerImpl) container.getValue()).getCurrentState() != ContainerState.COMPLETE) { allContainersCompleted = false; try { Thread.sleep(1000); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on resync", - ex); + ex); } break; } @@ -608,20 +618,20 @@ public void cleanupContainersOnNMResync() { if (allContainersCompleted) { LOG.info("All containers in DONE state"); } else { - LOG.info("Done waiting for containers to be killed. Still alive: " + - containers.keySet()); + LOG.info("Done waiting for containers to be killed. Still alive: " + + containers.keySet()); } } // Get the remoteUGI corresponding to the api call. - protected UserGroupInformation getRemoteUgi() - throws YarnException { + protected UserGroupInformation getRemoteUgi() throws YarnException { UserGroupInformation remoteUgi; try { remoteUgi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { - String msg = "Cannot obtain the user-name. Got exception: " - + StringUtils.stringifyException(e); + String msg = + "Cannot obtain the user-name. Got exception: " + + StringUtils.stringifyException(e); LOG.warn(msg); throw RPCUtil.getRemoteException(msg); } @@ -652,7 +662,7 @@ protected void authorizeUser(UserGroupInformation remoteUgi, throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); } if (!remoteUgi.getUserName().equals( - nmTokenIdentifier.getApplicationAttemptId().toString())) { + nmTokenIdentifier.getApplicationAttemptId().toString())) { throw RPCUtil.getRemoteException("Expected applicationAttemptId: " + remoteUgi.getUserName() + "Found: " + nmTokenIdentifier.getApplicationAttemptId()); @@ -660,8 +670,7 @@ protected void authorizeUser(UserGroupInformation remoteUgi, } /** - * @param containerTokenIdentifier - * of the container to be started + * @param containerTokenIdentifier of the container to be started * @throws YarnException */ @Private @@ -679,30 +688,31 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, boolean unauthorized = false; StringBuilder messageBuilder = new StringBuilder("Unauthorized request to start container. "); - if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId(). - equals(containerId.getApplicationAttemptId().getApplicationId())) { + if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId() + .equals(containerId.getApplicationAttemptId().getApplicationId())) { unauthorized = true; messageBuilder.append("\nNMToken for application attempt : ") - .append(nmTokenIdentifier.getApplicationAttemptId()) - .append(" was used for starting container with container token") - .append(" issued for application attempt : ") - .append(containerId.getApplicationAttemptId()); + .append(nmTokenIdentifier.getApplicationAttemptId()) + .append(" was used for starting container with container token") + .append(" issued for application attempt : ") + .append(containerId.getApplicationAttemptId()); } else if (!this.context.getContainerTokenSecretManager() .isValidStartContainerRequest(containerTokenIdentifier)) { // Is the container being relaunched? Or RPC layer let startCall with // tokens generated off old-secret through? unauthorized = true; messageBuilder.append("\n Attempt to relaunch the same ") - .append("container with id ").append(containerIDStr).append("."); + .append("container with id ").append(containerIDStr).append("."); } else if (containerTokenIdentifier.getExpiryTimeStamp() < System - .currentTimeMillis()) { + .currentTimeMillis()) { // Ensure the token is not expired. unauthorized = true; messageBuilder.append("\nThis token is expired. current time is ") - .append(System.currentTimeMillis()).append(" found ") - .append(containerTokenIdentifier.getExpiryTimeStamp()); - messageBuilder.append("\nNote: System times on machines may be out of sync.") - .append(" Check system time and time zones."); + .append(System.currentTimeMillis()).append(" found ") + .append(containerTokenIdentifier.getExpiryTimeStamp()); + messageBuilder.append( + "\nNote: System times on machines may be out of sync.").append( + " Check system time and time zones."); } if (unauthorized) { String msg = messageBuilder.toString(); @@ -715,34 +725,43 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, * Start a list of containers on this NodeManager. */ @Override - public StartContainersResponse - startContainers(StartContainersRequest requests) throws YarnException, - IOException { + public StartContainersResponse startContainers(StartContainersRequest requests) + throws YarnException, IOException { if (blockNewContainerRequests.get()) { throw new NMNotYetReadyException( - "Rejecting new containers as NodeManager has not" - + " yet connected with ResourceManager"); + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager"); } UserGroupInformation remoteUgi = getRemoteUgi(); NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi); - authorizeUser(remoteUgi,nmTokenIdentifier); + authorizeUser(remoteUgi, nmTokenIdentifier); List succeededContainers = new ArrayList(); Map failedContainers = new HashMap(); for (StartContainerRequest request : requests.getStartContainerRequests()) { ContainerId containerId = null; try { - if (request.getContainerToken() == null || - request.getContainerToken().getIdentifier() == null) { + if (request.getContainerToken() == null + || request.getContainerToken().getIdentifier() == null) { throw new IOException(INVALID_CONTAINERTOKEN_MSG); } ContainerTokenIdentifier containerTokenIdentifier = - BuilderUtils.newContainerTokenIdentifier(request.getContainerToken()); + BuilderUtils.newContainerTokenIdentifier(request + .getContainerToken()); verifyAndGetContainerTokenIdentifier(request.getContainerToken(), - containerTokenIdentifier); + containerTokenIdentifier); containerId = containerTokenIdentifier.getContainerID(); + + // Initialize the AMRMProxy service instance only if the container is of + // type AM and if the AMRMProxy service is enabled + if (amrmProxyService != null + && containerTokenIdentifier.getContainerType().equals( + ContainerType.APPLICATION_MASTER)) { + this.amrmProxyService.processApplicationStartRequest(request); + } + startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, - request); + request); succeededContainers.add(containerId); } catch (YarnException e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -755,7 +774,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, } return StartContainersResponse.newInstance(getAuxServiceMetaData(), - succeededContainers, failedContainers); + succeededContainers, failedContainers); } private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, @@ -769,8 +788,9 @@ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, builder.setUser(user); if (logAggregationContext != null) { - builder.setLogAggregationContext(( - (LogAggregationContextPBImpl)logAggregationContext).getProto()); + builder + .setLogAggregationContext(((LogAggregationContextPBImpl) logAggregationContext) + .getProto()); } builder.clearCredentials(); @@ -788,10 +808,10 @@ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, builder.clearAcls(); if (appAcls != null) { for (Map.Entry acl : appAcls.entrySet()) { - ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder() - .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) - .setAcl(acl.getValue()) - .build(); + ApplicationACLMapProto p = + ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()).build(); builder.addAcls(p); } } @@ -816,14 +836,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, * correct RMIdentifier. d) It is not expired. */ authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier); - + if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater .getRMIdentifier()) { - // Is the container coming from unknown RM - StringBuilder sb = new StringBuilder("\nContainer "); - sb.append(containerTokenIdentifier.getContainerID().toString()) - .append(" rejected as it is allocated by a previous RM"); - throw new InvalidContainerException(sb.toString()); + // Is the container coming from unknown RM + StringBuilder sb = new StringBuilder("\nContainer "); + sb.append(containerTokenIdentifier.getContainerID().toString()).append( + " rejected as it is allocated by a previous RM"); + throw new InvalidContainerException(sb.toString()); } // update NMToken updateNMTokenIdentifier(nmTokenIdentifier); @@ -837,13 +857,13 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerLaunchContext launchContext = request.getContainerLaunchContext(); Map serviceData = getAuxServiceMetaData(); - if (launchContext.getServiceData()!=null && - !launchContext.getServiceData().isEmpty()) { + if (launchContext.getServiceData() != null + && !launchContext.getServiceData().isEmpty()) { for (Map.Entry meta : launchContext.getServiceData() .entrySet()) { if (null == serviceData.get(meta.getKey())) { - throw new InvalidAuxServiceException("The auxService:" + meta.getKey() - + " does not exist"); + throw new InvalidAuxServiceException("The auxService:" + + meta.getKey() + " does not exist"); } } } @@ -852,14 +872,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, Container container = new ContainerImpl(getConfig(), this.dispatcher, - context.getNMStateStore(), launchContext, - credentials, metrics, containerTokenIdentifier); + context.getNMStateStore(), launchContext, credentials, metrics, + containerTokenIdentifier); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerId, container) != null) { NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER, - "ContainerManagerImpl", "Container already running on this node!", - applicationID, containerId); + "ContainerManagerImpl", "Container already running on this node!", + applicationID, containerId); throw RPCUtil.getRemoteException("Container " + containerIdStr + " already is running on this node!!"); } @@ -869,38 +889,41 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, if (!serviceStopped) { // Create the application Application application = - new ApplicationImpl(dispatcher, user, applicationID, credentials, context); + new ApplicationImpl(dispatcher, user, applicationID, credentials, + context); if (null == context.getApplications().putIfAbsent(applicationID, - application)) { - LOG.info("Creating a new application reference for app " + applicationID); + application)) { + LOG.info("Creating a new application reference for app " + + applicationID); LogAggregationContext logAggregationContext = containerTokenIdentifier.getLogAggregationContext(); Map appAcls = container.getLaunchContext().getApplicationACLs(); - context.getNMStateStore().storeApplication(applicationID, + context.getNMStateStore().storeApplication( + applicationID, buildAppProto(applicationID, user, credentials, appAcls, - logAggregationContext)); + logAggregationContext)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, appAcls, - logAggregationContext)); + new ApplicationInitEvent(applicationID, appAcls, + logAggregationContext)); } this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( - new ApplicationContainerInitEvent(container)); + new ApplicationContainerInitEvent(container)); this.context.getContainerTokenSecretManager().startContainerSuccessful( - containerTokenIdentifier); + containerTokenIdentifier); NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER, - "ContainerManageImpl", applicationID, containerId); - // TODO launchedContainer misplaced -> doesn't necessarily mean a container + "ContainerManageImpl", applicationID, containerId); + // TODO launchedContainer misplaced -> doesn't necessarily mean a + // container // launch. A finished Application will not launch containers. metrics.launchedContainer(); metrics.allocateContainer(containerTokenIdentifier.getResource()); } else { - throw new YarnException( - "Container start failed as the NodeManager is " + - "in the process of shutting down"); + throw new YarnException("Container start failed as the NodeManager is " + + "in the process of shutting down"); } } finally { this.readLock.unlock(); @@ -913,13 +936,13 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( InvalidToken { byte[] password = context.getContainerTokenSecretManager().retrievePassword( - containerTokenIdentifier); + containerTokenIdentifier); byte[] tokenPass = token.getPassword().array(); if (password == null || tokenPass == null || !Arrays.equals(password, tokenPass)) { throw new InvalidToken( - "Invalid container token used for starting container on : " - + context.getNodeId().toString()); + "Invalid container token used for starting container on : " + + context.getNodeId().toString()); } return containerTokenIdentifier; } @@ -929,7 +952,7 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier) throws InvalidToken { context.getNMTokenSecretManager().appAttemptStartContainer( - nmTokenIdentifier); + nmTokenIdentifier); } private Credentials parseCredentials(ContainerLaunchContext launchContext) @@ -977,7 +1000,7 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) } } return StopContainersResponse - .newInstance(succeededRequests, failedRequests); + .newInstance(succeededRequests, failedRequests); } @SuppressWarnings("unchecked") @@ -987,23 +1010,23 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true, - nmTokenIdentifier); + nmTokenIdentifier); if (container == null) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerIDStr - + " is not handled by this NodeManager"); + + " is not handled by this NodeManager"); } } else { context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - ContainerExitStatus.KILLED_BY_APPMASTER, - "Container killed by the ApplicationMaster.")); + new ContainerKillEvent(containerID, + ContainerExitStatus.KILLED_BY_APPMASTER, + "Container killed by the ApplicationMaster.")); - NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID - .getApplicationAttemptId().getApplicationId(), containerID); + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + .getApplicationAttemptId().getApplicationId(), containerID); // TODO: Move this code to appropriate place once kill_container is // implemented. @@ -1035,7 +1058,7 @@ public GetContainerStatusesResponse getContainerStatuses( } } return GetContainerStatusesResponse.newInstance(succeededRequests, - failedRequests); + failedRequests); } private ContainerStatus getContainerStatusInternal(ContainerId containerID, @@ -1045,15 +1068,15 @@ private ContainerStatus getContainerStatusInternal(ContainerId containerID, LOG.info("Getting container-status for " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, false, - nmTokenIdentifier); + nmTokenIdentifier); if (container == null) { if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerIDStr - + " was recently stopped on node manager."); + + " was recently stopped on node manager."); } else { throw RPCUtil.getRemoteException("Container " + containerIDStr - + " is not handled by this NodeManager"); + + " is not handled by this NodeManager"); } } ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); @@ -1077,8 +1100,9 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, */ ApplicationId nmTokenAppId = identifier.getApplicationAttemptId().getApplicationId(); - - if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId())) + + if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId() + .getApplicationId())) || (container != null && !nmTokenAppId.equals(container .getContainerId().getApplicationAttemptId().getApplicationId()))) { if (stopRequest) { @@ -1086,8 +1110,8 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, + " attempted to stop non-application container : " + container.getContainerId()); NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER, - "ContainerManagerImpl", "Trying to stop unknown container!", - nmTokenAppId, container.getContainerId()); + "ContainerManagerImpl", "Trying to stop unknown container!", + nmTokenAppId, container.getContainerId()); } else { LOG.warn(identifier.getApplicationAttemptId() + " attempted to get status for non-application container : " @@ -1099,14 +1123,14 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId, class ContainerEventDispatcher implements EventHandler { @Override public void handle(ContainerEvent event) { - Map containers = - ContainerManagerImpl.this.context.getContainers(); + Map containers = + ContainerManagerImpl.this.context.getContainers(); Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); } else { - LOG.warn("Event " + event + " sent to absent container " + - event.getContainerID()); + LOG.warn("Event " + event + " sent to absent container " + + event.getContainerID()); } } } @@ -1132,8 +1156,7 @@ public void handle(ApplicationEvent event) { public void handle(ContainerManagerEvent event) { switch (event.getType()) { case FINISH_APPS: - CMgrCompletedAppsEvent appsFinishedEvent = - (CMgrCompletedAppsEvent) event; + CMgrCompletedAppsEvent appsFinishedEvent = (CMgrCompletedAppsEvent) event; for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) { String diagnostic = ""; if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) { @@ -1147,8 +1170,7 @@ public void handle(ContainerManagerEvent event) { LOG.error("Unable to update application state in store", e); } this.dispatcher.getEventHandler().handle( - new ApplicationFinishEvent(appID, - diagnostic)); + new ApplicationFinishEvent(appID, diagnostic)); } break; case FINISH_CONTAINERS: @@ -1156,15 +1178,15 @@ public void handle(ContainerManagerEvent event) { (CMgrCompletedContainersEvent) event; for (ContainerId container : containersFinishedEvent .getContainersToCleanup()) { - this.dispatcher.getEventHandler().handle( - new ContainerKillEvent(container, - ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, - "Container Killed by ResourceManager")); + this.dispatcher.getEventHandler().handle( + new ContainerKillEvent(container, + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + "Container Killed by ResourceManager")); } break; default: - throw new YarnRuntimeException( - "Got an unknown ContainerManagerEvent type: " + event.getType()); + throw new YarnRuntimeException( + "Got an unknown ContainerManagerEvent type: " + event.getType()); } } @@ -1177,12 +1199,12 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { public boolean getBlockNewContainerRequestsStatus() { return this.blockNewContainerRequests.get(); } - + @Override public void stateChanged(Service service) { // TODO Auto-generated method stub } - + public Context getContext() { return this.context; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java new file mode 100644 index 0000000..8be612e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -0,0 +1,657 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the AMRMProxyService test cases. It provides utility + * methods that can be used by the concrete test case classes + * + */ +public abstract class BaseAMRMProxyTest { + private static final Log LOG = LogFactory.getLog(BaseAMRMProxyTest.class); + /** + * The AMRMProxyService instance that will be used by all the test cases + */ + private MockAMRMProxyService amrmProxyService; + /** + * Thread pool used for asynchronous operations + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + protected MockAMRMProxyService getAMRMProxyService() { + Assert.assertNotNull(this.amrmProxyService); + return this.amrmProxyService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockRequestInterceptor.class.getName()); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.amrmProxyService = createAndStartAMRMProxyService(); + } + + @After + public void tearDown() { + amrmProxyService.stop(); + amrmProxyService = null; + this.dispatcher.stop(); + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockAMRMProxyService createAndStartAMRMProxyService() { + MockAMRMProxyService svc = + new MockAMRMProxyService(new NullContext(), dispatcher); + svc.init(conf); + svc.start(); + return svc; + } + + /** + * This helper method will invoke the specified function in parallel for each + * end point in the specified list using a thread pool and return the + * responses received from the function. It implements the logic required for + * dispatching requests in parallel and waiting for the responses. If any of + * the function call fails or times out, it will ignore and proceed with the + * rest. So the responses returned can be less than the number of end points + * specified + * + * @param testContext + * @param func + * @return + */ + protected List runInParallel(List testContexts, + final Function func) { + ExecutorCompletionService completionService = + new ExecutorCompletionService(this.getThreadPool()); + LOG.info("Sending requests to endpoints asynchronously. Number of test contexts=" + + testContexts.size()); + for (int index = 0; index < testContexts.size(); index++) { + final T testContext = testContexts.get(index); + + LOG.info("Adding request to threadpool for test context: " + + testContext.toString()); + + completionService.submit(new Callable() { + @Override + public R call() throws Exception { + LOG.info("Sending request. Test context:" + testContext.toString()); + + R response = null; + try { + response = func.invoke(testContext); + LOG.info("Successfully sent request for context: " + + testContext.toString()); + } catch (Throwable ex) { + LOG.error("Failed to process request for context: " + testContext); + response = null; + } + + return response; + } + }); + } + + ArrayList responseList = new ArrayList(); + LOG.info("Waiting for responses from endpoints. Number of contexts=" + + testContexts.size()); + for (int i = 0; i < testContexts.size(); ++i) { + try { + final Future future = completionService.take(); + final R response = future.get(3000, TimeUnit.MILLISECONDS); + responseList.add(response); + } catch (Throwable e) { + LOG.error("Failed to process request " + e.getMessage()); + } + } + + return responseList; + } + + /** + * Helper method to register an application master using specified testAppId + * as the application identifier and return the response + * + * @param testAppId + * @return + * @throws Exception + * @throws YarnException + * @throws IOException + */ + protected RegisterApplicationMasterResponse registerApplicationMaster( + final int testAppId) throws Exception, YarnException, IOException { + final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); + + return ugi.getUser().doAs( + new PrivilegedExceptionAction() { + @Override + public RegisterApplicationMasterResponse run() throws Exception { + getAMRMProxyService().initApp( + ugi.getAppAttemptId().getApplicationId(), + ugi.getUser().getUserName()); + + final RegisterApplicationMasterRequest req = + Records.newRecord(RegisterApplicationMasterRequest.class); + req.setHost(Integer.toString(testAppId)); + req.setRpcPort(testAppId); + req.setTrackingUrl(""); + + RegisterApplicationMasterResponse response = + getAMRMProxyService().registerApplicationMaster(req); + return response; + } + }); + } + + /** + * Helper method that can be used to register multiple application masters in + * parallel to the specified RM end points + * + * @param testContexts - used to identify the requests + * @return + */ + protected List> registerApplicationMastersInParallel( + final ArrayList testContexts) { + List> responses = + runInParallel( + testContexts, + new Function>() { + @Override + public RegisterApplicationMasterResponseInfo invoke( + T testContext) { + RegisterApplicationMasterResponseInfo response = null; + try { + int index = testContexts.indexOf(testContext); + response = + new RegisterApplicationMasterResponseInfo( + registerApplicationMaster(index), testContext); + Assert.assertNotNull(response.getResponse()); + Assert.assertEquals(Integer.toString(index), response + .getResponse().getQueue()); + + LOG.info("Sucessfully registered application master with test context: " + + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to register application master with test context: " + + testContext); + } + + return response; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + testContexts.size(), responses.size()); + + Set contextResponses = new TreeSet(); + for (RegisterApplicationMasterResponseInfo item : responses) { + contextResponses.add(item.getTestContext()); + } + + for (T ep : testContexts) { + Assert.assertTrue(contextResponses.contains(ep)); + } + + return responses; + } + + /** + * Unregisters the application master for specified application id + * + * @param appId + * @param status + * @return + * @throws Exception + * @throws YarnException + * @throws IOException + */ + protected FinishApplicationMasterResponse finishApplicationMaster( + final int appId, final FinalApplicationStatus status) throws Exception, + YarnException, IOException { + + final ApplicationUserInfo ugi = getApplicationUserInfo(appId); + + return ugi.getUser().doAs( + new PrivilegedExceptionAction() { + @Override + public FinishApplicationMasterResponse run() throws Exception { + final FinishApplicationMasterRequest req = + Records.newRecord(FinishApplicationMasterRequest.class); + req.setDiagnostics(""); + req.setTrackingUrl(""); + req.setFinalApplicationStatus(status); + + FinishApplicationMasterResponse response = + getAMRMProxyService().finishApplicationMaster(req); + + getAMRMProxyService().stopApp( + ugi.getAppAttemptId().getApplicationId()); + + return response; + } + }); + } + + protected List> finishApplicationMastersInParallel( + final ArrayList testContexts) { + List> responses = + runInParallel( + testContexts, + new Function>() { + @Override + public FinishApplicationMasterResponseInfo invoke( + T testContext) { + FinishApplicationMasterResponseInfo response = null; + try { + response = + new FinishApplicationMasterResponseInfo( + finishApplicationMaster(testContexts.indexOf(testContext), + FinalApplicationStatus.SUCCEEDED), testContext); + Assert.assertNotNull(response.getResponse()); + + LOG.info("Sucessfully finished application master with test contexts: " + + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to finish application master with test context: " + + testContext); + } + + return response; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + testContexts.size(), responses.size()); + + Set contextResponses = new TreeSet(); + for (FinishApplicationMasterResponseInfo item : responses) { + Assert.assertNotNull(item); + Assert.assertNotNull(item.getResponse()); + contextResponses.add(item.getTestContext()); + } + + for (T ep : testContexts) { + Assert.assertTrue(contextResponses.contains(ep)); + } + + return responses; + } + + protected AllocateResponse allocate(final int testAppId) throws Exception, + YarnException, IOException { + final AllocateRequest req = Records.newRecord(AllocateRequest.class); + req.setResponseId(testAppId); + return allocate(testAppId, req); + } + + protected AllocateResponse allocate(final int testAppId, + final AllocateRequest request) throws Exception, YarnException, + IOException { + + final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); + + return ugi.getUser().doAs( + new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + AllocateResponse response = getAMRMProxyService().allocate(request); + return response; + } + }); + } + + protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) { + final ApplicationAttemptId attemptId = getApplicationAttemptId(testAppId); + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1); + ugi.addTokenIdentifier(token); + return new ApplicationUserInfo(ugi, attemptId); + } + + protected List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers) throws Exception { + return createResourceRequests(hosts, memory, vCores, priority, containers, + null); + } + + protected List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + String labelExpression) throws Exception { + List reqs = new ArrayList(); + for (String host : hosts) { + ResourceRequest hostReq = + createResourceRequest(host, memory, vCores, priority, containers, + labelExpression); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression); + reqs.add(rackReq); + } + + ResourceRequest offRackReq = + createResourceRequest(ResourceRequest.ANY, memory, vCores, priority, + containers, labelExpression); + reqs.add(offRackReq); + return reqs; + } + + protected ResourceRequest createResourceRequest(String resource, int memory, + int vCores, int priority, int containers) throws Exception { + return createResourceRequest(resource, memory, vCores, priority, + containers, null); + } + + protected ResourceRequest createResourceRequest(String resource, int memory, + int vCores, int priority, int containers, String labelExpression) + throws Exception { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setResourceName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memory); + capability.setVirtualCores(vCores); + req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } + return req; + } + + /** + * Returns an ApplicationId with the specified identifier + * + * @param testAppId + * @return + */ + protected ApplicationId getApplicationId(int testAppId) { + return ApplicationId.newInstance(123456, testAppId); + } + + /** + * Return an instance of ApplicationAttemptId using specified identifier. This + * identifier will be used for the ApplicationId too. + * + * @param testAppId + * @return + */ + protected ApplicationAttemptId getApplicationAttemptId(int testAppId) { + return ApplicationAttemptId.newInstance(getApplicationId(testAppId), + testAppId); + } + + /** + * Return an instance of ApplicationAttemptId using specified identifier and + * application id + * + * @param testAppId + * @return + */ + protected ApplicationAttemptId getApplicationAttemptId(int testAppId, + ApplicationId appId) { + return ApplicationAttemptId.newInstance(appId, testAppId); + } + + protected static class RegisterApplicationMasterResponseInfo { + private RegisterApplicationMasterResponse response; + private T testContext; + + RegisterApplicationMasterResponseInfo( + RegisterApplicationMasterResponse response, T testContext) { + this.response = response; + this.testContext = testContext; + } + + public RegisterApplicationMasterResponse getResponse() { + return response; + } + + public T getTestContext() { + return testContext; + } + } + + protected static class FinishApplicationMasterResponseInfo { + private FinishApplicationMasterResponse response; + private T testContext; + + FinishApplicationMasterResponseInfo( + FinishApplicationMasterResponse response, T testContext) { + this.response = response; + this.testContext = testContext; + } + + public FinishApplicationMasterResponse getResponse() { + return response; + } + + public T getTestContext() { + return testContext; + } + } + + protected static class ApplicationUserInfo { + private UserGroupInformation user; + private ApplicationAttemptId attemptId; + + ApplicationUserInfo(UserGroupInformation user, + ApplicationAttemptId attemptId) { + this.user = user; + this.attemptId = attemptId; + } + + public UserGroupInformation getUser() { + return this.user; + } + + public ApplicationAttemptId getAppAttemptId() { + return this.attemptId; + } + } + + protected static class MockAMRMProxyService extends AMRMProxyService { + public MockAMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { + super(nmContext, dispatcher); + } + + /** + * This method is used by the test code to initialize the pipeline. In the + * actual service, the initialization is called by the + * ContainerManagerImpl::StartContainers method + * + * @param applicationId + * @param user + */ + public void initApp(ApplicationId applicationId, String user) { + super.initializePipeline(applicationId, user, null); + } + + public void stopApp(ApplicationId applicationId) { + super.stopApplication(applicationId); + } + } + + /** + * The Function interface is used for passing method pointers that can be + * invoked asynchronously at a later point. + */ + protected interface Function { + public R invoke(T input); + } + + protected class NullContext implements Context { + + @Override + public NodeId getNodeId() { + return null; + } + + @Override + public int getHttpPort() { + return 0; + } + + @Override + public ConcurrentMap getApplications() { + return null; + } + + @Override + public Map getSystemCredentialsForApps() { + return null; + } + + @Override + public ConcurrentMap getContainers() { + return null; + } + + @Override + public NMContainerTokenSecretManager getContainerTokenSecretManager() { + return null; + } + + @Override + public NMTokenSecretManagerInNM getNMTokenSecretManager() { + return null; + } + + @Override + public NodeHealthStatus getNodeHealthStatus() { + return null; + } + + @Override + public ContainerManagementProtocol getContainerManager() { + return null; + } + + @Override + public LocalDirsHandlerService getLocalDirsHandler() { + return null; + } + + @Override + public ApplicationACLsManager getApplicationACLsManager() { + return null; + } + + @Override + public NMStateStoreService getNMStateStore() { + return null; + } + + @Override + public boolean getDecommissioned() { + return false; + } + + @Override + public void setDecommissioned(boolean isDecommissioned) { + } + + @Override + public ConcurrentLinkedQueue getLogAggregationStatusForApps() { + return null; + } + + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java new file mode 100644 index 0000000..f63c799 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -0,0 +1,63 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class MockRequestInterceptor extends AbstractRequestInterceptor { + + private MockResourceManagerFacade mockRM; + + public MockRequestInterceptor() { + } + + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + mockRM = + new MockResourceManagerFacade(new YarnConfiguration(super.getConf()), 0); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return mockRM.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, IOException { + return mockRM.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return mockRM.allocate(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java new file mode 100644 index 0000000..22b2aa3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -0,0 +1,445 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.directory.api.util.Strings; +import org.apache.directory.api.util.exception.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.mortbay.log.Log; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade implements ApplicationMasterProtocol, + ApplicationClientProtocol { + + private HashMap> applicationContainerIdMap = + new HashMap>(); + private HashMap allocatedContainerMap = + new HashMap(); + private AtomicInteger containerIndex = new AtomicInteger(0); + private Configuration conf; + + public MockResourceManagerFacade(Configuration conf, int startContainerIndex) { + this.conf = conf; + this.containerIndex.set(startContainerIndex); + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? result.getApplicationAttemptId().toString() : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + String amrmToken = getAppIdentifier(); + Log.info("Registering application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + Assert.assertFalse("The application id is already registered: " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(amrmToken, new ArrayList()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, + null, null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + Log.info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this application + Assert.assertTrue("The application id is NOT registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse.newInstance(request + .getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true + : false); + } + + protected ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(12345, id); + } + + protected ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } + + @SuppressWarnings("deprecation") + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (request.getAskList() != null && request.getAskList().size() > 0 + && request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Assert.fail("The mock RM implementation does not support receiving " + + "askList and releaseList in the same heartbeat"); + } + + String amrmToken = getAppIdentifier(); + + ArrayList containerList = new ArrayList(); + if (request.getAskList() != null) { + for (ResourceRequest rr : request.getAskList()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + ContainerId containerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container container = Records.newRecord(Container.class); + container.setId(containerId); + container.setPriority(rr.getPriority()); + + // We don't use the node for running containers in the test cases. So + // it is OK to hard code it to some dummy value + NodeId nodeId = + NodeId.newInstance( + !Strings.isEmpty(rr.getResourceName()) ? rr.getResourceName() + : "dummy", 1000); + container.setNodeId(nodeId); + container.setResource(rr.getCapability()); + containerList.add(container); + + synchronized (applicationContainerIdMap) { + // Keep track of the containers returned to this application. We + // will need it in future + Assert.assertTrue( + "The application id is Not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.get(amrmToken); + ids.add(containerId); + this.allocatedContainerMap.put(containerId, container); + } + } + } + } + + if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { + Log.info("Releasing containers: " + request.getReleaseList().size()); + synchronized (applicationContainerIdMap) { + Assert.assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.get(amrmToken); + + for (ContainerId id : request.getReleaseList()) { + boolean found = false; + for (ContainerId c : ids) { + if (c.equals(id)) { + found = true; + break; + } + } + + Assert.assertTrue( + "ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); + + ids.remove(id); + + // Return the released container back to the AM with new fake Ids. The + // test case does not care about the IDs. The IDs are faked because + // otherwise the LRM will throw duplication identifier exception. This + // returning of fake containers is ONLY done for testing purpose - for + // the test code to get confirmation that the sub-cluster resource + // managers received the release request + ContainerId fakeContainerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container fakeContainer = allocatedContainerMap.get(id); + fakeContainer.setId(fakeContainerId); + containerList.add(fakeContainer); + } + } + } + + Log.info("Allocating containers: " + containerList.size() + + " for application attempt: " + conf.get("AMRMTOKEN")); + return AllocateResponse.newInstance(0, new ArrayList(), + containerList, new ArrayList(), null, AMCommand.AM_RESYNC, + 1, null, new ArrayList(), + new ArrayList(), + new ArrayList()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId(ApplicationAttemptId.newInstance( + request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java new file mode 100644 index 0000000..fe77ae2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java @@ -0,0 +1,57 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain + * + */ +public class PassThroughRequestInterceptor extends + AbstractRequestInterceptor { + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return getNextInterceptor().allocate(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java new file mode 100644 index 0000000..1461477 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestAMRMProxyService extends BaseAMRMProxyTest { + + private static final Log LOG = LogFactory.getLog(TestAMRMProxyService.class); + + /** + * Test if the pipeline is created properly + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RequestInterceptor root = + super.getAMRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + case 2: + Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), root + .getClass().getName()); + break; + case 3: + Assert.assertEquals(MockRequestInterceptor.class.getName(), root + .getClass().getName()); + break; + } + + root = root.getNextInterceptor(); + index++; + } + + Assert.assertEquals("The number of interceptors in chain does not match", + Integer.toString(4), Integer.toString(index)); + + } + + /** + * Tests registration of a single application master. + * + * @throws Exception + */ + @Test + public void testRegisterOneApplicationMaster() throws Exception { + // The testAppId identifier is used as host name and the mock resource + // manager return it as the queue name. Assert that we received the queue + // name + int testAppId = 1; + RegisterApplicationMasterResponse response1 = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response1); + Assert.assertEquals(Integer.toString(testAppId), response1.getQueue()); + } + + /** + * Tests the registration of multiple application master serially one at a + * time. + * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMasters() throws Exception { + for (int testAppId = 0; testAppId < 3; testAppId++) { + RegisterApplicationMasterResponse response = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response); + Assert.assertEquals(Integer.toString(testAppId), response.getQueue()); + } + } + + /** + * Tests the registration of multiple application masters using multiple + * threads in parallel + * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMastersInParallel() + throws Exception { + int numberOfRequests = 5; + ArrayList testContexts = + CreateTestRequestIdentifiers(numberOfRequests); + super.registerApplicationMastersInParallel(testContexts); + } + + private ArrayList CreateTestRequestIdentifiers(int numberOfRequests) { + ArrayList testContexts = new ArrayList(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int ep = 0; ep < numberOfRequests; ep++) { + testContexts.add("test-endpoint-" + Integer.toString(ep)); + LOG.info("Created test context: " + testContexts.get(ep)); + } + return testContexts; + } + + @Test + public void testFinishOneApplicationMasterWithSuccess() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testFinishOneApplicationMasterWithFailure() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(false, finshResponse.getIsUnregistered()); + + try { + // Try to finish an application master that is already finished. + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishInvalidApplicationMaster() throws Exception { + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMasters() throws Exception { + int numberOfRequests = 3; + for (int index = 0; index < numberOfRequests; index++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(index); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(index), registerResponse.getQueue()); + } + + // Finish in reverse sequence + for (int index = numberOfRequests - 1; index >= 0; index--) { + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + //Assert that the application has been removed from the collection + Assert.assertTrue(this.getAMRMProxyService().GetApplicationPipelines().size() == index); + } + + try { + // Try to finish an application master that is already finished. + finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMastersInParallel() throws Exception { + int numberOfRequests = 5; + ArrayList testContexts = new ArrayList(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int i = 0; i < numberOfRequests; i++) { + testContexts.add("test-endpoint-" + Integer.toString(i)); + LOG.info("Created test context: " + testContexts.get(i)); + + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(i); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(i), registerResponse.getQueue()); + } + + finishApplicationMastersInParallel(testContexts); + } + + @Test + public void testAllocateRequestWithNullValues() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + AllocateResponse allocateResponse = allocate(testAppId); + Assert.assertNotNull(allocateResponse); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testAllocateRequestWithoutRegistering() throws Exception { + + try { + // Try to allocate an application master without registering. + allocate(1); + Assert + .fail("The request to allocate application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("AllocateRequest failed as expected because AM was not registered"); + } + } + + @Test + public void testAllocateWithOneResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 1); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateWithMultipleResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 10); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainers() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAM() throws Exception { + int numberOfApps = 5; + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + } + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAMInParallel() + throws Exception { + int numberOfApps = 6; + ArrayList tempAppIds = new ArrayList(); + for (int i = 0; i < numberOfApps; i++) { + tempAppIds.add(new Integer(i)); + } + + final ArrayList appIds = tempAppIds; + List responses = + runInParallel(appIds, new Function() { + @Override + public Integer invoke(Integer testAppId) { + try { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull("response is null", registerResponse); + List containers = + getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + + LOG.info("Sucessfully registered application master with appId: " + + testAppId); + } catch (Throwable ex) { + LOG.error("Failed to register application master with appId: " + + testAppId, ex); + testAppId = null; + } + + return testAppId; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + appIds.size(), responses.size()); + + for (Integer testAppId : responses) { + Assert.assertNotNull(testAppId); + finishApplicationMaster(testAppId.intValue(), + FinalApplicationStatus.SUCCEEDED); + } + } + + private List getContainersAndAssert(int appId, + int numberOfResourceRequests) throws Exception { + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List containers = + new ArrayList(numberOfResourceRequests); + List askList = + new ArrayList(numberOfResourceRequests); + for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) { + askList + .add(createResourceRequest( + "test-node-" + Integer.toString(testAppId), 6000, 2, + testAppId % 5, 1)); + } + + allocateRequest.setAskList(askList); + + AllocateResponse allocateResponse = allocate(appId, allocateRequest); + Assert.assertNotNull("allocate() returned null response", allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containers.size() < askList.size() && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Number of allocated containers in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + LOG.info("Total number of allocated containers: " + + Integer.toString(containers.size())); + Thread.sleep(10); + } + + // We broadcast the request, the number of containers we received will be + // higher than we ask + Assert.assertTrue("The asklist count is not same as response", + askList.size() <= containers.size()); + return containers; + } + + private void releaseContainersAndAssert(int appId, List containers) + throws Exception { + Assert.assertTrue(containers.size() > 0); + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List relList = new ArrayList(containers.size()); + for (Container container : containers) { + relList.add(container.getId()); + } + + allocateRequest.setReleaseList(relList); + + AllocateResponse allocateResponse = allocate(appId, allocateRequest); + Assert.assertNotNull(allocateResponse); + + // The way the mock resource manager is setup, it will return the containers + // that were released in the response. This is done because the UAMs run + // asynchronously and we need to if all the resource managers received the + // release it. The containers sent by the mock resource managers will be + // aggregated and returned back to us and we can assert if all the release + // lists reached the sub-clusters + List containersForReleasedContainerIds = + new ArrayList(); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containersForReleasedContainerIds.size() < relList.size() + && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(allocateResponse); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + LOG.info("Number of containers received in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + LOG.info("Total number of containers received: " + + Integer.toString(containersForReleasedContainerIds.size())); + Thread.sleep(10); + } + + Assert.assertEquals(relList.size(), + containersForReleasedContainerIds.size()); + } +}