diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4799137..bbdc212 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2096,6 +2096,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + "DefaultRequestInterceptor"; + public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX + + "amrmproxy.ha.enable"; + public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false; /** * Default platform-agnostic CLASSPATH for YARN applications. A @@ -2930,6 +2933,11 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + public static final String FEDERATION_REGISTRY_BASE_KEY = + FEDERATION_PREFIX + "registry.base-dir"; + public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = + "yarnfederation/"; + // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; @@ -3087,6 +3095,11 @@ public static boolean isAclEnabled(Configuration conf) { // Other Configs //////////////////////////////// + public static final String YARN_REGISTRY_CLASS = + YARN_PREFIX + "registry.class"; + public static final String DEFAULT_YARN_REGISTRY_CLASS = + "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService"; + /** * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. * The interval of the yarn client's querying application state after diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e90d0f2..12cb902 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2826,7 +2826,20 @@ 300 + + The registry base directory for federation. + yarn.federation.registry.base-dir + yarnfederation/ + + + + + The registry implementation to use. + yarn.registry.class + org.apache.hadoop.registry.client.impl.FSRegistryOperationsService + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. @@ -2989,6 +3002,14 @@ + Whether AMRMProxy HA is enabled. + + yarn.nodemanager.amrmproxy.ha.enable + false + + + + Setting that controls whether distributed scheduling is enabled. 
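The three keys introduced above (yarn.nodemanager.amrmproxy.ha.enable, yarn.federation.registry.base-dir and yarn.registry.class) are ordinary Configuration entries. Below is a minimal, hypothetical wiring sketch of how a caller could consume them to stand up the registry-backed client added later in this patch; only the constants, FSRegistryOperationsService and the FederationRegistryClient constructor come from the patch itself, while the helper method and its name are illustrative. Imports mirror the ones already used in FederationRegistryClient and its test, plus org.apache.hadoop.util.ReflectionUtils.

  // Hypothetical bootstrap sketch, not part of this patch.
  static FederationRegistryClient createRegistryClient(Configuration conf)
      throws IOException {
    if (!conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED,
        YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)) {
      return null; // AMRMProxy HA disabled, no registry client needed
    }
    // yarn.registry.class defaults to the file-system based implementation
    RegistryOperations registry = (RegistryOperations) ReflectionUtils
        .newInstance(conf.getClass(YarnConfiguration.YARN_REGISTRY_CLASS,
            FSRegistryOperationsService.class), conf);
    registry.init(conf);
    registry.start();
    return new FederationRegistryClient(conf, registry,
        UserGroupInformation.getCurrentUser());
  }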
yarn.nodemanager.distributed-scheduling.enabled diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 43ae3af..cd5195d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -67,6 +67,11 @@ + org.apache.hadoop + hadoop-yarn-registry + + + com.google.guava guava diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java new file mode 100644 index 0000000..6624318 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that handles reads and writes to Yarn Registry to support UAM HA + * and second attempt. 
+ */ +public class FederationRegistryClient { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRegistryClient.class); + + private RegistryOperations registry; + + private UserGroupInformation user; + + // AppId -> SubClusterId -> UAM token + private Map>> + appSubClusterTokenMap; + + // Structure in registry: // -> UAMToken + private String registryBaseDir; + + public FederationRegistryClient(Configuration conf, + RegistryOperations registry, UserGroupInformation user) { + this.registry = registry; + this.user = user; + this.appSubClusterTokenMap = new ConcurrentHashMap<>(); + this.registryBaseDir = + conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY, + YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY); + LOG.info("Using registry {} with base directory: {}", + this.registry.getClass().getName(), this.registryBaseDir); + } + + /** + * Get the list of known applications in the registry. + * + * @return the list of known applications + */ + public List getAllApplications() { + // Suppress the exception here because it is valid that the entry does not + // exist + List applications = null; + try { + applications = listDirRegistry(this.registry, this.user, + getRegistryKey(null, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + if (applications == null) { + // It is valid for listDirRegistry to return null + return new ArrayList<>(); + } + return applications; + } + + /** + * For testing, delete all application records in registry. + */ + @VisibleForTesting + public void cleanAllApplications() { + try { + removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), + true, false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from removeKeyRegistry", e); + } + } + + /** + * Write/update the UAM token for an application and a sub-cluster. + * + * @param subClusterId sub-cluster id of the token + * @param token the UAM of the application + * @return whether the amrmToken is added or updated to a new value + */ + public boolean writeAMRMTokenForUAM(ApplicationId appId, + String subClusterId, Token token) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (subClusterTokenMap == null) { + subClusterTokenMap = new ConcurrentHashMap<>(); + this.appSubClusterTokenMap.put(appId, subClusterTokenMap); + } + + boolean update = !token.equals(subClusterTokenMap.get(subClusterId)); + if (!update) { + LOG.debug("Same amrmToken received from {}, skip writing registry for {}", + subClusterId, appId); + return update; + } + + LOG.info("Writing/Updating amrmToken for {} to registry for {}", + subClusterId, appId); + try { + // First, write the token entry + writeRegistry(this.registry, this.user, + getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true); + + // Then update the subClusterTokenMap + subClusterTokenMap.put(subClusterId, token); + } catch (YarnException | IOException e) { + LOG.error( + "Failed writing AMRMToken to registry for subcluster " + subClusterId, + e); + } + return update; + } + + /** + * Load the information of one application from registry. 
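For orientation, the registry layout this class reads and writes is flat: one directory per application under the configured base directory, with one leaf entry per sub-cluster holding that sub-cluster's UAM token (see the getRegistryKey helper further below). With the default base directory the entries would look roughly as follows; the application and sub-cluster ids are illustrative:

  // yarnfederation/application_1500000000000_0001        <- application directory
  // yarnfederation/application_1500000000000_0001/SC-1   <- UAM token for sub-cluster SC-1
  // yarnfederation/application_1500000000000_0001/SC-2   <- UAM token for sub-cluster SC-2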
+ * + * @param appId application id + * @return the sub-cluster to UAM token mapping + */ + public Map> + loadStateFromRegistry(ApplicationId appId) { + Map> retMap = new HashMap<>(); + // Suppress the exception here because it is valid that the entry does not + // exist + List subclusters = null; + try { + subclusters = listDirRegistry(this.registry, this.user, + getRegistryKey(appId, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + + if (subclusters == null) { + LOG.info("Application {} does not exist in registry", appId); + return retMap; + } + + // Read the amrmToken for each sub-cluster with an existing UAM + for (String scId : subclusters) { + LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId); + String key = getRegistryKey(appId, scId); + try { + String tokenString = readRegistry(this.registry, this.user, key, true); + if (tokenString == null) { + throw new YarnException("Null string from readRegistry key " + key); + } + Token amrmToken = new Token<>(); + amrmToken.decodeFromUrlString(tokenString); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); + + retMap.put(scId, amrmToken); + } catch (Exception e) { + LOG.error("Failed reading registry key " + key + + ", skipping subcluster " + scId, e); + } + } + + // Override existing map if there + this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap)); + return retMap; + } + + /** + * Remove an application from registry. + * + * @param appId application id + */ + public void removeAppFromRegistry(ApplicationId appId) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + LOG.info("Removing all registry entries for {}", appId); + + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } + + // Lastly remove the application directory + String key = getRegistryKey(appId, null); + try { + removeKeyRegistry(this.registry, this.user, key, true, true); + subClusterTokenMap.clear(); + } catch (YarnException e) { + LOG.error("Failed removing registry directory key " + key, e); + } + } + + private String getRegistryKey(ApplicationId appId, String fileName) { + if (appId == null) { + return this.registryBaseDir; + } + if (fileName == null) { + return this.registryBaseDir + appId.toString(); + } + return this.registryBaseDir + appId.toString() + "/" + fileName; + } + + private String readRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + // Use the ugi loaded with app credentials to access registry + String result = ugi.doAs(new PrivilegedAction() { + @Override + public String run() { + try { + ServiceRecord value = registryImpl.resolve(key); + if (value != null) { + return value.description; + } + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry resolve key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry resolve key " + key + " failed"); + } + return result; + } + + private void removeKeyRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean recursive, + final boolean throwIfFails) throws YarnException { + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.delete(key, recursive); + return true; 
+ } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry remove key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry remove key " + key + " failed"); + } + } + + /** + * Write registry entry, override if exists. + */ + private void writeRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final String value, + final boolean throwIfFails) throws YarnException { + + final ServiceRecord recordValue = new ServiceRecord(); + recordValue.description = value; + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.bind(key, recordValue, BindFlags.OVERWRITE); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry write key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry write key " + key + " failed"); + } + } + + /** + * List the sub directories in the given directory. + */ + private List listDirRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + List result = ugi.doAs(new PrivilegedAction>() { + @Override + public List run() { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry list key " + key + " failed"); + } + return result; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77..7704ac2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -44,9 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -67,7 +68,7 @@ // Map from uamId to UAM instances private Map unmanagedAppMasterMap; - private Map attemptIdMap; + private Map 
appIdMap; private ExecutorService threadpool; @@ -82,7 +83,7 @@ protected void serviceStart() throws Exception { this.threadpool = Executors.newCachedThreadPool(); } this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); - this.attemptIdMap = new ConcurrentHashMap<>(); + this.appIdMap = new ConcurrentHashMap<>(); super.serviceStart(); } @@ -114,7 +115,7 @@ protected void serviceStop() throws Exception { public KillApplicationResponse call() throws Exception { try { LOG.info("Force-killing UAM id " + uamId + " for application " - + attemptIdMap.get(uamId)); + + appIdMap.get(uamId)); return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); @@ -132,7 +133,7 @@ public KillApplicationResponse call() throws Exception { LOG.error("Failed to kill unmanaged application master", e); } } - this.attemptIdMap.clear(); + this.appIdMap.clear(); super.serviceStop(); } @@ -145,13 +146,18 @@ public KillApplicationResponse call() throws Exception { * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM * @throws YarnException if registerApplicationMaster fails * @throws IOException if registerApplicationMaster fails */ public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, - String queueName, String submitter, String appNameSuffix) + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -173,45 +179,93 @@ public String createAndRegisterNewUAM( rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix, keepContainersAcrossApplicationAttempts); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. 
* - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) + * @return UAM token + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public Token launchUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + Token amrmToken = null; + try { + LOG.info("Launching UAM id {} for application {}", uamId, appId); + amrmToken = uam.launchUAM(); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.appIdMap.put(uamId, uam.getAppId()); + return amrmToken; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. 
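Splitting the old createAndRegisterNewUAM into launchUAM plus a separate registerApplicationMaster (added further below in this class) lets the caller persist the UAM token between the two steps. A hedged sketch of that sequence; the uamPool, registryClient, conf and request variables are assumed to already exist, and only the method names and signatures come from this patch:

  // Launch a UAM in a secondary sub-cluster, persist its AMRM token for
  // later recovery, then register on the application's behalf.
  Token<AMRMTokenIdentifier> uamToken = uamPool.launchUAM(subClusterId, conf,
      appId, queueName, submitter, appNameSuffix,
      true /* keepContainersAcrossApplicationAttempts */);
  registryClient.writeAMRMTokenForUAM(appId, subClusterId, uamToken);
  RegisterApplicationMasterResponse response =
      uamPool.registerApplicationMaster(subClusterId, registerRequest);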
+ * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamToken UAM token + * @throws YarnException if fails + * @throws IOException if fails + */ + public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, Token uamToken) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = - createUAM(conf, appId, queueName, submitter, appNameSuffix); + createUAM(conf, appId, queueName, submitter, appNameSuffix, true); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + uam.reAttachUAM(uamToken); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -219,8 +273,7 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, throw e; } - this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + this.appIdMap.put(uamId, uam.getAppId()); } /** @@ -231,20 +284,42 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, * @param queueName queue of the application * @param submitter submitter name of the application * @param appNameSuffix application name suffix + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix); + appNameSuffix, keepContainersAcrossApplicationAttempts); + } + + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest); } /** * AllocateAsync to an UAM. 
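The re-attach path mirrors the launch path, except that the tokens come back out of the registry instead of from a fresh application submission. A sketch under the same assumptions as the previous one:

  // Recover existing UAMs after an AMRMProxy restart or NM failover.
  Map<String, Token<AMRMTokenIdentifier>> uamTokens =
      registryClient.loadStateFromRegistry(appId);
  for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamTokens.entrySet()) {
    String subClusterId = entry.getKey();
    uamPool.reAttachUAM(subClusterId, conf, appId, queueName, submitter,
        appNameSuffix, entry.getValue());
    // Containers from the previous attempt come back in the register response
    // because keepContainersAcrossApplicationAttempts was set at launch time.
    uamPool.registerApplicationMaster(subClusterId, registerRequest);
  }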
* - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +337,7 @@ public void allocateAsync(String uamId, AllocateRequest request, /** * Finish an UAM/application. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,14 +349,15 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); if (response.getIsUnregistered()) { // Only remove the UAM when the unregister finished this.unmanagedAppMasterMap.remove(uamId); - this.attemptIdMap.remove(uamId); + this.appIdMap.remove(uamId); LOG.info("UAM id {} is unregistered", uamId); } return response; @@ -301,7 +377,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6531a75..5eb5bac8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -90,7 +92,6 @@ private AMRequestHandlerThread handlerThread; private ApplicationMasterProtocol rmProxy; private ApplicationId applicationId; - private ApplicationAttemptId attemptId; private String submitter; private String appNameSuffix; private Configuration conf; @@ -101,9 +102,33 @@ private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + private boolean keepContainersAcrossApplicationAttempts; + /* + * This flag is used as an indication that this method launchUAM/reAttachUAM + * is called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the flag before + * calling the blocking call to RM. 
+ */ + private boolean connectionInitiated; + + /** + * Constructor. + * + * @param conf configuration + * @param appId application Id to use for this UAM + * @param queueName the queue of the UAM + * @param submitter user name of the app + * @param appNameSuffix the app name suffix to use + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) + * @return UAM token + */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, - String queueName, String submitter, String appNameSuffix) { + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -116,6 +141,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -123,45 +149,84 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration. DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + this.keepContainersAcrossApplicationAttempts = + keepContainersAcrossApplicationAttempts; + } + + /** + * Launch a new UAM in the resource manager. + * + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails + */ + public Token launchUAM() + throws YarnException, IOException { + this.connectionInitiated = true; + + // Blocking call to RM + Token amrmToken = + initializeUnmanagedAM(this.applicationId); + + // Creates the UAM connection + createUAMProxy(amrmToken); + return amrmToken; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param amrmToken the UAM token + * @throws IOException if re-attach fails + * @throws YarnException if re-attach fails + */ + public void reAttachUAM(Token amrmToken) + throws IOException, YarnException { + this.connectionInitiated = true; + + // Creates the UAM connection + createUAMProxy(amrmToken); + } + + protected void createUAMProxy(Token amrmToken) + throws IOException { + this.userUgi = UserGroupInformation.createProxyUser( + this.applicationId.toString(), UserGroupInformation.getCurrentUser()); + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, amrmToken); } /** * Registers this {@link UnmanagedApplicationManager} with the resource * manager. 
* - * @param request the register request - * @return the register response + * @param request RegisterApplicationMasterRequest + * @return register response * @throws YarnException if register fails * @throws IOException if register fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) + // Save the register request for re-register later this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); - - try { - this.userUgi = UserGroupInformation.createProxyUser( - identifier.getAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - } catch (IOException e) { - LOG.error("Exception while trying to get current user", e); - throw new YarnRuntimeException(e); - } - - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, identifier.getToken()); - - LOG.info("Registering the Unmanaged application master {}", this.attemptId); + // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. + // We do not expect application already registered exception here + LOG.info("Registering the Unmanaged application master {}", + this.applicationId); RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest); + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing running container " + + container.getId()); + } + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing NM token for node " + + nmToken.getNodeId()); + } + // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -187,11 +252,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.connectionInitiated) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." 
- + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -199,7 +264,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.attemptId); + this.registerRequest, this.applicationId); } /** @@ -212,7 +277,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( public KillApplicationResponse forceKillApplication() throws IOException, YarnException { KillApplicationRequest request = - KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + KillApplicationRequest.newInstance(this.applicationId); this.handlerThread.shutdown(); @@ -240,29 +305,29 @@ public void allocateAsync(AllocateRequest request, LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } /** - * Returns the application attempt id of the UAM. + * Returns the application id of the UAM. * - * @return attempt id of the UAM + * @return application id of the UAM */ - public ApplicationAttemptId getAttemptId() { - return this.attemptId; + public ApplicationId getAppId() { + return this.applicationId; } /** @@ -287,15 +352,15 @@ public ApplicationAttemptId getAttemptId() { * Launch and initialize an unmanaged AM. First, it creates a new application * on the RM and negotiates a new attempt id. Then it waits for the RM * application attempt state to reach YarnApplicationAttemptState.LAUNCHED - * after which it returns the AM-RM token and the attemptId. + * after which it returns the AM-RM token. 
* * @param appId application id - * @return the UAM identifier + * @return the UAM token * @throws IOException if initialize fails * @throws YarnException if initialize fails */ - protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) - throws IOException, YarnException { + protected Token initializeUnmanagedAM( + ApplicationId appId) throws IOException, YarnException { try { UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); @@ -306,13 +371,12 @@ protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) submitUnmanagedApp(appId); // Monitor the application attempt to wait for launch state - ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + monitorCurrentAppAttempt(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED), YarnApplicationAttemptState.LAUNCHED); - this.attemptId = attemptReport.getApplicationAttemptId(); - return getUAMIdentifier(); + return getUAMToken(); } finally { this.rmClient = null; } @@ -343,6 +407,8 @@ private void submitUnmanagedApp(ApplicationId appId) submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); + context.setKeepContainersAcrossApplicationAttempts( + this.keepContainersAcrossApplicationAttempts); LOG.info("Submitting unmanaged application {}", appId); this.rmClient.submitApplication(submitRequest); @@ -374,8 +440,10 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, if (appStates.contains(state)) { if (state != YarnApplicationState.ACCEPTED) { throw new YarnRuntimeException( - "Received non-accepted application state: " + state - + ". Application " + appId + " not the first attempt?"); + "Received non-accepted application state: " + state + " for " + + appId + ". This is likely because this is not the first " + + "app attempt in home sub-cluster, and AMRMProxy HA " + + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled."); } appAttemptId = getApplicationReport(appId).getCurrentApplicationAttemptId(); @@ -415,25 +483,25 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, } /** - * Gets the identifier of the unmanaged AM. + * Gets the amrmToken of the unmanaged AM. * - * @return the identifier of the unmanaged AM. + * @return the amrmToken of the unmanaged AM. * @throws IOException if getApplicationReport fails * @throws YarnException if getApplicationReport fails */ - protected UnmanagedAMIdentifier getUAMIdentifier() + protected Token getUAMToken() throws IOException, YarnException { Token token = null; org.apache.hadoop.yarn.api.records.Token amrmToken = - getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + getApplicationReport(this.applicationId).getAMRMToken(); if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { LOG.warn( "AMRMToken not found in the application report for application: {}", - this.attemptId.getApplicationId()); + this.applicationId); } - return new UnmanagedAMIdentifier(this.attemptId, token); + return token; } private ApplicationReport getApplicationReport(ApplicationId appId) @@ -445,29 +513,6 @@ private ApplicationReport getApplicationReport(ApplicationId appId) } /** - * Data structure that encapsulates the application attempt identifier and the - * AMRMTokenIdentifier. Make it public because clients with HA need it. 
- */ - public static class UnmanagedAMIdentifier { - private ApplicationAttemptId attemptId; - private Token token; - - public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, - Token token) { - this.attemptId = attemptId; - this.token = token; - } - - public ApplicationAttemptId getAttemptId() { - return this.attemptId; - } - - public Token getToken() { - return this.token; - } - } - - /** * Data structure that encapsulates AllocateRequest and AsyncCallback * instance. */ @@ -549,8 +594,10 @@ public void run() { } request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, attemptId); + request, rmProxy, registerRequest, applicationId); + if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } @@ -578,18 +625,17 @@ public void run() { LOG.debug("Interrupted while waiting for queue", ex); } } catch (IOException ex) { - LOG.warn( - "IO Error occurred while processing heart beat for " + attemptId, - ex); + LOG.warn("IO Error occurred while processing heart beat for " + + applicationId, ex); } catch (Throwable ex) { LOG.warn( - "Error occurred while processing heart beat for " + attemptId, + "Error occurred while processing heart beat for " + applicationId, ex); } } LOG.info("UnmanagedApplicationManager has been stopped for {}. " - + "AMRequestHandlerThread thread is exiting", attemptId); + + "AMRequestHandlerThread thread is exiting", applicationId); } } @@ -600,8 +646,8 @@ public void run() { implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application attempt {} crashed!", - t.getName(), attemptId, e); + LOG.error("Heartbeat thread {} for application {} crashed!", + t.getName(), applicationId, e); } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java index 7993bd8..3cecdca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -63,16 +63,16 @@ private AMRMClientUtils() { /** * Handle ApplicationNotRegistered exception and re-register. 
* - * @param attemptId app attemptId + * @param appId application Id * @param rmProxy RM proxy instance * @param registerRequest the AM re-register request * @throws YarnException if re-register fails */ public static void handleNotRegisteredExceptionAndReRegister( - ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + ApplicationId appId, ApplicationMasterProtocol rmProxy, RegisterApplicationMasterRequest registerRequest) throws YarnException { LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", attemptId); + + " Trying to re-register.", appId); try { rmProxy.registerApplicationMaster(registerRequest); } catch (Exception e) { @@ -93,25 +93,24 @@ public static void handleNotRegisteredExceptionAndReRegister( * @param request allocate request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return allocate response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static AllocateResponse allocateWithReRegister(AllocateRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.allocate(request); } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // reset responseId after re-register request.setResponseId(0); // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, - attemptId); + return allocateWithReRegister(request, rmProxy, registerRequest, appId); } } @@ -123,23 +122,22 @@ public static AllocateResponse allocateWithReRegister(AllocateRequest request, * @param request finishApplicationMaster request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return finishApplicationMaster response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static FinishApplicationMasterResponse finishAMWithReRegister( FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, - attemptId); + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 628c781..b5727aa 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -177,10 +178,9 @@ LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashSet applicationMap = new HashSet<>(); - private HashMap> applicationContainerIdMap = - new HashMap>(); - private HashMap allocatedContainerMap = - new HashMap(); + private HashSet keepContainerOnUams = new HashSet<>(); + private HashMap> + applicationContainerIdMap = new HashMap<>(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -221,7 +221,7 @@ public void setRunningMode(boolean mode) { this.isRunning = mode; } - private static String getAppIdentifier() throws IOException { + private static ApplicationAttemptId getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); Set tokenIds = remoteUgi.getTokenIdentifiers(); @@ -231,7 +231,8 @@ private static String getAppIdentifier() throws IOException { break; } } - return result != null ? result.getApplicationAttemptId().toString() : ""; + return result != null ? 
result.getApplicationAttemptId() + : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); } private void validateRunning() throws ConnectException { @@ -246,19 +247,32 @@ public RegisterApplicationMasterResponse registerApplicationMaster( throws YarnException, IOException { validateRunning(); - - String amrmToken = getAppIdentifier(); - LOG.info("Registering application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Registering application attempt: " + attemptId); shouldReRegisterNext = false; + List containersFromPreviousAttempt = null; + synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(amrmToken)) { - throw new InvalidApplicationMasterRequestException( - AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + if (applicationContainerIdMap.containsKey(attemptId)) { + if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + // For UAM with the keepContainersFromPreviousAttempt flag, return all + // running containers + containersFromPreviousAttempt = new ArrayList<>(); + for (ContainerId containerId : applicationContainerIdMap + .get(attemptId)) { + containersFromPreviousAttempt.add(Container.newInstance(containerId, + null, null, null, null, null)); + } + } else { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + } + } else { + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(attemptId, new ArrayList()); } - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, new ArrayList()); } // Make sure we wait for certain test cases last in the method @@ -278,7 +292,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( } return RegisterApplicationMasterResponse.newInstance(null, null, null, null, - null, request.getHost(), null); + containersFromPreviousAttempt, request.getHost(), null); } @Override @@ -288,8 +302,8 @@ public FinishApplicationMasterResponse finishApplicationMaster( validateRunning(); - String amrmToken = getAppIdentifier(); - LOG.info("Finishing application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Finishing application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -299,12 +313,9 @@ public FinishApplicationMasterResponse finishApplicationMaster( synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + applicationContainerIdMap.remove(attemptId); } return FinishApplicationMasterResponse.newInstance( @@ -334,8 +345,8 @@ public AllocateResponse allocate(AllocateRequest request) + "askList and releaseList in the same heartbeat"); } - String amrmToken = getAppIdentifier(); - LOG.info("Allocate from application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Allocate from application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ 
-367,16 +378,16 @@ public AllocateResponse allocate(AllocateRequest request) // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); } } } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -384,9 +395,9 @@ public AllocateResponse allocate(AllocateRequest request) Assert .assertTrue( "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -402,18 +413,8 @@ public AllocateResponse allocate(AllocateRequest request) + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -424,9 +425,9 @@ public AllocateResponse allocate(AllocateRequest request) // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList(), - containerList, new ArrayList(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList(), newAMRMToken, new ArrayList()); } @@ -443,6 +444,7 @@ public GetApplicationReportResponse getApplicationReport( report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); response.setApplicationReport(report); return response; } @@ -486,6 +488,12 @@ public SubmitApplicationResponse submitApplication( } LOG.info("Application submitted: " + appId); applicationMap.add(appId); + + if (request.getApplicationSubmissionContext().getUnmanagedAM() + || request.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + keepContainerOnUams.add(appId); + } return SubmitApplicationResponse.newInstance(); } @@ -502,6 +510,7 @@ public KillApplicationResponse forceKillApplication( throw new ApplicationNotFoundException( "Trying to kill an absent application: " + appId); } + 
keepContainerOnUams.remove(appId); } LOG.info("Force killing application: " + appId); return KillApplicationResponse.newInstance(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java new file mode 100644 index 0000000..42be851 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for FederationRegistryClient. 
+ */ +public class TestFederationRegistryClient { + private Configuration conf; + private UserGroupInformation user; + private RegistryOperations registry; + private FederationRegistryClient registryClient; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + + this.registry = new FSRegistryOperationsService(); + this.registry.init(this.conf); + this.registry.start(); + + this.user = UserGroupInformation.getCurrentUser(); + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + this.registryClient.cleanAllApplications(); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } + + @After + public void breakDown() { + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + registry.stop(); + } + + @Test + public void testBasicCase() { + ApplicationId appId = ApplicationId.newInstance(0, 0); + String scId1 = "subcluster1"; + String scId2 = "subcluster2"; + + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + this.registryClient.writeAMRMTokenForUAM(appId, scId2, + new Token()); + // Duplicate entry, should overwrite + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + Assert.assertEquals(2, + this.registryClient.loadStateFromRegistry(appId).size()); + + this.registryClient.removeAppFromRegistry(appId); + + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + Assert.assertEquals(0, + this.registryClient.loadStateFromRegistry(appId).size()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf7..5848d3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -65,7 +65,7 @@ public void setup() { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); } protected void waitForCallBackCountAndCheckZeroPending( @@ -88,7 +88,8 @@ protected void waitForCallBackCountAndCheckZeroPending( public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, @@ -102,11 +103,48 @@ public void testBasicUsage() attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. 
+ */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + uam.setRMProxy(rmProxy); + + reAttachUAM(null, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.setShouldReRegisterNext(); @@ -137,7 +175,8 @@ public void testSlowRegisterCall() @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), attemptId); } catch (Exception e) { @@ -221,7 +260,8 @@ public void testFinishWithoutRegister() @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); @@ -241,19 +281,40 @@ protected UserGroupInformation getUGIWithToken( return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected Token launchUAM( + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final Token uamToken, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public Token run() throws Exception { + uam.reAttachUAM(uamToken); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request); } }); } @@ -311,8 +372,9 @@ 
public void callback(AllocateResponse response) { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } @SuppressWarnings("unchecked") @@ -330,6 +392,14 @@ public void setShouldReRegisterNext() { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b..92afcb7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -67,4 +69,18 @@ */ Context getNMCotext(); + /** + * Gets the credentials of this application. + * + * @return the credentials. + */ + Credentials getCredentials(); + + /** + * Gets the registry client. + * + * @return the registry. 
+ */ + RegistryOperations getRegistryClient(); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 9938b37..8a02095 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +44,8 @@ private Integer localTokenKeyId; private Token amrmToken; private Token localToken; + private Credentials credentials; + private RegistryOperations registry; /** * Create an instance of the AMRMProxyApplicationContext. @@ -52,17 +56,23 @@ * @param user user name of the application * @param amrmToken amrmToken issued by RM * @param localToken amrmToken issued by AMRMProxy + * @param credentials application credentials + * @param registry Yarn Registry client */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token amrmToken, - Token localToken) { + @SuppressWarnings("checkstyle:parameternumber") + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken, Credentials credentials, + RegistryOperations registry) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.credentials = credentials; + this.registry = registry; } @Override @@ -88,11 +98,14 @@ public String getUser() { /** * Sets the application's AMRMToken. 
* - * @param amrmToken amrmToken issued by RM + * @param amrmToken the new amrmToken from RM + * @return whether the saved token is updated to a different value */ - public synchronized void setAMRMToken( + public synchronized boolean setAMRMToken( Token amrmToken) { + Token oldValue = this.amrmToken; this.amrmToken = amrmToken; + return !this.amrmToken.equals(oldValue); } @Override @@ -134,4 +147,14 @@ public synchronized int getLocalAMRMTokenKeyId() { public Context getNMCotext() { return nmContext; } + + @Override + public Credentials getCredentials() { + return this.credentials; + } + + @Override + public RegistryOperations getRegistryClient() { + return this.registry; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf..ebd85bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -34,12 +34,13 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -60,15 +61,19 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +87,7 @@ * pipeline is a chain of interceptor instances that can inspect and modify the * request/response as needed. 
*/ -public class AMRMProxyService extends AbstractService implements +public class AMRMProxyService extends CompositeService implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); @@ -96,6 +101,7 @@ private InetSocketAddress listenerEndpoint; private AMRMProxyTokenSecretManager secretManager; private Map applPipelineMap; + private RegistryOperations registry; /** * Creates an instance of the service. @@ -118,10 +124,23 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { @Override protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); this.secretManager = new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); this.secretManager.init(conf); + + // Both second app attempt and NM restart within Federation need registry + if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED) + || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + addService(this.registry); + } + + super.serviceInit(conf); } @Override @@ -203,6 +222,8 @@ public void recover() throws IOException { amrmToken = new Token<>(); amrmToken.decodeFromUrlString( new String(contextEntry.getValue(), "UTF-8")); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); } } @@ -214,12 +235,36 @@ public void recover() throws IOException { throw new IOException("No user found for app attempt " + attemptId); } + // Regenerate the local AMRMToken for the AM Token localToken = this.secretManager.createAndGetAMRMToken(attemptId); + // Retrieve the AM container credentials from NM context + Credentials amCred = null; + for (Container container : this.nmContext.getContainers().values()) { + LOG.debug("From NM Context container " + container.getContainerId()); + if (container.getContainerId().getApplicationAttemptId().equals( + attemptId) && container.getContainerTokenIdentifier() != null) { + LOG.debug("Container type " + + container.getContainerTokenIdentifier().getContainerType()); + if (container.getContainerTokenIdentifier() + .getContainerType() == ContainerType.APPLICATION_MASTER) { + LOG.info("AM container {} found in context, has credentials: {}", + container.getContainerId(), + (container.getCredentials() != null)); + amCred = container.getCredentials(); + } + } + } + if (amCred == null) { + LOG.error("No credentials found for AM container of {}. 
" + + "Yarn registry access might not work", attemptId); + } + + // Create the intercepter pipeline for the AM initializePipeline(attemptId, user, amrmToken, localToken, - entry.getValue(), true); - } catch (Exception e) { + entry.getValue(), true, amCred); + } catch (IOException e) { LOG.error("Exception when recovering " + attemptId + ", removing it from NMStateStore and move on", e); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); @@ -326,7 +371,7 @@ public void processApplicationStartRequest(StartContainerRequest request) initializePipeline(appAttemptId, containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, - localToken, null, false); + localToken, null, false, credentials); } /** @@ -342,7 +387,8 @@ public void processApplicationStartRequest(StartContainerRequest request) protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -404,8 +450,9 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, try { RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext(this.nmContext, - applicationAttemptId, user, amrmToken, localToken)); + interceptorChain.init( + createApplicationMasterContext(this.nmContext, applicationAttemptId, + user, amrmToken, localToken, credentials, this.registry)); if (isRecovery) { if (recoveredDataMap == null) { throw new YarnRuntimeException( @@ -497,14 +544,12 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - - context.setAMRMToken(newToken); + ConverterUtils.convertFromYarn(token, (Text) null); - // Update the AMRMToken in context map in NM state store - if (this.nmContext.getNMStateStore() != null) { + // Update the AMRMToken in context map, and in NM state store if it is + // different + if (context.setAMRMToken(newToken) + && this.nmContext.getNMStateStore() != null) { try { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, @@ -547,10 +592,12 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, private AMRMProxyApplicationContext createApplicationMasterContext( Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, Credentials credentials, + RegistryOperations registryImpl) { AMRMProxyApplicationContextImpl appContext = new AMRMProxyApplicationContextImpl(context, getConfig(), - applicationAttemptId, user, amrmToken, localToken); + applicationAttemptId, user, amrmToken, localToken, credentials, + registryImpl); return appContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 
33cfca3..ef5e061 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -34,6 +34,8 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +151,8 @@ */ private UserGroupInformation appOwner; + private FederationRegistryClient registryClient; + /** * Creates an instance of the FederationInterceptor class. 
*/ @@ -179,6 +187,10 @@ public void init(AMRMProxyApplicationContext appContext) { } catch (Exception ex) { throw new YarnRuntimeException(ex); } + // Add all app tokens for Yarn Registry access + if (this.registryClient != null && appContext.getCredentials() != null) { + this.appOwner.addCredentials(appContext.getCredentials()); + } this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); @@ -192,6 +204,11 @@ public void init(AMRMProxyApplicationContext appContext) { this.uamPool.init(conf); this.uamPool.start(); + + if (appContext.getRegistryClient() != null) { + this.registryClient = new FederationRegistryClient(conf, + appContext.getRegistryClient(), this.appOwner); + } } /** @@ -250,20 +267,27 @@ public void init(AMRMProxyApplicationContext appContext) { */ this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + if (this.amRegistrationResponse + .getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + this.amRegistrationResponse.getContainersFromPreviousAttempts(), + this.homeSubClusterId); + } + + ApplicationId appId = + getApplicationContext().getApplicationAttemptId().getApplicationId(); + reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); if (queue == null) { - LOG.warn("Received null queue for application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " from home sub-cluster. Will use default queue name " + LOG.warn("Received null queue for application " + appId + + " from home subcluster. Will use default queue name " + YarnConfiguration.DEFAULT_QUEUE_NAME + " for getting AMRMProxyPolicy"); } else { - LOG.info("Application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " belongs to queue " + queue); + LOG.info("Application " + appId + " belongs to queue " + queue); } // Initialize the AMRMProxyPolicy @@ -304,7 +328,7 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + getApplicationContext().getApplicationAttemptId().getApplicationId()); // Notify policy of home response try { @@ -393,8 +417,8 @@ public FinishApplicationMasterResponseInfo call() throws Exception { // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + this.amRegistrationRequest, getApplicationContext() + .getApplicationAttemptId().getApplicationId()); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -425,6 +449,14 @@ public FinishApplicationMasterResponseInfo call() throws Exception { if (failedToUnRegister) { homeResponse.setIsUnregistered(false); + } else { + // Clean up UAMs only when the app finishes successfully, so that no more + // attempt will be launched. 
+ this.uamPool.stop(); + if (this.registryClient != null) { + this.registryClient.removeAppFromRegistry(getApplicationContext() + .getApplicationAttemptId().getApplicationId()); + } } return homeResponse; } @@ -442,9 +474,8 @@ public void setNextInterceptor(RequestInterceptor next) { */ @Override public void shutdown() { - if (this.uamPool != null) { - this.uamPool.stop(); - } + // Do not stop uamPool service and kill UAMs here because of possible second + // app attempt if (threadpool != null) { try { threadpool.shutdown(); @@ -456,6 +487,16 @@ } /** + * Only for unit test cleanup. + */ + @VisibleForTesting + protected void cleanupRegistry() { + if (this.registryClient != null) { + this.registryClient.cleanAllApplications(); + } + } + + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. * @@ -486,6 +527,120 @@ protected ApplicationMasterProtocol createHomeRMProxy( } } + private void mergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + RegisterApplicationMasterResponse otherResponse) { + + if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) { + homeResponse.getContainersFromPreviousAttempts() + .addAll(otherResponse.getContainersFromPreviousAttempts()); + } else { + homeResponse.setContainersFromPreviousAttempts( + otherResponse.getContainersFromPreviousAttempts()); + } + } + + if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) { + homeResponse.getNMTokensFromPreviousAttempts() + .addAll(otherResponse.getNMTokensFromPreviousAttempts()); + } else { + homeResponse.setNMTokensFromPreviousAttempts( + otherResponse.getNMTokensFromPreviousAttempts()); + } + } + } + + /** + * Try to re-attach to all existing and running UAMs in secondary + * sub-clusters launched by previous application attempts, if any. All + * running containers in the UAMs will be combined into the registerResponse. + * For the first attempt, the registry will be empty for this application, so + * this is a no-op. + */ + protected void reAttachUAMAndMergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + final ApplicationId appId) { + + if (this.registryClient == null) { + // Neither AMRMProxy HA nor NM work-preserving restart is enabled + LOG.warn("registryClient is null, skip attaching existing UAM if any"); + return; + } + + // Load existing running UAMs from the previous attempts from + // registry, if any + Map<String, Token<AMRMTokenIdentifier>> uamMap = + this.registryClient.loadStateFromRegistry(appId); + if (uamMap.size() == 0) { + LOG.info("No existing UAM for application {} found in Yarn Registry", + appId); + return; + } + LOG.info("Found {} existing UAMs for application {} in Yarn Registry. " + "Reattaching in parallel", uamMap.size(), appId); + + ExecutorCompletionService<RegisterApplicationMasterResponse> + completionService = new ExecutorCompletionService<>(threadpool); + + for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) { + final SubClusterId subClusterId = + SubClusterId.newInstance(entry.getKey()); + final Token<AMRMTokenIdentifier> amrmToken = entry.getValue(); + + completionService + .submit(new Callable<RegisterApplicationMasterResponse>() { + @Override + public RegisterApplicationMasterResponse call() throws Exception { + RegisterApplicationMasterResponse response = null; + try { + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + uamPool.reAttachUAM(subClusterId.getId(), config, appId, + amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), homeSubClusterId.getId(), + amrmToken); + + response = uamPool.registerApplicationMaster( + subClusterId.getId(), amRegistrationRequest); + + if (response != null + && response.getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + response.getContainersFromPreviousAttempts(), + subClusterId); + } + LOG.info("UAM {} reattached for {}", subClusterId, appId); + } catch (Throwable e) { + LOG.error( + "Reattaching UAM " + subClusterId + " failed for " + appId, + e); + } + return response; + } + }); + } + + // Wait for the re-attach responses + for (int i = 0; i < uamMap.size(); i++) { + try { + Future<RegisterApplicationMasterResponse> future = + completionService.take(); + RegisterApplicationMasterResponse registerResponse = future.get(); + if (registerResponse != null) { + LOG.info("Merging register response for {}", appId); + mergeRegisterResponse(homeResponse, registerResponse); + } + } catch (Exception e) { + LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e); + } + } + } + private SubClusterId getSubClusterForNode(String nodeName) { SubClusterId subClusterId = null; try { @@ -655,6 +810,20 @@ public void callback(AllocateResponse response) { responses.add(response); } + // Save the new AMRMToken for the UAM in registry if present + if (response.getAMRMToken() != null) { + Token<AMRMTokenIdentifier> newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + // Update the token in registry + if (registryClient != null) { + registryClient + .writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + subClusterId.getId(), newToken); + } + } + + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -714,20 +883,23 @@ public RegisterApplicationMasterResponseInfo call() subClusterId); RegisterApplicationMasterResponse uamResponse = null; + Token<AMRMTokenIdentifier> token = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, + token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + homeSubClusterId.toString(), registryClient != null); + + uamResponse = uamPool.registerApplicationMaster(subClusterId, + registerRequest); } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " + appContext.getApplicationAttemptId(), e); } return new RegisterApplicationMasterResponseInfo(uamResponse, - SubClusterId.newInstance(subClusterId)); +
SubClusterId.newInstance(subClusterId), token); } }); } @@ -752,6 +924,14 @@ public RegisterApplicationMasterResponseInfo call() + getApplicationContext().getApplicationAttemptId()); successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); + + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + uamResponse.getSubClusterId().getId(), + uamResponse.getUamToken()); + } } } catch (Exception e) { LOG.warn("Failed to register unmanaged application master: " @@ -1087,11 +1267,14 @@ public int getUnmanagedAMPoolSize() { private static class RegisterApplicationMasterResponseInfo { private RegisterApplicationMasterResponse response; private SubClusterId subClusterId; + private Token<AMRMTokenIdentifier> uamToken; RegisterApplicationMasterResponseInfo( - RegisterApplicationMasterResponse response, SubClusterId subClusterId) { + RegisterApplicationMasterResponse response, SubClusterId subClusterId, + Token<AMRMTokenIdentifier> uamToken) { this.response = response; this.subClusterId = subClusterId; + this.uamToken = uamToken; } public RegisterApplicationMasterResponse getResponse() { @@ -1101,6 +1284,10 @@ public RegisterApplicationMasterResponse getResponse() { public SubClusterId getSubClusterId() { return subClusterId; } + + public Token<AMRMTokenIdentifier> getUamToken() { + return uamToken; + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 831ba0b..44bfc68 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -353,10 +353,6 @@ private void recover() throws IOException, URISyntaxException { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); - if (this.amrmProxyEnabled) { - this.getAMRMProxyService().recover(); - } - RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { @@ -373,6 +369,11 @@ private void recover() throws IOException, URISyntaxException { recoverContainer(rcs); } + // Recover AMRMProxy state after apps and containers are recovered + if (this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + //Dispatching the RECOVERY_COMPLETED event through the dispatcher //so that all the paused, scheduled and queued containers will //be scheduled for execution on availability of resources.
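For orientation, the following is a minimal, self-contained sketch (not part of the patch) of the UAM token bookkeeping the FederationInterceptor changes above rely on: the AMRMToken of each Unmanaged AM launched in a secondary sub-cluster is written to the Yarn Registry, a later application attempt loads the tokens back in order to re-attach, and the entry is removed once the application unregisters. The class name UamTokenRegistryExample and the sub-cluster id "SC-1" are illustrative only; the sketch assumes the hadoop-yarn-server-common and hadoop-yarn-registry modules from this patch are on the classpath.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;

public class UamTokenRegistryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();

    // File-system based registry implementation, as used by the unit tests in this patch.
    FSRegistryOperationsService registry = new FSRegistryOperationsService();
    registry.init(conf);
    registry.start();

    FederationRegistryClient client = new FederationRegistryClient(conf,
        registry, UserGroupInformation.getCurrentUser());
    ApplicationId appId = ApplicationId.newInstance(0, 1);

    // Attempt 1: persist the AMRMToken of the UAM launched in sub-cluster "SC-1".
    client.writeAMRMTokenForUAM(appId, "SC-1", new Token<AMRMTokenIdentifier>());

    // Attempt 2: load the tokens back; one entry per secondary sub-cluster to
    // re-attach to (the actual re-attach goes through UnmanagedAMPoolManager).
    for (String subClusterId : client.loadStateFromRegistry(appId).keySet()) {
      System.out.println("Would re-attach UAM in sub-cluster " + subClusterId);
    }

    // After the application unregisters successfully, its entry is removed.
    client.removeAppFromRegistry(appId);
    registry.stop();
  }
}
```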
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 3c57496..da1d047 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -56,10 +57,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; -import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -179,6 +180,15 @@ protected Context createContext() { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + protected List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -623,7 +633,7 @@ protected void serviceStart() throws Exception { */ public void initApp(ApplicationAttemptId applicationId, String user) { super.initializePipeline(applicationId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); } public void stopApp(ApplicationId applicationId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 937ede5..b955311 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -444,7 +444,7 @@ public void testMultipleAttemptsSameNode() applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); getAMRMProxyService().initializePipeline(applicationAttemptId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -531,16 +531,14 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List containersForReleasedContainerIds = new ArrayList<>(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +552,9 @@ private void releaseContainersAndAssert(int appId, "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3db0e35..aa7ed69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -19,16 +19,20 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -59,6 +63,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -79,7 +87,10 @@ private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private RegistryOperations registry; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -93,15 +104,28 @@ public void setUp() throws IOException { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + + registry = new FSRegistryOperationsService(); + registry.init(getConf()); + registry.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), - attemptId, "test-user", null, null)); + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null, null, registry)); + interceptor.cleanupRegistry(); } @Override public void tearDown() { + interceptor.cleanupRegistry(); interceptor.shutdown(); + registry.stop(); super.tearDown(); } @@ -207,18 +231,17 @@ private void releaseContainersAndAssert(List containers) AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. 
+ // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -228,11 +251,12 @@ private void releaseContainersAndAssert(List containers) allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -547,4 +571,74 @@ public void testAllocateResponse() throws Exception { Assert.assertEquals(1, response.getUpdatedContainers().size()); Assert.assertEquals(1, response.getUpdateErrors().size()); } + + @Test + public void testSecondAttempt() throws Exception { + ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); + userInfo.getUser().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + for (Container c : containers) { + System.out.println(c.getId() + " ha"); + } + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Preserve the mock RM instances for secondaries + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Increase the attemptId and create a new intercepter instance for it + attemptId = ApplicationAttemptId.newInstance( + attemptId.getApplicationId(), attemptId.getAttemptId() + 1); + + interceptor = new TestableFederationInterceptor(null, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registry)); + registerResponse = interceptor.registerApplicationMaster(registerReq); + + // Should re-attach secondaries and get the three running containers + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + 
Assert.assertEquals(numberOfContainers, + registerResponse.getContainersFromPreviousAttempts().size()); + + // Release all containers + releaseContainersAndAssert( + registerResponse.getContainersFromPreviousAttempts()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735..23c80ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,15 @@ private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +77,7 @@ protected ApplicationMasterProtocol createHomeRMProxy( // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +100,15 @@ protected void setShouldReRegisterNext() { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. 
@@ -104,9 +122,9 @@ public TestableUnmanagedAMPoolManager(ExecutorService threadpool) { @Override public UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new TestableUnmanagedApplicationManager(conf, appId, queueName, - submitter, appNameSuffix); + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); } } @@ -119,8 +137,9 @@ public UnmanagedApplicationManager createUAM(Configuration conf, public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 7a225c8..8245fd6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.service.AbstractService; @@ -959,9 +960,10 @@ public ShortCircuitedAMRMProxy(Context context, protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken, recoveredDataMap, isRecovery); + localToken, recoveredDataMap, isRecovery, credentials); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally be the last diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index ef0f713..4f80156 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -141,6 +141,8 @@ The figure shows a sequence diagram for the following job execution flow: b. The security tokens are also modified by the NM when launching the AM, so that the AM can only talk with the AMRMProxy. Any future communication from AM to the YARN RM is mediated by the AMRMProxy. 7. The AM will then request containers using the locality information exposed by HDFS. 8. Based on a policy the AMRMProxy can impersonate the AM on other sub-clusters, by submitting an Unmanaged AM, and by forwarding the AM heartbeats to relevant sub-clusters. + a. 
Federation supports multiple application attempts with AMRMProxy HA. AM containers will have different attempt ids in the home sub-cluster, but the same Unmanaged AMs in the secondaries will be used across attempts. + b. When AMRMProxy HA is enabled, UAM tokens will be stored in the Yarn Registry. In the registerApplicationMaster call of each application attempt, AMRMProxy will fetch the existing UAM tokens from the registry (if any) and re-attach to the existing UAMs. 9. The AMRMProxy will use both locality information and a pluggable policy configured in the state-store to decide whether to forward the resource requests received by the AM to the Home RM or to one (or more) Secondary RMs. In Figure 1, we show the case in which the AMRMProxy decides to forward the request to the secondary RM. 10. The secondary RM will provide the AMRMProxy with valid container tokens to start a new container on some node in its sub-cluster. This mechanism ensures that each sub-cluster uses its own security tokens and avoids the need for a cluster wide shared secret to create tokens. 11. The AMRMProxy forwards the allocation response back to the AM. @@ -262,16 +264,17 @@ These are extra configurations that should appear in the **conf/yarn-site.xml** | Property | Example | Description | |:---- |:---- | -| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. -|`yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. +| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. | + | `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. | | `yarn.client.failover-proxy-provider` | `org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider` | The class used to connect to the RMs by looking up the membership information in federation state-store. This must be set if federation is enabled, even if RM HA is not enabled.| Optional: | Property | Example | Description | |:---- |:---- | -|`yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | -|`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | +| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not AMRMProxy HA is enabled for multiple application attempt support. | +| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | +| `yarn.federation.cache-ttl.secs` | `300` | The time to live for the AMRMProxy cache.
Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | Running a Sample Job --------------------
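To make the configuration tables above concrete, here is a small illustrative sketch (not part of the patch) that assembles the NM-side settings programmatically, for example when wiring up a test cluster. The class name FederationAmrmProxyHaConf is hypothetical; the property names and values are taken verbatim from the tables above.

```java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public final class FederationAmrmProxyHaConf {
  private FederationAmrmProxyHaConf() {
  }

  public static YarnConfiguration create() {
    YarnConfiguration conf = new YarnConfiguration();
    // Required NM-side settings for federation, as listed in the first table.
    conf.setBoolean("yarn.nodemanager.amrmproxy.enabled", true);
    conf.set("yarn.nodemanager.amrmproxy.interceptor-class.pipeline",
        "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
            + "FederationInterceptor");
    conf.set("yarn.client.failover-proxy-provider",
        "org.apache.hadoop.yarn.server.federation.failover."
            + "FederationRMFailoverProxyProvider");
    // Optional: enable AMRMProxy HA so that a second application attempt can
    // re-attach to the Unmanaged AMs kept alive in secondary sub-clusters.
    conf.setBoolean("yarn.nodemanager.amrmproxy.ha.enable", true);
    return conf;
  }
}
```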