diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 640e86e4838e..a6101e944f2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2094,6 +2094,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + "DefaultRequestInterceptor"; + public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX + + "amrmproxy.ha.enable"; + public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false; /** * Default platform-agnostic CLASSPATH for YARN applications. A @@ -2928,6 +2931,11 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + public static final String FEDERATION_REGISTRY_BASE_KEY = + FEDERATION_PREFIX + "registry.base-dir"; + public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = + "yarnfederation/"; + // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; @@ -3085,6 +3093,11 @@ public static boolean isAclEnabled(Configuration conf) { // Other Configs //////////////////////////////// + public static final String YARN_REGISTRY_CLASS = + YARN_PREFIX + "registry.class"; + public static final String DEFAULT_YARN_REGISTRY_CLASS = + "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService"; + /** * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. * The interval of the yarn client's querying application state after diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 8487e7285f04..da1637ed2194 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2817,7 +2817,20 @@ 300 + + The registry base directory for federation. + yarn.federation.registry.base-dir + yarnfederation/ + + + + + The registry implementation to use. + yarn.registry.class + org.apache.hadoop.registry.client.impl.FSRegistryOperationsService + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. @@ -2978,6 +2991,14 @@ org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor + + + Whether AMRMProxy HA is enabled. + + yarn.nodemanager.amrmproxy.ha.enable + false + + Setting that controls whether distributed scheduling is enabled. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 43ae3af40319..cd5195dfa3a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -66,6 +66,11 @@ test + + org.apache.hadoop + hadoop-yarn-registry + + com.google.guava guava diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java new file mode 100644 index 000000000000..df219e3573cb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that handles reads and writes to Yarn Registry to support UAM HA + * and second attempt. + */ +public class FederationRegistryClient { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRegistryClient.class); + + private RegistryOperations registry; + + private UserGroupInformation user; + + // AppId -> SubClusterId -> UAM token + private Map>> + appSubClusterTokenMap; + + // Structure in registry: // -> UAMToken + private String registryBaseDir; + + public FederationRegistryClient(Configuration conf, + RegistryOperations registry, UserGroupInformation user) { + this.registry = registry; + this.user = user; + this.appSubClusterTokenMap = new ConcurrentHashMap<>(); + this.registryBaseDir = + conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY, + YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY); + LOG.info("Using registry {} with base directory: {}", + this.registry.getClass().getName(), this.registryBaseDir); + } + + public synchronized List getAllApplications() { + // Suppress the exception here because it is valid that the entry does not + // exist + List applications = null; + try { + applications = listDirRegistry(this.registry, this.user, + getRegistryKey(null, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + if (applications == null) { + // It is valid for listDirRegistry to return null + return new ArrayList<>(); + } + return applications; + } + + @VisibleForTesting + public synchronized void cleanAllApplications() { + try { + removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), + true, false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from removeKeyRegistry", e); + } + } + + /** + * Write/update the UAM token for an application and a sub-cluster. + * + * @param subClusterId + * @param token + * @return whether the amrmToken is added or updated to a new value + */ + public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, + String subClusterId, Token token) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (subClusterTokenMap == null) { + subClusterTokenMap = new ConcurrentHashMap<>(); + this.appSubClusterTokenMap.put(appId, subClusterTokenMap); + } + + boolean update = !token.equals(subClusterTokenMap.get(subClusterId)); + if (!update) { + LOG.info("Same amrmToken received from {}, skip writing registry for {}", + subClusterId, appId); + return update; + } + + LOG.info("Writing/Updating amrmToken for {} to registry for {}", + subClusterId, appId); + try { + // First, write the token entry + writeRegistry(this.registry, this.user, + getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true); + + // Then update the subClusterTokenMap + subClusterTokenMap.put(subClusterId, token); + } catch (YarnException | IOException e) { + LOG.error( + "Failed writing AMRMToken to registry for subcluster " + subClusterId, + e); + } + return update; + } + + public synchronized Map> + loadStateFromRegistry(ApplicationId appId) { + Map> retMap = new HashMap<>(); + // Suppress the exception here because it is valid that the entry does not + // exist + List subclusters = null; + try { + subclusters = listDirRegistry(this.registry, this.user, + getRegistryKey(appId, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + + if (subclusters == null) { + LOG.info("Application {} does not exist in registry", appId); + return retMap; + } + + // Read the amrmToken for each sub-cluster with an existing UAM + for (String scId : subclusters) { + LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId); + String key = getRegistryKey(appId, scId); + try { + String tokenString = readRegistry(this.registry, this.user, key, true); + if (tokenString == null) { + throw new YarnException("Null string from readRegistry key " + key); + } + Token amrmToken = new Token<>(); + amrmToken.decodeFromUrlString(tokenString); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); + + retMap.put(scId, amrmToken); + } catch (Exception e) { + LOG.error("Failed reading registry key " + key + + ", skipping subcluster " + scId, e); + } + } + + // Override existing map if there + this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap)); + return retMap; + } + + public synchronized void removeAppFromRegistry(ApplicationId appId, + boolean ignoreMemoryState) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (!ignoreMemoryState) { + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } + } + LOG.info("Removing all registry entries for {}", appId); + + // Lastly remove the application directory + String key = getRegistryKey(appId, null); + try { + removeKeyRegistry(this.registry, this.user, key, true, true); + if (subClusterTokenMap != null) { + subClusterTokenMap.clear(); + } + } catch (YarnException e) { + LOG.error("Failed removing registry directory key " + key, e); + } + } + + private String getRegistryKey(ApplicationId appId, String fileName) { + if (appId == null) { + return this.registryBaseDir; + } + if (fileName == null) { + return this.registryBaseDir + appId.toString(); + } + return this.registryBaseDir + appId.toString() + "/" + fileName; + } + + private String readRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + // Use the ugi loaded with app credentials to access registry + String result = ugi.doAs(new PrivilegedAction() { + @Override + public String run() { + try { + ServiceRecord value = registryImpl.resolve(key); + if (value != null) { + return value.description; + } + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry resolve key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry resolve key " + key + " failed"); + } + return result; + } + + private void removeKeyRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean recursive, + final boolean throwIfFails) throws YarnException { + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.delete(key, recursive); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry remove key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry remove key " + key + " failed"); + } + } + + /** + * Write registry entry, override if exists. + */ + private void writeRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final String value, + final boolean throwIfFails) throws YarnException { + + final ServiceRecord recordValue = new ServiceRecord(); + recordValue.description = value; + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.bind(key, recordValue, BindFlags.OVERWRITE); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry write key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry write key " + key + " failed"); + } + } + + /** + * List the sub directories in the given directory. + */ + private List listDirRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + List result = ugi.doAs(new PrivilegedAction>() { + @Override + public List run() { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry list key " + key + " failed"); + } + return result; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77fe6d6..7118eb2c221d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -44,9 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -67,7 +68,7 @@ // Map from uamId to UAM instances private Map unmanagedAppMasterMap; - private Map attemptIdMap; + private Map appIdMap; private ExecutorService threadpool; @@ -82,7 +83,7 @@ protected void serviceStart() throws Exception { this.threadpool = Executors.newCachedThreadPool(); } this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); - this.attemptIdMap = new ConcurrentHashMap<>(); + this.appIdMap = new ConcurrentHashMap<>(); super.serviceStart(); } @@ -114,7 +115,7 @@ protected void serviceStop() throws Exception { public KillApplicationResponse call() throws Exception { try { LOG.info("Force-killing UAM id " + uamId + " for application " - + attemptIdMap.get(uamId)); + + appIdMap.get(uamId)); return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); @@ -132,7 +133,7 @@ public KillApplicationResponse call() throws Exception { LOG.error("Failed to kill unmanaged application master", e); } } - this.attemptIdMap.clear(); + this.appIdMap.clear(); super.serviceStop(); } @@ -173,28 +174,31 @@ public String createAndRegisterNewUAM( rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. * - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @return UAM token + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public Token launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix) throws YarnException, IOException { @@ -207,11 +211,10 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; + Token amrmToken = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Launching UAM id {} for application {}", uamId, appId); + amrmToken = uam.launchUAM(); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -219,8 +222,48 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, throw e; } - this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + this.appIdMap.put(uamId, uam.getAppId()); + return amrmToken; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. + * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamToken UAM token + * @throws YarnException if fails + * @throws IOException if fails + */ + public void reAttachUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, Token uamToken) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = + createUAM(conf, appId, queueName, submitter, appNameSuffix); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + try { + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + uam.reAttachUAM(uamToken); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.appIdMap.put(uamId, uam.getAppId()); } /** @@ -241,10 +284,31 @@ protected UnmanagedApplicationManager createUAM(Configuration conf, appNameSuffix); } + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest); + } + /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +326,7 @@ public void allocateAsync(String uamId, AllocateRequest request, /** * Finish an UAM/application. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,14 +338,15 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); if (response.getIsUnregistered()) { // Only remove the UAM when the unregister finished this.unmanagedAppMasterMap.remove(uamId); - this.attemptIdMap.remove(uamId); + this.appIdMap.remove(uamId); LOG.info("UAM id {} is unregistered", uamId); } return response; @@ -301,7 +366,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6531a75c95a1..25486eb661ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -90,7 +92,6 @@ private AMRequestHandlerThread handlerThread; private ApplicationMasterProtocol rmProxy; private ApplicationId applicationId; - private ApplicationAttemptId attemptId; private String submitter; private String appNameSuffix; private Configuration conf; @@ -102,6 +103,14 @@ private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + /* + * This flag is used as an indication that this method launchUAM/reAttachUAM + * is called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the flag before + * calling the blocking call to RM. + */ + private boolean connectionInitiated; + public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); @@ -116,6 +125,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -125,43 +135,80 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } + /** + * Launch a new UAM in the resource manager. + * + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails + */ + public Token launchUAM() + throws YarnException, IOException { + this.connectionInitiated = true; + + // Blocking call to RM + Token amrmToken = + initializeUnmanagedAM(this.applicationId); + + // Creates the UAM connection + createUAMProxy(amrmToken); + return amrmToken; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param amrmToken the UAM token + * @throws IOException if re-attach fails + * @throws YarnException if re-attach fails + */ + public void reAttachUAM(Token amrmToken) + throws IOException, YarnException { + this.connectionInitiated = true; + + // Creates the UAM connection + createUAMProxy(amrmToken); + } + + protected void createUAMProxy(Token amrmToken) + throws IOException { + this.userUgi = UserGroupInformation.createProxyUser( + this.applicationId.toString(), UserGroupInformation.getCurrentUser()); + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, amrmToken); + } + /** * Registers this {@link UnmanagedApplicationManager} with the resource * manager. * - * @param request the register request - * @return the register response + * @param request RegisterApplicationMasterRequest + * @return register response * @throws YarnException if register fails * @throws IOException if register fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) + // Save the register request for re-register later this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); - - try { - this.userUgi = UserGroupInformation.createProxyUser( - identifier.getAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - } catch (IOException e) { - LOG.error("Exception while trying to get current user", e); - throw new YarnRuntimeException(e); - } - - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, identifier.getToken()); - - LOG.info("Registering the Unmanaged application master {}", this.attemptId); + // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. + // We do not expect application already registered exception here + LOG.info("Registering the Unmanaged application master {}", + this.applicationId); RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest); + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing running container " + + container.getId()); + } + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing NM token for node " + + nmToken.getNodeId()); + } + // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -187,11 +234,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.connectionInitiated) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." - + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -199,7 +246,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.attemptId); + this.registerRequest, this.applicationId); } /** @@ -212,7 +259,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( public KillApplicationResponse forceKillApplication() throws IOException, YarnException { KillApplicationRequest request = - KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + KillApplicationRequest.newInstance(this.applicationId); this.handlerThread.shutdown(); @@ -240,29 +287,29 @@ public void allocateAsync(AllocateRequest request, LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } /** - * Returns the application attempt id of the UAM. + * Returns the application id of the UAM. * - * @return attempt id of the UAM + * @return application id of the UAM */ - public ApplicationAttemptId getAttemptId() { - return this.attemptId; + public ApplicationId getAppId() { + return this.applicationId; } /** @@ -287,15 +334,15 @@ public ApplicationAttemptId getAttemptId() { * Launch and initialize an unmanaged AM. First, it creates a new application * on the RM and negotiates a new attempt id. Then it waits for the RM * application attempt state to reach YarnApplicationAttemptState.LAUNCHED - * after which it returns the AM-RM token and the attemptId. + * after which it returns the AM-RM token. * * @param appId application id - * @return the UAM identifier + * @return the UAM token * @throws IOException if initialize fails * @throws YarnException if initialize fails */ - protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) - throws IOException, YarnException { + protected Token initializeUnmanagedAM( + ApplicationId appId) throws IOException, YarnException { try { UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); @@ -306,13 +353,12 @@ protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) submitUnmanagedApp(appId); // Monitor the application attempt to wait for launch state - ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + monitorCurrentAppAttempt(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED), YarnApplicationAttemptState.LAUNCHED); - this.attemptId = attemptReport.getApplicationAttemptId(); - return getUAMIdentifier(); + return getUAMToken(); } finally { this.rmClient = null; } @@ -343,6 +389,8 @@ private void submitUnmanagedApp(ApplicationId appId) submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); + // Required for Federation because it need UAM restart support in RM + context.setKeepContainersAcrossApplicationAttempts(true); LOG.info("Submitting unmanaged application {}", appId); this.rmClient.submitApplication(submitRequest); @@ -415,25 +463,25 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, } /** - * Gets the identifier of the unmanaged AM. + * Gets the amrmToken of the unmanaged AM. * - * @return the identifier of the unmanaged AM. + * @return the amrmToken of the unmanaged AM. * @throws IOException if getApplicationReport fails * @throws YarnException if getApplicationReport fails */ - protected UnmanagedAMIdentifier getUAMIdentifier() + protected Token getUAMToken() throws IOException, YarnException { Token token = null; org.apache.hadoop.yarn.api.records.Token amrmToken = - getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + getApplicationReport(this.applicationId).getAMRMToken(); if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { LOG.warn( "AMRMToken not found in the application report for application: {}", - this.attemptId.getApplicationId()); + this.applicationId); } - return new UnmanagedAMIdentifier(this.attemptId, token); + return token; } private ApplicationReport getApplicationReport(ApplicationId appId) @@ -444,29 +492,6 @@ private ApplicationReport getApplicationReport(ApplicationId appId) return this.rmClient.getApplicationReport(request).getApplicationReport(); } - /** - * Data structure that encapsulates the application attempt identifier and the - * AMRMTokenIdentifier. Make it public because clients with HA need it. - */ - public static class UnmanagedAMIdentifier { - private ApplicationAttemptId attemptId; - private Token token; - - public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, - Token token) { - this.attemptId = attemptId; - this.token = token; - } - - public ApplicationAttemptId getAttemptId() { - return this.attemptId; - } - - public Token getToken() { - return this.token; - } - } - /** * Data structure that encapsulates AllocateRequest and AsyncCallback * instance. @@ -549,8 +574,10 @@ public void run() { } request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, attemptId); + request, rmProxy, registerRequest, applicationId); + if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } @@ -578,18 +605,17 @@ public void run() { LOG.debug("Interrupted while waiting for queue", ex); } } catch (IOException ex) { - LOG.warn( - "IO Error occurred while processing heart beat for " + attemptId, - ex); + LOG.warn("IO Error occurred while processing heart beat for " + + applicationId, ex); } catch (Throwable ex) { LOG.warn( - "Error occurred while processing heart beat for " + attemptId, + "Error occurred while processing heart beat for " + applicationId, ex); } } LOG.info("UnmanagedApplicationManager has been stopped for {}. " - + "AMRequestHandlerThread thread is exiting", attemptId); + + "AMRequestHandlerThread thread is exiting", applicationId); } } @@ -600,8 +626,8 @@ public void run() { implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application attempt {} crashed!", - t.getName(), attemptId, e); + LOG.error("Heartbeat thread {} for application {} crashed!", + t.getName(), applicationId, e); } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java index 7993bd8a5ece..3cecdca55237 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -63,16 +63,16 @@ private AMRMClientUtils() { /** * Handle ApplicationNotRegistered exception and re-register. * - * @param attemptId app attemptId + * @param appId application Id * @param rmProxy RM proxy instance * @param registerRequest the AM re-register request * @throws YarnException if re-register fails */ public static void handleNotRegisteredExceptionAndReRegister( - ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + ApplicationId appId, ApplicationMasterProtocol rmProxy, RegisterApplicationMasterRequest registerRequest) throws YarnException { LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", attemptId); + + " Trying to re-register.", appId); try { rmProxy.registerApplicationMaster(registerRequest); } catch (Exception e) { @@ -93,25 +93,24 @@ public static void handleNotRegisteredExceptionAndReRegister( * @param request allocate request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return allocate response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static AllocateResponse allocateWithReRegister(AllocateRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.allocate(request); } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // reset responseId after re-register request.setResponseId(0); // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, - attemptId); + return allocateWithReRegister(request, rmProxy, registerRequest, appId); } } @@ -123,23 +122,22 @@ public static AllocateResponse allocateWithReRegister(AllocateRequest request, * @param request finishApplicationMaster request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return finishApplicationMaster response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static FinishApplicationMasterResponse finishAMWithReRegister( FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, - attemptId); + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 628c7819dc1e..b5727aa0fea7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -177,10 +178,9 @@ LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashSet applicationMap = new HashSet<>(); - private HashMap> applicationContainerIdMap = - new HashMap>(); - private HashMap allocatedContainerMap = - new HashMap(); + private HashSet keepContainerOnUams = new HashSet<>(); + private HashMap> + applicationContainerIdMap = new HashMap<>(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -221,7 +221,7 @@ public void setRunningMode(boolean mode) { this.isRunning = mode; } - private static String getAppIdentifier() throws IOException { + private static ApplicationAttemptId getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); Set tokenIds = remoteUgi.getTokenIdentifiers(); @@ -231,7 +231,8 @@ private static String getAppIdentifier() throws IOException { break; } } - return result != null ? result.getApplicationAttemptId().toString() : ""; + return result != null ? result.getApplicationAttemptId() + : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); } private void validateRunning() throws ConnectException { @@ -246,19 +247,32 @@ public RegisterApplicationMasterResponse registerApplicationMaster( throws YarnException, IOException { validateRunning(); - - String amrmToken = getAppIdentifier(); - LOG.info("Registering application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Registering application attempt: " + attemptId); shouldReRegisterNext = false; + List containersFromPreviousAttempt = null; + synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(amrmToken)) { - throw new InvalidApplicationMasterRequestException( - AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + if (applicationContainerIdMap.containsKey(attemptId)) { + if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + // For UAM with the keepContainersFromPreviousAttempt flag, return all + // running containers + containersFromPreviousAttempt = new ArrayList<>(); + for (ContainerId containerId : applicationContainerIdMap + .get(attemptId)) { + containersFromPreviousAttempt.add(Container.newInstance(containerId, + null, null, null, null, null)); + } + } else { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + } + } else { + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(attemptId, new ArrayList()); } - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, new ArrayList()); } // Make sure we wait for certain test cases last in the method @@ -278,7 +292,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( } return RegisterApplicationMasterResponse.newInstance(null, null, null, null, - null, request.getHost(), null); + containersFromPreviousAttempt, request.getHost(), null); } @Override @@ -288,8 +302,8 @@ public FinishApplicationMasterResponse finishApplicationMaster( validateRunning(); - String amrmToken = getAppIdentifier(); - LOG.info("Finishing application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Finishing application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -299,12 +313,9 @@ public FinishApplicationMasterResponse finishApplicationMaster( synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + applicationContainerIdMap.remove(attemptId); } return FinishApplicationMasterResponse.newInstance( @@ -334,8 +345,8 @@ public AllocateResponse allocate(AllocateRequest request) + "askList and releaseList in the same heartbeat"); } - String amrmToken = getAppIdentifier(); - LOG.info("Allocate from application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Allocate from application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -367,16 +378,16 @@ public AllocateResponse allocate(AllocateRequest request) // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); } } } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -384,9 +395,9 @@ public AllocateResponse allocate(AllocateRequest request) Assert .assertTrue( "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -402,18 +413,8 @@ public AllocateResponse allocate(AllocateRequest request) + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -424,9 +425,9 @@ public AllocateResponse allocate(AllocateRequest request) // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList(), - containerList, new ArrayList(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), newAMRMToken, new ArrayList()); } @@ -443,6 +444,7 @@ public GetApplicationReportResponse getApplicationReport( report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); response.setApplicationReport(report); return response; } @@ -486,6 +488,12 @@ public SubmitApplicationResponse submitApplication( } LOG.info("Application submitted: " + appId); applicationMap.add(appId); + + if (request.getApplicationSubmissionContext().getUnmanagedAM() + || request.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + keepContainerOnUams.add(appId); + } return SubmitApplicationResponse.newInstance(); } @@ -502,6 +510,7 @@ public KillApplicationResponse forceKillApplication( throw new ApplicationNotFoundException( "Trying to kill an absent application: " + appId); } + keepContainerOnUams.remove(appId); } LOG.info("Force killing application: " + appId); return KillApplicationResponse.newInstance(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java new file mode 100644 index 000000000000..5b799a7baf2e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for FederationRegistryClient. + */ +public class TestFederationRegistryClient { + private Configuration conf; + private UserGroupInformation user; + private RegistryOperations registry; + private FederationRegistryClient registryClient; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + + this.registry = new FSRegistryOperationsService(); + this.registry.init(this.conf); + this.registry.start(); + + this.user = UserGroupInformation.getCurrentUser(); + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + this.registryClient.cleanAllApplications(); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } + + @After + public void breakDown() { + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + registry.stop(); + } + + @Test + public void testBasicCase() { + ApplicationId appId = ApplicationId.newInstance(0, 0); + String scId1 = "subcluster1"; + String scId2 = "subcluster2"; + + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + this.registryClient.writeAMRMTokenForUAM(appId, scId2, + new Token()); + // Duplicate entry, should overwrite + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + Assert.assertEquals(2, + this.registryClient.loadStateFromRegistry(appId).size()); + + this.registryClient.removeAppFromRegistry(appId, false); + + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + Assert.assertEquals(0, + this.registryClient.loadStateFromRegistry(appId).size()); + } + + @Test + public void testRemoveWithMemoryState() { + ApplicationId appId1 = ApplicationId.newInstance(0, 0); + ApplicationId appId2 = ApplicationId.newInstance(0, 1); + String scId0 = "subcluster0"; + + this.registryClient.writeAMRMTokenForUAM(appId1, scId0, + new Token()); + this.registryClient.writeAMRMTokenForUAM(appId2, scId0, + new Token()); + Assert.assertEquals(2, this.registryClient.getAllApplications().size()); + + // Create a new client instance + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + + this.registryClient.loadStateFromRegistry(appId2); + // Should remove app2 + this.registryClient.removeAppFromRegistry(appId2, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should not remove app1 since memory state don't have it + this.registryClient.removeAppFromRegistry(appId1, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should remove app1 + this.registryClient.removeAppFromRegistry(appId1, true); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf751506..242d9459d735 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -88,7 +88,8 @@ protected void waitForCallBackCountAndCheckZeroPending( public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, @@ -102,11 +103,48 @@ public void testBasicUsage() attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + uam.setRMProxy(rmProxy); + + reAttachUAM(null, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.setShouldReRegisterNext(); @@ -137,7 +175,8 @@ public void testSlowRegisterCall() @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), attemptId); } catch (Exception e) { @@ -221,7 +260,8 @@ public void testFinishWithoutRegister() @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); @@ -241,19 +281,40 @@ protected UserGroupInformation getUGIWithToken( return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected Token launchUAM( + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final Token uamToken, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public Token run() throws Exception { + uam.reAttachUAM(uamToken); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request); } }); } @@ -330,6 +391,14 @@ public void setShouldReRegisterNext() { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b4f42b..92afcb7ad12d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -67,4 +69,18 @@ */ Context getNMCotext(); + /** + * Gets the credentials of this application. + * + * @return the credentials. + */ + Credentials getCredentials(); + + /** + * Gets the registry client. + * + * @return the registry. + */ + RegistryOperations getRegistryClient(); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 9938b370b24e..c0d1b130911d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +44,8 @@ private Integer localTokenKeyId; private Token amrmToken; private Token localToken; + private Credentials credentials; + private RegistryOperations registry; /** * Create an instance of the AMRMProxyApplicationContext. @@ -52,17 +56,23 @@ * @param user user name of the application * @param amrmToken amrmToken issued by RM * @param localToken amrmToken issued by AMRMProxy + * @param credentials application credentials + * @param registry Yarn Registry client */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token amrmToken, - Token localToken) { + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken, + Credentials credentials, + RegistryOperations registry) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.credentials = credentials; + this.registry = registry; } @Override @@ -88,11 +98,14 @@ public String getUser() { /** * Sets the application's AMRMToken. * - * @param amrmToken amrmToken issued by RM + * @param amrmToken the new amrmToken from RM + * @return whether the saved token is updated to a different value */ - public synchronized void setAMRMToken( + public synchronized boolean setAMRMToken( Token amrmToken) { + Token oldValue = this.amrmToken; this.amrmToken = amrmToken; + return !this.amrmToken.equals(oldValue); } @Override @@ -134,4 +147,14 @@ public synchronized int getLocalAMRMTokenKeyId() { public Context getNMCotext() { return nmContext; } + + @Override + public Credentials getCredentials() { + return this.credentials; + } + + @Override + public RegistryOperations getRegistryClient() { + return this.registry; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf589b5..ebd85bf44f7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -34,12 +34,13 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -60,15 +61,19 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +87,7 @@ * pipeline is a chain of interceptor instances that can inspect and modify the * request/response as needed. */ -public class AMRMProxyService extends AbstractService implements +public class AMRMProxyService extends CompositeService implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); @@ -96,6 +101,7 @@ private InetSocketAddress listenerEndpoint; private AMRMProxyTokenSecretManager secretManager; private Map applPipelineMap; + private RegistryOperations registry; /** * Creates an instance of the service. @@ -118,10 +124,23 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { @Override protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); this.secretManager = new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); this.secretManager.init(conf); + + // Both second app attempt and NM restart within Federation need registry + if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED) + || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + addService(this.registry); + } + + super.serviceInit(conf); } @Override @@ -203,6 +222,8 @@ public void recover() throws IOException { amrmToken = new Token<>(); amrmToken.decodeFromUrlString( new String(contextEntry.getValue(), "UTF-8")); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); } } @@ -214,12 +235,36 @@ public void recover() throws IOException { throw new IOException("No user found for app attempt " + attemptId); } + // Regenerate the local AMRMToken for the AM Token localToken = this.secretManager.createAndGetAMRMToken(attemptId); + // Retrieve the AM container credentials from NM context + Credentials amCred = null; + for (Container container : this.nmContext.getContainers().values()) { + LOG.debug("From NM Context container " + container.getContainerId()); + if (container.getContainerId().getApplicationAttemptId().equals( + attemptId) && container.getContainerTokenIdentifier() != null) { + LOG.debug("Container type " + + container.getContainerTokenIdentifier().getContainerType()); + if (container.getContainerTokenIdentifier() + .getContainerType() == ContainerType.APPLICATION_MASTER) { + LOG.info("AM container {} found in context, has credentials: {}", + container.getContainerId(), + (container.getCredentials() != null)); + amCred = container.getCredentials(); + } + } + } + if (amCred == null) { + LOG.error("No credentials found for AM container of {}. " + + "Yarn registry access might not work", attemptId); + } + + // Create the intercepter pipeline for the AM initializePipeline(attemptId, user, amrmToken, localToken, - entry.getValue(), true); - } catch (Exception e) { + entry.getValue(), true, amCred); + } catch (IOException e) { LOG.error("Exception when recovering " + attemptId + ", removing it from NMStateStore and move on", e); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); @@ -326,7 +371,7 @@ public void processApplicationStartRequest(StartContainerRequest request) initializePipeline(appAttemptId, containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, - localToken, null, false); + localToken, null, false, credentials); } /** @@ -342,7 +387,8 @@ public void processApplicationStartRequest(StartContainerRequest request) protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -404,8 +450,9 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, try { RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext(this.nmContext, - applicationAttemptId, user, amrmToken, localToken)); + interceptorChain.init( + createApplicationMasterContext(this.nmContext, applicationAttemptId, + user, amrmToken, localToken, credentials, this.registry)); if (isRecovery) { if (recoveredDataMap == null) { throw new YarnRuntimeException( @@ -497,14 +544,12 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - - context.setAMRMToken(newToken); + ConverterUtils.convertFromYarn(token, (Text) null); - // Update the AMRMToken in context map in NM state store - if (this.nmContext.getNMStateStore() != null) { + // Update the AMRMToken in context map, and in NM state store if it is + // different + if (context.setAMRMToken(newToken) + && this.nmContext.getNMStateStore() != null) { try { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, @@ -547,10 +592,12 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, private AMRMProxyApplicationContext createApplicationMasterContext( Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, Credentials credentials, + RegistryOperations registryImpl) { AMRMProxyApplicationContextImpl appContext = new AMRMProxyApplicationContextImpl(context, getConfig(), - applicationAttemptId, user, amrmToken, localToken); + applicationAttemptId, user, amrmToken, localToken, credentials, + registryImpl); return appContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 33cfca355b64..0169bb060c58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -34,6 +34,8 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +151,8 @@ */ private UserGroupInformation appOwner; + private FederationRegistryClient registryClient; + /** * Creates an instance of the FederationInterceptor class. */ @@ -179,6 +187,10 @@ public void init(AMRMProxyApplicationContext appContext) { } catch (Exception ex) { throw new YarnRuntimeException(ex); } + // Add all app tokens for Yarn Registry access + if (appContext.getCredentials() != null) { + this.appOwner.addCredentials(appContext.getCredentials()); + } this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); @@ -192,6 +204,11 @@ public void init(AMRMProxyApplicationContext appContext) { this.uamPool.init(conf); this.uamPool.start(); + + if (appContext.getRegistryClient() != null) { + this.registryClient = new FederationRegistryClient(conf, + appContext.getRegistryClient(), this.appOwner); + } } /** @@ -250,20 +267,27 @@ public void init(AMRMProxyApplicationContext appContext) { */ this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + if (this.amRegistrationResponse + .getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + this.amRegistrationResponse.getContainersFromPreviousAttempts(), + this.homeSubClusterId); + } + + ApplicationId appId = + getApplicationContext().getApplicationAttemptId().getApplicationId(); + reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); if (queue == null) { - LOG.warn("Received null queue for application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " from home sub-cluster. Will use default queue name " + LOG.warn("Received null queue for application " + appId + + " from home subcluster. Will use default queue name " + YarnConfiguration.DEFAULT_QUEUE_NAME + " for getting AMRMProxyPolicy"); } else { - LOG.info("Application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " belongs to queue " + queue); + LOG.info("Application " + appId + " belongs to queue " + queue); } // Initialize the AMRMProxyPolicy @@ -304,7 +328,7 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + getApplicationContext().getApplicationAttemptId().getApplicationId()); // Notify policy of home response try { @@ -393,8 +417,8 @@ public FinishApplicationMasterResponseInfo call() throws Exception { // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + this.amRegistrationRequest, getApplicationContext() + .getApplicationAttemptId().getApplicationId()); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -425,6 +449,14 @@ public FinishApplicationMasterResponseInfo call() throws Exception { if (failedToUnRegister) { homeResponse.setIsUnregistered(false); + } else { + // Clean up UAMs only when the app finishes successfully, so that no more + // attempt will be launched. + this.uamPool.stop(); + if (this.registryClient != null) { + this.registryClient.removeAppFromRegistry(getApplicationContext() + .getApplicationAttemptId().getApplicationId(), false); + } } return homeResponse; } @@ -442,9 +474,8 @@ public void setNextInterceptor(RequestInterceptor next) { */ @Override public void shutdown() { - if (this.uamPool != null) { - this.uamPool.stop(); - } + // Do not stop uamPool service and kill UAMs here because of possible second + // app attempt if (threadpool != null) { try { threadpool.shutdown(); @@ -455,6 +486,16 @@ public void shutdown() { super.shutdown(); } + /** + * Only for unit test cleanup. + */ + @VisibleForTesting + protected void cleanupRegistry() { + if (this.registryClient != null) { + this.registryClient.cleanAllApplications(); + } + } + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. @@ -486,6 +527,108 @@ protected ApplicationMasterProtocol createHomeRMProxy( } } + private void mergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + RegisterApplicationMasterResponse otherResponse) { + + if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) { + homeResponse.getContainersFromPreviousAttempts() + .addAll(otherResponse.getContainersFromPreviousAttempts()); + } else { + homeResponse.setContainersFromPreviousAttempts( + otherResponse.getContainersFromPreviousAttempts()); + } + } + + if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) { + homeResponse.getNMTokensFromPreviousAttempts() + .addAll(otherResponse.getNMTokensFromPreviousAttempts()); + } else { + homeResponse.setNMTokensFromPreviousAttempts( + otherResponse.getNMTokensFromPreviousAttempts()); + } + } + } + + protected void reAttachUAMAndMergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + final ApplicationId appId) { + + if (this.registryClient == null) { + LOG.warn("registryClient is null, skip attaching existing UAM if any"); + return; + } + + // Load existing running UAMs from the previous attempts from + // registry, if any + Map> uamMap = + this.registryClient.loadStateFromRegistry(appId); + LOG.info("Found {} existing UAMs for application {} in Yarn Registry. " + + "Reattaching in parallel", uamMap.size(), appId); + + ExecutorCompletionService + completionService = new ExecutorCompletionService<>(threadpool); + + for (Entry> entry : uamMap.entrySet()) { + final SubClusterId subClusterId = + SubClusterId.newInstance(entry.getKey()); + final Token amrmToken = entry.getValue(); + + completionService + .submit(new Callable() { + @Override + public RegisterApplicationMasterResponse call() throws Exception { + RegisterApplicationMasterResponse response = null; + try { + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + uamPool.reAttachUAM(subClusterId.getId(), config, appId, + amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), homeSubClusterId.getId(), + amrmToken); + + response = uamPool.registerApplicationMaster( + subClusterId.getId(), amRegistrationRequest); + + if (response != null + && response.getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + response.getContainersFromPreviousAttempts(), + subClusterId); + } + LOG.info("UAM {} reattached for {}", subClusterId, appId); + } catch (Throwable e) { + LOG.error( + "Reattaching UAM " + subClusterId + " failed for " + appId, + e); + } + return response; + } + }); + } + + // Wait for the re-attach responses + for (int i = 0; i < uamMap.size(); i++) { + try { + Future future = + completionService.take(); + RegisterApplicationMasterResponse registerResponse = future.get(); + if (registerResponse != null) { + LOG.info("Merging register response for {}", appId); + mergeRegisterResponse(homeResponse, registerResponse); + } + } catch (Exception e) { + LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e); + } + } + } + private SubClusterId getSubClusterForNode(String nodeName) { SubClusterId subClusterId = null; try { @@ -655,6 +798,20 @@ public void callback(AllocateResponse response) { responses.add(response); } + // Save the new AMRMToken for the UAM in registry if present + if (response.getAMRMToken() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + // Update the token in registry + if (registryClient != null) { + registryClient + .writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + subClusterId.getId(), newToken); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -714,20 +871,23 @@ public RegisterApplicationMasterResponseInfo call() subClusterId); RegisterApplicationMasterResponse uamResponse = null; + Token token = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, + token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), homeSubClusterId.toString()); + + uamResponse = uamPool.registerApplicationMaster(subClusterId, + registerRequest); } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " + appContext.getApplicationAttemptId(), e); } return new RegisterApplicationMasterResponseInfo(uamResponse, - SubClusterId.newInstance(subClusterId)); + SubClusterId.newInstance(subClusterId), token); } }); } @@ -752,6 +912,14 @@ public RegisterApplicationMasterResponseInfo call() + getApplicationContext().getApplicationAttemptId()); successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); + + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + uamResponse.getSubClusterId().getId(), + uamResponse.getUamToken()); + } } } catch (Exception e) { LOG.warn("Failed to register unmanaged application master: " @@ -1087,11 +1255,14 @@ public int getUnmanagedAMPoolSize() { private static class RegisterApplicationMasterResponseInfo { private RegisterApplicationMasterResponse response; private SubClusterId subClusterId; + private Token uamToken; RegisterApplicationMasterResponseInfo( - RegisterApplicationMasterResponse response, SubClusterId subClusterId) { + RegisterApplicationMasterResponse response, SubClusterId subClusterId, + Token uamToken) { this.response = response; this.subClusterId = subClusterId; + this.uamToken = uamToken; } public RegisterApplicationMasterResponse getResponse() { @@ -1101,6 +1272,10 @@ public RegisterApplicationMasterResponse getResponse() { public SubClusterId getSubClusterId() { return subClusterId; } + + public Token getUamToken() { + return uamToken; + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 55119e04324c..5e63d19be624 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -353,10 +353,6 @@ private void recover() throws IOException, URISyntaxException { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); - if (this.amrmProxyEnabled) { - this.getAMRMProxyService().recover(); - } - RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { @@ -373,6 +369,11 @@ private void recover() throws IOException, URISyntaxException { recoverContainer(rcs); } + // Recovery AMRMProxy state after apps and containers are recovered + if (this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + //Dispatching the RECOVERY_COMPLETED event through the dispatcher //so that all the paused, scheduled and queued containers will //be scheduled for execution on availability of resources. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 3c574966be5c..da1d047d957d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -56,10 +57,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; -import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -179,6 +180,15 @@ protected Context createContext() { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + protected List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -623,7 +633,7 @@ protected void serviceStart() throws Exception { */ public void initApp(ApplicationAttemptId applicationId, String user) { super.initializePipeline(applicationId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); } public void stopApp(ApplicationId applicationId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 937ede59d7a5..b955311bb9c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -444,7 +444,7 @@ public void testMultipleAttemptsSameNode() applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); getAMRMProxyService().initializePipeline(applicationAttemptId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -531,16 +531,14 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List containersForReleasedContainerIds = new ArrayList<>(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +552,9 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3db0e351e0e0..aa7ed697b875 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -19,16 +19,20 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -59,6 +63,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -79,7 +87,10 @@ private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private RegistryOperations registry; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -93,15 +104,28 @@ public void setUp() throws IOException { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + + registry = new FSRegistryOperationsService(); + registry.init(getConf()); + registry.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), - attemptId, "test-user", null, null)); + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null, null, registry)); + interceptor.cleanupRegistry(); } @Override public void tearDown() { + interceptor.cleanupRegistry(); interceptor.shutdown(); + registry.stop(); super.tearDown(); } @@ -207,18 +231,17 @@ private void releaseContainersAndAssert(List containers) AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. + // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -228,11 +251,12 @@ private void releaseContainersAndAssert(List containers) allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -547,4 +571,74 @@ public void testAllocateResponse() throws Exception { Assert.assertEquals(1, response.getUpdatedContainers().size()); Assert.assertEquals(1, response.getUpdateErrors().size()); } + + @Test + public void testSecondAttempt() throws Exception { + ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); + userInfo.getUser().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + for (Container c : containers) { + System.out.println(c.getId() + " ha"); + } + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Preserve the mock RM instances for secondaries + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Increase the attemptId and create a new intercepter instance for it + attemptId = ApplicationAttemptId.newInstance( + attemptId.getApplicationId(), attemptId.getAttemptId() + 1); + + interceptor = new TestableFederationInterceptor(null, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registry)); + registerResponse = interceptor.registerApplicationMaster(registerReq); + + // Should re-attach secondaries and get the three running containers + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(numberOfContainers, + registerResponse.getContainersFromPreviousAttempts().size()); + + // Release all containers + releaseContainersAndAssert( + registerResponse.getContainersFromPreviousAttempts()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735d4646..f21536d29e58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,15 @@ private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +77,7 @@ protected ApplicationMasterProtocol createHomeRMProxy( // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +100,15 @@ protected void setShouldReRegisterNext() { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 7a225c82749b..8245fd6983a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.service.AbstractService; @@ -959,9 +960,10 @@ public ShortCircuitedAMRMProxy(Context context, protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken, recoveredDataMap, isRecovery); + localToken, recoveredDataMap, isRecovery, credentials); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally be the last