diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index e471283..9bab4fd 100755
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -326,6 +326,13 @@
org.apache.hadoop
+ hadoop-yarn-server-common
+ ${project.version}
+ test-jar
+
+
+
+ org.apache.hadoop
hadoop-yarn-server-tests
${project.version}
test-jar
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
index e3f9155..8c78ec6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
@@ -43,6 +42,7 @@
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -100,9 +100,9 @@ public void testFederationRMFailoverProxyProvider() throws Exception {
// Transition rm3 to active;
makeRMActive(subClusterId, cluster, 2);
- ApplicationClientProtocol client = FederationProxyProviderUtil
- .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
- UserGroupInformation.getCurrentUser());
+ ApplicationClientProtocol client = YarnServerSecurityUtils.createRMProxy(
+ conf, ApplicationClientProtocol.class, subClusterId,
+ UserGroupInformation.getCurrentUser());
// client will retry until the rm becomes active.
GetClusterMetricsResponse response =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java
new file mode 100644
index 0000000..b4f75c9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+/**
+ * Generic interface that can be used for calling back when a corresponding
+ * asynchronous operation completes.
+ *
+ * @param <T> parameter type for the callback
+ */
+public interface AsyncCallback {
+ /**
+ * This method is called back when the corresponding asynchronous operation
+ * completes.
+ *
+ * @param response response of the callback
+ */
+ void callback(T response);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index ae284ce..e7cdc3d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -206,6 +206,17 @@
+ maven-jar-plugin
+
+
+
+ test-jar
+
+ test-compile
+
+
+
+
org.apache.rat
apache-rat-plugin
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
deleted file mode 100644
index 18f1338..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.failover;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class that creates proxy for specified protocols when federation is
- * enabled. The class creates a federation aware failover provider, i.e. the
- * failover provider uses the {@code FederationStateStore} to determine the
- * current active ResourceManager
- */
-@Private
-@Unstable
-public final class FederationProxyProviderUtil {
-
- public static final Logger LOG =
- LoggerFactory.getLogger(FederationProxyProviderUtil.class);
-
- /**
- * Create a proxy for the specified protocol. For non-HA, this is a direct
- * connection to the ResourceManager address. When HA is enabled, the proxy
- * handles the failover between the ResourceManagers as well.
- *
- * @param configuration Configuration to generate {@link ClientRMProxy}
- * @param protocol Protocol for the proxy
- * @param subClusterId the unique identifier or the sub-cluster
- * @param user the user on whose behalf the proxy is being created
- * @param Type information of the proxy
- * @return Proxy to the RM
- * @throws IOException on failure
- */
- @Public
- @Unstable
- public static T createRMProxy(Configuration configuration,
- final Class protocol, SubClusterId subClusterId,
- UserGroupInformation user) throws IOException {
- return createRMProxy(configuration, protocol, subClusterId, user, null);
- }
-
- /**
- * Create a proxy for the specified protocol. For non-HA, this is a direct
- * connection to the ResourceManager address. When HA is enabled, the proxy
- * handles the failover between the ResourceManagers as well.
- *
- * @param configuration Configuration to generate {@link ClientRMProxy}
- * @param protocol Protocol for the proxy
- * @param subClusterId the unique identifier or the sub-cluster
- * @param user the user on whose behalf the proxy is being created
- * @param token the auth token to use for connection
- * @param Type information of the proxy
- * @return Proxy to the RM
- * @throws IOException on failure
- */
- @Public
- @Unstable
- @SuppressWarnings("unchecked")
- public static T createRMProxy(final Configuration configuration,
- final Class protocol, SubClusterId subClusterId,
- UserGroupInformation user, final Token token) throws IOException {
- try {
- final YarnConfiguration conf = new YarnConfiguration(configuration);
- updateConf(conf, subClusterId);
- if (token != null) {
- LOG.info(
- "Creating RMProxy with a token: {} to subcluster: {}"
- + " for protocol: {}",
- token, subClusterId, protocol.getSimpleName());
- user.addToken(token);
- setAuthModeInConf(conf);
- } else {
- LOG.info("Creating RMProxy without a token to subcluster: {}"
- + " for protocol: {}", subClusterId, protocol.getSimpleName());
- }
- final T proxyConnection = user.doAs(new PrivilegedExceptionAction() {
- @Override
- public T run() throws Exception {
- return ClientRMProxy.createRMProxy(conf, protocol);
- }
- });
-
- return proxyConnection;
- } catch (IOException e) {
- String message =
- "Error while creating of RM application master service proxy for"
- + " appAttemptId: " + user;
- LOG.info(message);
- throw new YarnRuntimeException(message, e);
- } catch (InterruptedException e) {
- throw new YarnRuntimeException(e);
- }
- }
-
- private static void setAuthModeInConf(Configuration conf) {
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- SaslRpcServer.AuthMethod.TOKEN.toString());
- }
-
- // updating the conf with the refreshed RM addresses as proxy creations
- // are based out of conf
- private static void updateConf(Configuration conf,
- SubClusterId subClusterId) {
- conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
- // In a Federation setting, we will connect to not just the local cluster RM
- // but also multiple external RMs. The membership information of all the RMs
- // that are currently
- // participating in Federation is available in the central
- // FederationStateStore.
- // So we will:
- // 1. obtain the RM service addresses from FederationStateStore using the
- // FederationRMFailoverProxyProvider.
- // 2. disable traditional HA as that depends on local configuration lookup
- // for RMs using indexes.
- // 3. we will enable federation failover IF traditional HA is enabled so
- // that the appropriate failover RetryPolicy is initialized.
- conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
- conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
- FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
- if (HAUtil.isHAEnabled(conf)) {
- conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
- conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
- }
- }
-
- // disable instantiation
- private FederationProxyProviderUtil() {
- }
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
new file mode 100644
index 0000000..e68e3c1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service that manages a pool of UAMs in {@link UnmanagedApplicationManager}.
+ */
+public class UnmanagedAMPoolManager extends AbstractService {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(UnmanagedAMPoolManager.class);
+
+ // Map from uamId to UAM instance
+ private Map unmanagedAppMasterMap;
+
+ private Map attemptIdMap;
+
+ private ExecutorService threadpool;
+
+ public UnmanagedAMPoolManager(ExecutorService threadpool) {
+ super(UnmanagedAMPoolManager.class.getName());
+ this.threadpool = threadpool;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ this.unmanagedAppMasterMap =
+ new ConcurrentHashMap();
+ this.attemptIdMap = new ConcurrentHashMap();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+
+ ExecutorCompletionService completionService =
+ new ExecutorCompletionService(this.threadpool);
+
+ // Normally we should finish all applications before stop. If there are
+ // still UAMs running, force kill all of them.
+ synchronized (this.unmanagedAppMasterMap) {
+ if (!this.unmanagedAppMasterMap.isEmpty()) {
+ int mapSize = this.unmanagedAppMasterMap.size();
+ LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
+ mapSize);
+
+ for (final Map.Entry entry : this.unmanagedAppMasterMap
+ .entrySet()) {
+ completionService.submit(new Callable() {
+ @Override
+ public KillApplicationResponse call() throws Exception {
+ try {
+ LOG.info("Force-killing UAM id " + entry.getKey()
+ + " for application " + attemptIdMap.get(entry.getKey()));
+ return entry.getValue().forceKillApplication();
+ } catch (Exception e) {
+ LOG.error("Failed to kill unmanaged application master", e);
+ return null;
+ }
+ }
+ });
+ }
+
+ for (int i = 0; i < mapSize; ++i) {
+ try {
+ Future future = completionService.take();
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Failed to kill unmanaged application master", e);
+ }
+ }
+ }
+ }
+
+ this.unmanagedAppMasterMap.clear();
+ this.attemptIdMap.clear();
+ }
+
+ /**
+ * Creates the UAM instance. Pulled out to make unit testing easy.
+ *
+ * @param conf Configuration
+ * @param attemptId application attempt id
+ * @param submitter submitter name of the application
+ * @param appNameSuffix application name suffix
+ * @return the UAM instance
+ */
+ public UnmanagedApplicationManager createUAM(Configuration conf,
+ ApplicationAttemptId attemptId, String submitter, String appNameSuffix) {
+ return new UnmanagedApplicationManager(conf, attemptId, submitter,
+ appNameSuffix);
+ }
+
+ /**
+ * Create a new UAM and register the application.
+ *
+ * @param uamId identifier for the UAM
+ * @param registerRequest RegisterApplicationMasterRequest
+ * @param conf configuration for this UAM
+ * @param attemptId application attempt id for this UAM
+ * @param submitter submitter name of the UAM
+ * @param appNameSuffix application name suffix for the UAM
+ * @return RegisterApplicationMasterResponse
+ * @throws YarnException if registerApplicationMaster fails
+ * @throws IOException if registerApplicationMaster fails
+ */
+ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
+ RegisterApplicationMasterRequest registerRequest, Configuration conf,
+ ApplicationAttemptId attemptId, String submitter, String appNameSuffix)
+ throws YarnException, IOException {
+
+ if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " already exists");
+ }
+ UnmanagedApplicationManager uam =
+ createUAM(conf, attemptId, submitter, appNameSuffix);
+ this.unmanagedAppMasterMap.put(uamId, uam);
+ this.attemptIdMap.put(uamId, attemptId);
+
+ try {
+ LOG.info("Register application for UAM id {} and application {}", uamId,
+ attemptId);
+ return uam.createAndRegisterApplicationMaster(registerRequest);
+ } catch (Exception e) {
+ // Add the map earlier and remove here if register failed because we want
+ // to make sure there is only one uam instance per uamId at any given time
+ this.unmanagedAppMasterMap.remove(uamId);
+ this.attemptIdMap.remove(uamId);
+ throw e;
+ }
+ }
+
+ /**
+ * Send an allocate request asynchronously to a UAM.
+ *
+ * @param uamId identifier for the UAM
+ * @param request AllocateRequest
+ * @param callback callback for response
+ * @throws YarnException if allocate fails
+ * @throws IOException if allocate fails
+ */
+ public void allocateAsync(String uamId, AllocateRequest request,
+ AsyncCallback callback)
+ throws YarnException, IOException {
+ if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " does not exist");
+ }
+ UnmanagedApplicationManager uam = this.unmanagedAppMasterMap.get(uamId);
+ uam.allocateAsync(request, callback);
+ }
+
+ /**
+ * Finish a UAM/application.
+ *
+ * @param uamId identifier for the UAM
+ * @param request FinishApplicationMasterRequest
+ * @return FinishApplicationMasterResponse
+ * @throws YarnException if finishApplicationMaster call fails
+ * @throws IOException if finishApplicationMaster call fails
+ */
+ public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
+ if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " does not exist");
+ }
+ LOG.info("Finishing application for UAM id {} ", uamId);
+ UnmanagedApplicationManager uam = this.unmanagedAppMasterMap.get(uamId);
+ FinishApplicationMasterResponse response =
+ uam.finishApplicationMaster(request);
+
+ if (response.getIsUnregistered()) {
+ // Only remove the UAM when the unregister finished
+ this.unmanagedAppMasterMap.remove(uamId);
+ this.attemptIdMap.remove(uamId);
+ LOG.info("UAM id {} is unregistered", uamId);
+ }
+ return response;
+ }
+
+ /**
+ * Get the id of all running UAMs.
+ *
+ * @return uamId set
+ */
+ public Set getAllUAMIds() {
+ // Return a clone of the current id set for concurrency reasons
+ return new HashSet(this.unmanagedAppMasterMap.keySet());
+ }
+
+ /**
+ * Return whether an UAM exists.
+ *
+ * @param uamId identifier for the UAM
+ * @return UAM exists or not
+ */
+ public boolean hasUAMId(String uamId) {
+ return this.unmanagedAppMasterMap.containsKey(uamId);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
new file mode 100644
index 0000000..ae01fab
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * UnmanagedApplicationManager is used to register unmanaged application and
+ * negotiate for resources from resource managers. Allocate calls are handled
+ * asynchronously using {@link AsyncCallback}.
+ */
+public class UnmanagedApplicationManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(UnmanagedApplicationManager.class);
+
+ private volatile boolean keepRunning;
+ private BlockingQueue requestQueue;
+ private CallbackHandlerThread handlerThread;
+ private ApplicationMasterProtocol rmProxy;
+ private ApplicationAttemptId attemptId;
+ private String submitter;
+ private String appNameSuffix;
+ private Configuration conf;
+ private String queueName;
+ private UserGroupInformation userUgi;
+ private RegisterApplicationMasterRequest registerRequest;
+ private int lastResponseId;
+
+ public UnmanagedApplicationManager(Configuration conf,
+ ApplicationAttemptId attemptId, String submitter, String appNameSuffix) {
+ Preconditions.checkNotNull(conf, "Configuration cannot be null");
+ Preconditions.checkNotNull(attemptId,
+ "ApplicationAttemptId cannot be null");
+ Preconditions.checkNotNull(submitter, "App submitter cannot be null");
+
+ this.conf = conf;
+ this.attemptId = attemptId;
+ this.submitter = submitter;
+ this.appNameSuffix = appNameSuffix;
+ this.handlerThread = new CallbackHandlerThread();
+ this.requestQueue = new LinkedBlockingQueue();
+ this.keepRunning = true;
+ this.queueName = null;
+ this.rmProxy = null;
+ this.registerRequest = null;
+ }
+
+ /**
+ * Registers this {@link UnmanagedApplicationManager} with the resource
+ * manager.
+ *
+ * @param request the register request
+ * @return the register response
+ * @throws YarnException if register fails
+ * @throws IOException if register fails
+ */
+ public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ // This needs to be done first in this method, because it is used as an
+ // indication that this method is called (and perhaps blocked due to RM
+ // connection and not finished yet)
+ this.registerRequest = request;
+
+ UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(this.conf,
+ this.queueName, this.attemptId, this.submitter);
+ UnmanagedAMIdentifier identifier = launcher.initializeUnmanagedAM();
+
+ try {
+ this.userUgi = UserGroupInformation.createProxyUser(
+ identifier.getAttemptId().toString(),
+ UserGroupInformation.getCurrentUser());
+ } catch (IOException e) {
+ LOG.error("Exception while trying to get current user", e);
+ throw new YarnRuntimeException(e);
+ }
+
+ this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
+ this.userUgi, identifier.getToken());
+
+ LOG.info("Registering the Unmanaged application master {} with RM",
+ this.attemptId);
+ RegisterApplicationMasterResponse response =
+ this.rmProxy.registerApplicationMaster(this.registerRequest);
+
+ // Start the heartbeat thread only after register succeeds
+ this.handlerThread.setDaemon(true);
+ this.handlerThread.start();
+
+ this.lastResponseId = 0;
+ return response;
+ }
+
+ /**
+ * Unregisters from the resource manager and stops the request handler thread.
+ *
+ * @param request the finishApplicationMaster request
+ * @return the response
+ * @throws YarnException if finishAM call fails
+ * @throws IOException if finishAM call fails
+ */
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
+ if (this.rmProxy == null) {
+ if (this.registerRequest != null) {
+ // This is possible if the async registerApplicationMaster is still
+ // blocked and retrying
+ LOG.warn("Unmanaged AM still not successfully launched/registered yet."
+ + " Stopping the UAM client thread anyways.");
+ } else {
+ throw new YarnException("finishApplicationMaster should not "
+ + "be called before createAndRegister");
+ }
+ }
+ this.keepRunning = false;
+ this.handlerThread.interrupt();
+ if (this.rmProxy == null) {
+ // Return a dummy response when register call is still stuck.
+ return FinishApplicationMasterResponse.newInstance(false);
+ }
+
+ FinishApplicationMasterResponse response = null;
+ try {
+ response = this.rmProxy.finishApplicationMaster(request);
+ } catch (ApplicationMasterNotRegisteredException ex) {
+ YarnServerSecurityUtils.handleNotRegisteredExceptionAndReRegister(
+ this.attemptId, this.rmProxy, this.registerRequest);
+ // Retry finish after re-register
+ response = this.rmProxy.finishApplicationMaster(request);
+ }
+ return response;
+ }
+
+ /**
+ * Force kill the UAM.
+ *
+ * @return kill response
+ * @throws IOException if fails to create rmProxy
+ * @throws YarnException if force kill fails
+ */
+ public KillApplicationResponse forceKillApplication()
+ throws IOException, YarnException {
+ KillApplicationRequest request =
+ KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
+
+ ApplicationClientProtocol rmClient =
+ createRMProxy(ApplicationClientProtocol.class, this.conf,
+ UserGroupInformation.createRemoteUser(this.submitter), null);
+ return rmClient.forceKillApplication(request);
+ }
+
+ /**
+ * Sends the specified heart beat request to the resource manager and invokes
+ * the callback asynchronously with the response.
+ *
+ * @param request the allocate request
+ * @param callback the callback method for the request
+ * @throws YarnException if registerAM is not called yet
+ */
+ public void allocateAsync(AllocateRequest request,
+ AsyncCallback callback) throws YarnException {
+ try {
+ this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
+ } catch (InterruptedException ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting to put on response queue", ex);
+ }
+ }
+ // Two possible cases why the UAM is not successfully registered yet:
+ // 1. registerApplicationMaster is not called at all. Should throw here.
+ // 2. registerApplicationMaster is called but hasn't successfully returned.
+ // In case 2, we have already saved the allocate request above, so if the
+ // registration succeeds later, no request is lost.
+ if (this.rmProxy == null) {
+ if (this.registerRequest != null) {
+ LOG.warn("Unmanaged AM still not successfully launched/registered yet."
+ + " Saving the allocate request and send later.");
+ } else {
+ throw new YarnException(
+ "AllocateAsync should not be called before createAndRegister");
+ }
+ }
+ }
+
+ /**
+ * Set the RM queue name to which this application will be submitted.
+ *
+ * @param queueName the queue name to set
+ */
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ /**
+ * Returns the proxy object used by this instance to connect to the resource
+ * manager.
+ *
+ * @return the RM proxy instance
+ */
+ public ApplicationMasterProtocol getRMProxy() {
+ return this.rmProxy;
+ }
+
+ /**
+ * Returns RM proxy for the specified protocol type. Unit test cases can
+ * override this method and return mock proxy instances.
+ *
+ * @param protocol protocol of the proxy
+ * @param config configuration
+ * @param user ugi for the proxy connection
+ * @param token token for the connection
+ * @param type of the proxy
+ * @return the proxy instance
+ * @throws IOException if fails to create the proxy
+ */
+ protected T createRMProxy(final Class protocol, Configuration config,
+ UserGroupInformation user, Token token)
+ throws IOException {
+ return YarnServerSecurityUtils.createRMProxy(config, protocol, user, token,
+ null);
+ }
+
+ /**
+ * The UnmanagedAMLauncher is used to launch an unmanaged AM. An unmanaged AM
+ * is an AM that is not launched and managed by the RM. The client creates a
+ * new application on the RM and negotiates a new attempt id. Then it waits
+ * for the RM application state to reach YarnApplicationState.ACCEPTED, after
+ * which it returns the AM-RM token and the attemptId.
+ */
+ public class UnmanagedAMLauncher {
+ private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
+ private static final String APP_NAME = "Unmanaged-AM";
+ private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
+
+ private ApplicationAttemptId attemptId;
+ private String submitter;
+ private ApplicationClientProtocol rmClient;
+ private Configuration conf;
+ private RecordFactory recordFactory;
+ private String queueName;
+
+ /**
+ * Creates a launcher for one unmanaged AM attempt. Nothing is sent to the
+ * RM here; the arguments are only stored and a record factory is obtained
+ * from the configuration.
+ *
+ * @param conf configuration used for the RM proxy and the record factory
+ * @param queueName queue to submit to; when blank, submitAppAndGetAppId
+ *          falls back to a configured default queue
+ * @param attemptId the attempt id the RM is expected to hand back
+ * @param submitter name of the user on whose behalf the app is submitted
+ */
+ public UnmanagedAMLauncher(Configuration conf, String queueName,
+ ApplicationAttemptId attemptId, String submitter) {
+ this.conf = conf;
+ this.queueName = queueName;
+ this.attemptId = attemptId;
+ this.submitter = submitter;
+ this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+ }
+
+ /**
+  * Submits the unmanaged application to the RM, waits for it to be accepted
+  * and for its current attempt to be launched, then returns the resulting
+  * identifier. The RM client proxy is created for the duration of this call
+  * and cleared again before the method exits.
+  *
+  * @return the attempt id and AMRM token of the launched unmanaged AM
+  * @throws YarnRuntimeException wrapping any failure during the launch
+  */
+ public UnmanagedAMIdentifier initializeUnmanagedAM() {
+   try {
+     final ApplicationId requestedAppId = this.attemptId.getApplicationId();
+     LOG.info(
+         "Creating client for launching Unmanaged AM for applicationid: {}",
+         requestedAppId);
+
+     final UserGroupInformation submitterUgi =
+         UserGroupInformation.createRemoteUser(this.submitter);
+     this.rmClient = createRMProxy(ApplicationClientProtocol.class,
+         this.conf, submitterUgi, null);
+
+     final ApplicationId submittedAppId =
+         submitAppAndGetAppId(requestedAppId);
+     final ApplicationReport submissionReport =
+         monitorApplicationSubmission(submittedAppId,
+             EnumSet.of(YarnApplicationState.ACCEPTED,
+                 YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
+                 YarnApplicationState.FAILED, YarnApplicationState.FINISHED));
+
+     // Guard: only an ACCEPTED application can proceed to the launch wait.
+     if (submissionReport
+         .getYarnApplicationState() != YarnApplicationState.ACCEPTED) {
+       throw new YarnRuntimeException(
+           "Received non-accepted application state: "
+               + submissionReport.getYarnApplicationState()
+               + ". Application attempt " + this.attemptId
+               + " not the first attempt?");
+     }
+
+     // Monitor the application attempt to wait for launch state
+     final ApplicationAttemptId launchedAttemptId =
+         monitorCurrentAppAttempt(submittedAppId,
+             YarnApplicationAttemptState.LAUNCHED)
+                 .getApplicationAttemptId();
+     LOG.info("Launching UAM with application attempt id {}",
+         launchedAttemptId);
+
+     // TODO (YARN-6128) we enforce that the attempt id must be the same in
+     // all subclusters. Need to revisit here once we turn on HA, and that
+     // attempt id could be larger than one.
+     if (!launchedAttemptId.equals(this.attemptId)) {
+       throw new YarnException("Invalid state. The application attempt "
+           + "id returned by RM is different. Required id: "
+           + this.attemptId + " Received id:" + launchedAttemptId);
+     }
+
+     return getUAMIdentifier();
+   } catch (Exception e) {
+     throw new YarnRuntimeException("Error launching unmanaged AM", e);
+   } finally {
+     // The proxy is only needed while launching; drop it on every exit path.
+     this.rmClient = null;
+   }
+ }
+
+ /**
+  * Gets the identifier of the unmanaged AM: the attempt id paired with the
+  * AMRM token from the application report, or with a null token when the
+  * report carries none.
+  *
+  * The report is fetched exactly once, so the null check and the conversion
+  * act on the same token and only one RPC is issued.
+  *
+  * NOTE(review): within this class the RM client is only set while
+  * initializeUnmanagedAM is running, so calling this method at any other time
+  * will fail on the null client.
+  *
+  * @return the identifier of the unmanaged AM.
+  * @throws IOException if getApplicationReport fails
+  * @throws YarnException if getApplicationReport fails
+  */
+ public UnmanagedAMIdentifier getUAMIdentifier()
+     throws IOException, YarnException {
+
+   final org.apache.hadoop.yarn.api.records.Token amrmToken =
+       getApplicationReport(this.attemptId.getApplicationId())
+           .getAMRMToken();
+   if (amrmToken == null) {
+     LOG.warn(
+         "AMRMToken not found in the application report for attemptId: {}",
+         this.attemptId);
+     return new UnmanagedAMIdentifier(this.attemptId, null);
+   }
+
+   // The (Text) cast selects the convertFromYarn overload taking a service.
+   Token token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
+   return new UnmanagedAMIdentifier(this.attemptId, token);
+ }
+
+ /**
+  * Builds the submission context for the unmanaged application and submits
+  * it through the RM client.
+  *
+  * @param originalAppId the application id to submit under
+  * @return the same application id that was passed in
+  * @throws Exception if building the request or the submission fails
+  */
+ private ApplicationId submitAppAndGetAppId(ApplicationId originalAppId)
+     throws Exception {
+   final SubmitApplicationRequest request =
+       this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
+
+   LOG.info("Setting up unmanaged application submission context: {}",
+       this.attemptId);
+   final ApplicationSubmissionContext submission = this.recordFactory
+       .newRecordInstance(ApplicationSubmissionContext.class);
+   submission.setApplicationId(originalAppId);
+
+   // ApplicationName in the secondary SubCluster has the following format
+   // Unmanaged-AM-{appNameSuffix}
+   submission.setApplicationName(APP_NAME + "-" + appNameSuffix);
+
+   // A blank queue name falls back to the configured default queue.
+   final String targetQueue = StringUtils.isBlank(this.queueName)
+       ? this.conf.get(DEFAULT_QUEUE_CONFIG,
+           YarnConfiguration.DEFAULT_QUEUE_NAME)
+       : this.queueName;
+   submission.setQueue(targetQueue);
+
+   final ContainerLaunchContext emptyLaunchContext =
+       this.recordFactory.newRecordInstance(ContainerLaunchContext.class);
+   submission.setResource(BuilderUtils.newResource(1024, 1));
+   submission.setAMContainerSpec(emptyLaunchContext);
+   request.setApplicationSubmissionContext(submission);
+
+   submission.setUnmanagedAM(true);
+
+   this.rmClient.submitApplication(request);
+
+   LOG.info("Submitting unmanaged application: {} ", this.attemptId);
+   return originalAppId;
+ }
+
+ /**
+  * Polls the RM roughly once per second until the application reaches one
+  * of the given states. There is no timeout: the loop only ends when a
+  * matching state is observed or fetching the report throws.
+  *
+  * @param appId Application Id of application to be monitored
+  * @param finalState the application states that end the wait
+  * @return the application report
+  * @throws YarnException if getApplicationReport fails
+  * @throws IOException if getApplicationReport fails
+  */
+ private ApplicationReport monitorApplicationSubmission(ApplicationId appId,
+     Set finalState)
+     throws YarnException, IOException {
+   for (;;) {
+     try {
+       Thread.sleep(1000);
+     } catch (InterruptedException e) {
+       LOG.debug("Thread sleep in monitoring loop interrupted");
+     }
+
+     // Get application report for the appId we are interested in
+     final ApplicationReport report = getApplicationReport(appId);
+
+     if (LOG.isDebugEnabled()) {
+       // Absent optional fields are rendered as empty strings.
+       StringBuilder summary = new StringBuilder();
+       summary.append("Got app report from ASM for")
+           .append(", appId=").append(appId.getId())
+           .append(", appAttemptId=")
+           .append(report.getCurrentApplicationAttemptId() != null
+               ? report.getCurrentApplicationAttemptId() : "")
+           .append(", clientToAMToken=")
+           .append(report.getClientToAMToken() != null
+               ? report.getClientToAMToken() : "")
+           .append(", appDiagnostics=")
+           .append(
+               report.getDiagnostics() != null ? report.getDiagnostics() : "")
+           .append(", appMasterHost=")
+           .append(report.getHost() != null ? report.getHost() : "")
+           .append(", appQueue=")
+           .append(report.getQueue() != null ? report.getQueue() : "")
+           .append(", appMasterRpcPort=").append(report.getRpcPort())
+           .append(", appStartTime=").append(report.getStartTime())
+           .append(", yarnAppState=")
+           .append(report.getYarnApplicationState().toString())
+           .append(", distributedFinalState=")
+           .append(report.getFinalApplicationStatus().toString())
+           .append(", appTrackingUrl=")
+           .append(
+               report.getTrackingUrl() != null ? report.getTrackingUrl() : "")
+           .append(", appUser=")
+           .append(report.getUser() != null ? report.getUser() : "");
+
+       LOG.debug(summary.toString());
+     }
+
+     final YarnApplicationState observed = report.getYarnApplicationState();
+     if (!finalState.contains(observed)) {
+       LOG.info("Current attempt state of {} is {}, will retry later.", appId,
+           observed);
+       continue;
+     }
+     return report;
+   }
+ }
+
+ /**
+  * Polls the RM until the application's current attempt reaches the given
+  * state, sleeping one second between polls and giving up once
+  * AM_STATE_WAIT_TIMEOUT_MS has elapsed since the call started.
+  *
+  * @param appId the application whose current attempt is monitored
+  * @param attemptState the attempt state to wait for
+  * @return the attempt report observed in the requested state
+  * @throws YarnException if an RM call fails
+  * @throws IOException if an RM call fails
+  * @throws RuntimeException if the state is not reached before the timeout
+  */
+ private ApplicationAttemptReport monitorCurrentAppAttempt(
+     ApplicationId appId, YarnApplicationAttemptState attemptState)
+     throws YarnException, IOException {
+   final long startedAt = System.currentTimeMillis();
+   ApplicationAttemptId currentAttemptId = null;
+   for (;;) {
+     // The attempt id may not exist yet; keep asking until it shows up.
+     if (currentAttemptId == null) {
+       currentAttemptId =
+           getApplicationReport(appId).getCurrentApplicationAttemptId();
+     }
+
+     Object observedState = " N/A ";
+     if (currentAttemptId != null) {
+       GetApplicationAttemptReportRequest reportRequest = this.recordFactory
+           .newRecordInstance(GetApplicationAttemptReportRequest.class);
+       reportRequest.setApplicationAttemptId(currentAttemptId);
+       ApplicationAttemptReport report = this.rmClient
+           .getApplicationAttemptReport(reportRequest)
+           .getApplicationAttemptReport();
+       observedState = report.getYarnApplicationAttemptState();
+       if (attemptState.equals(observedState)) {
+         return report;
+       }
+     }
+
+     LOG.info("Current attempt state of " + appId + " is "
+         + observedState
+         + ", waiting for current attempt to reach " + attemptState);
+     try {
+       Thread.sleep(1000);
+     } catch (InterruptedException e) {
+       LOG.warn("Interrupted while waiting for current attempt of " + appId
+           + " to reach " + attemptState);
+     }
+
+     final long elapsedMs = System.currentTimeMillis() - startedAt;
+     if (elapsedMs > AM_STATE_WAIT_TIMEOUT_MS) {
+       String errmsg = "Timeout for waiting current attempt of " + appId
+           + " to reach " + attemptState;
+       LOG.error(errmsg);
+       throw new RuntimeException(errmsg);
+     }
+   }
+ }
+
+ /**
+  * Fetches the current report of the given application through the RM
+  * client held by this launcher.
+  *
+  * @param appId the application to look up
+  * @return the application report returned by the RM
+  * @throws YarnException if the RM call fails
+  * @throws IOException if the RM call fails
+  */
+ private ApplicationReport getApplicationReport(ApplicationId appId)
+     throws YarnException, IOException {
+   final GetApplicationReportRequest reportRequest =
+       this.recordFactory
+           .newRecordInstance(GetApplicationReportRequest.class);
+   reportRequest.setApplicationId(appId);
+   return this.rmClient
+       .getApplicationReport(reportRequest)
+       .getApplicationReport();
+ }
+ }
+
+ /**
+  * Immutable holder that pairs an application attempt identifier with the
+  * token obtained for that attempt. The token may be null when none was
+  * available.
+  */
+ public static class UnmanagedAMIdentifier {
+   // Assigned once in the constructor and never changed afterwards.
+   private final ApplicationAttemptId attemptId;
+   private final Token token;
+
+   /**
+    * @param attemptId the application attempt id
+    * @param token the token for the attempt; may be null
+    */
+   public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
+       Token token) {
+     this.attemptId = attemptId;
+     this.token = token;
+   }
+
+   /**
+    * @return the application attempt id
+    */
+   public ApplicationAttemptId getAttemptId() {
+     return this.attemptId;
+   }
+
+   /**
+    * @return the token for the attempt, or null if none was set
+    */
+   public Token getToken() {
+     return this.token;
+   }
+ }
+
+ /**
+  * Immutable data structure that encapsulates an AllocateRequest and the
+  * AsyncCallback instance to invoke with its response. Both parts are
+  * validated as non-null on construction.
+  */
+ public static class AsyncAllocateRequestInfo {
+   // Final: instances are handed between threads through the request queue
+   // and are never modified after construction.
+   private final AllocateRequest request;
+   private final AsyncCallback callback;
+
+   /**
+    * @param request the allocate request; must not be null
+    * @param callback the callback for the response; must not be null
+    * @throws IllegalArgumentException if either argument is null
+    */
+   public AsyncAllocateRequestInfo(AllocateRequest request,
+       AsyncCallback callback) {
+     Preconditions.checkArgument(request != null,
+         "AllocateRequest cannot be null");
+     Preconditions.checkArgument(callback != null, "Callback cannot be null");
+
+     this.request = request;
+     this.callback = callback;
+   }
+
+   /**
+    * @return the callback to invoke with the allocate response
+    */
+   public AsyncCallback getCallback() {
+     return this.callback;
+   }
+
+   /**
+    * @return the allocate request
+    */
+   public AllocateRequest getRequest() {
+     return this.request;
+   }
+ }
+
+ /**
+ * Returns the number of allocate requests currently waiting in the request
+ * queue, i.e. queued by allocateAsync and not yet taken by the callback
+ * handler thread. Exposed for tests.
+ *
+ * @return the current size of the request queue
+ */
+ @VisibleForTesting
+ public int getRequestQueueSize() {
+ return this.requestQueue.size();
+ }
+
+ /**
+ * Extends Thread and provides an implementation that is used for processing
+ * the AM heart beat request asynchronously and sending back the response
+ * using the callback method registered with the system.
+ *
+ * Each loop iteration takes one queued request, sends it to the RM through
+ * the RM proxy, and passes the response to the request's callback. Any
+ * failure is logged and the loop moves on to the next request; in that case
+ * the callback of the failed request is not invoked.
+ */
+ public class CallbackHandlerThread extends Thread {
+ public CallbackHandlerThread() {
+ super("UnmanagedApplicationManager Callback Handler Thread");
+ }
+
+ @Override
+ public void run() {
+ // Runs until keepRunning is cleared; the flag is re-checked after every
+ // request and after every interruption of the blocking take().
+ while (keepRunning) {
+ AsyncAllocateRequestInfo requestInfo;
+ try {
+ // Blocks until a request is available.
+ requestInfo = requestQueue.take();
+ if (requestInfo == null) {
+ throw new RuntimeException(
+ "Null requestInfo taken from request queue");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+ + ((requestInfo.getRequest().getAskList() == null) ? " empty"
+ : requestInfo.getRequest().getAskList().size()));
+ }
+
+ // A stop may have been requested while this thread was blocked in
+ // take(); in that case the request just taken is not sent.
+ if (!keepRunning) {
+ break;
+ }
+
+ // set the response id before forwarding the allocate request as we
+ // could have different values for each UAM
+ AllocateRequest request = requestInfo.getRequest();
+ if (request == null) {
+ throw new RuntimeException("Null allocateRequest from requestInfo");
+ }
+
+ request.setResponseId(lastResponseId);
+ AllocateResponse response = null;
+ try {
+ response = rmProxy.allocate(request);
+ } catch (ApplicationMasterNotRegisteredException e) {
+ // The RM does not know this attempt: re-register with the saved
+ // register request, then retry the same allocate exactly once.
+ YarnServerSecurityUtils.handleNotRegisteredExceptionAndReRegister(
+ attemptId, rmProxy, registerRequest);
+ // Retry allocate after re-register
+ response = rmProxy.allocate(request);
+ }
+ if (response == null) {
+ throw new RuntimeException("Null allocateResponse from allocate");
+ }
+
+ // Remember the id so the next request carries it as its response id.
+ lastResponseId = response.getResponseId();
+ // update token if RM has reissued/renewed
+ if (response.getAMRMToken() != null) {
+ LOG.debug("Received new AMRMToken");
+ YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
+ userUgi, conf);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+ + ((response.getAllocatedContainers() == null) ? " empty"
+ : response.getAllocatedContainers().size()));
+ }
+
+ if (requestInfo.getCallback() == null) {
+ throw new RuntimeException("Null callback from requestInfo");
+ }
+ requestInfo.getCallback().callback(response);
+ } catch (InterruptedException ex) {
+ // Not treated as an error: the loop condition decides whether to exit.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting for queue", ex);
+ }
+ } catch (IOException ex) {
+ LOG.info(
+ "IO Error occurred while processing heart beat for " + attemptId,
+ ex);
+ } catch (Throwable ex) {
+ // Catch-all so that a single bad request cannot end the thread.
+ LOG.info(
+ "Error occurred while processing heart beat for " + attemptId,
+ ex);
+ }
+ }
+
+ LOG.info("UnmanagedApplicationManager has been stopped for {}. "
+ + "CallbackHandlerThread thread is exiting", attemptId);
+ }
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java
new file mode 100644
index 0000000..0e78094
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
index 9af556e..51b71f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -20,19 +20,37 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +60,11 @@
*/
@Private
public final class YarnServerSecurityUtils {
- private static final Logger LOG = LoggerFactory
- .getLogger(YarnServerSecurityUtils.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(YarnServerSecurityUtils.class);
+
+ public static final String APP_ALREADY_REGISTERED_MESSAGE =
+ "Application Master is already registered : ";
private YarnServerSecurityUtils() {
}
@@ -55,8 +76,7 @@ private YarnServerSecurityUtils() {
* @return the AMRMTokenIdentifier instance for the current user
* @throws YarnException
*/
- public static AMRMTokenIdentifier authorizeRequest()
- throws YarnException {
+ public static AMRMTokenIdentifier authorizeRequest() throws YarnException {
UserGroupInformation remoteUgi;
try {
@@ -82,9 +102,8 @@ public static AMRMTokenIdentifier authorizeRequest()
}
} catch (IOException e) {
tokenFound = false;
- message =
- "Got exception while looking for AMRMToken for user "
- + remoteUgi.getUserName();
+ message = "Got exception while looking for AMRMToken for user "
+ + remoteUgi.getUserName();
}
if (!tokenFound) {
@@ -113,8 +132,55 @@ private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
}
/**
+ * Installs a newly issued AMRMToken into the ugi used for the RM proxy.
+ * The token is added to the ugi with the service value sent by the RM, and
+ * only afterwards is its service switched to the AMRM token service derived
+ * from the configuration.
+ *
+ * @param token the new AMRMToken sent by RM
+ * @param user ugi used for RM proxy
+ * @param conf configuration
+ */
+ public static void updateAMRMToken(
+     org.apache.hadoop.yarn.api.records.Token token, UserGroupInformation user,
+     Configuration conf) {
+   final byte[] identifier = token.getIdentifier().array();
+   final byte[] password = token.getPassword().array();
+   final Text kind = new Text(token.getKind());
+   final Text rmService = new Text(token.getService());
+   Token amrmToken = new Token(identifier, password, kind, rmService);
+
+   // Preserve the token service sent by the RM when adding the token
+   // to ensure we replace the previous token setup by the RM.
+   user.addToken(amrmToken);
+
+   // Afterwards we can update the service address for the RPC layer.
+   amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf));
+ }
+
+ /**
+  * Handle ApplicationNotRegistered exception and re-register.
+  *
+  * A failure of the re-register call is tolerated only when it is an
+  * InvalidApplicationMasterRequestException whose message contains
+  * APP_ALREADY_REGISTERED_MESSAGE, which is taken to mean that a concurrent
+  * thread already re-registered. Every other failure is logged and rethrown
+  * wrapped in a YarnException.
+  *
+  * @param attemptId app attemptId
+  * @param rmProxy RM proxy instance
+  * @param registerRequest the AM re-register request
+  * @throws YarnException if re-register fails
+  */
+ public static void handleNotRegisteredExceptionAndReRegister(
+     ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
+     RegisterApplicationMasterRequest registerRequest) throws YarnException {
+   LOG.info("App attempt {} not registered, most likely due to RM failover. "
+       + " Trying to re-register.", attemptId);
+   try {
+     rmProxy.registerApplicationMaster(registerRequest);
+   } catch (Exception e) {
+     // An exception may carry no message; guard against null so the check
+     // itself cannot throw and hide the original failure.
+     final String message = e.getMessage();
+     if (e instanceof InvalidApplicationMasterRequestException
+         && message != null
+         && message.contains(APP_ALREADY_REGISTERED_MESSAGE)) {
+       LOG.info("Concurrent thread successfully registered, moving on.");
+     } else {
+       LOG.error("Error trying to re-register AM", e);
+       throw new YarnException(e);
+     }
+   }
+ }
+
+ /**
* Parses the container launch context and returns a Credential instance that
- * contains all the tokens from the launch context.
+ * contains all the tokens from the launch context.
+ *
* @param launchContext
* @return the credential instance
* @throws IOException
@@ -130,8 +196,7 @@ public static Credentials parseCredentials(
buf.reset(tokens);
credentials.readTokenStorageStream(buf);
if (LOG.isDebugEnabled()) {
- for (Token extends TokenIdentifier> tk : credentials
- .getAllTokens()) {
+ for (Token extends TokenIdentifier> tk : credentials.getAllTokens()) {
LOG.debug(tk.getService() + " = " + tk.toString());
}
}
@@ -139,4 +204,130 @@ public static Credentials parseCredentials(
return credentials;
}
+
+ /**
+ * Create a proxy for the specified protocol in the context of Federation,
+ * without an auth token. For non-HA, this is a direct connection to the
+ * ResourceManager address. When HA is enabled, the proxy handles the failover
+ * between the ResourceManagers as well.
+ *
+ * This is a convenience overload that passes a null token to the
+ * token-aware variant.
+ *
+ * @param configuration Configuration to generate {@link ClientRMProxy}
+ * @param protocol Protocol for the proxy
+ * @param subClusterId the unique identifier of the sub-cluster
+ * @param user the user on whose behalf the proxy is being created
+ * @param <T> Type information of the proxy
+ * @return Proxy to the RM
+ * @throws IOException on failure
+ */
+ @Public
+ @Unstable
+ public static T createRMProxy(Configuration configuration,
+ Class protocol, SubClusterId subClusterId, UserGroupInformation user)
+ throws IOException {
+ return createRMProxy(configuration, protocol, subClusterId, user, null);
+ }
+
+ /**
+  * Create a proxy for the specified protocol in the context of Federation.
+  * For non-HA, this is a direct connection to the ResourceManager address.
+  * When HA is enabled, the proxy handles the failover between the
+  * ResourceManagers as well.
+  *
+  * The federation settings are applied to a copy of the given configuration,
+  * and the sub-cluster id is used as the RM name for logging.
+  *
+  * @param configuration Configuration to generate {@link ClientRMProxy}
+  * @param protocol Protocol for the proxy
+  * @param subClusterId the unique identifier of the sub-cluster
+  * @param user the user on whose behalf the proxy is being created
+  * @param token the auth token to use for connection
+  * @param <T> Type information of the proxy
+  * @return Proxy to the RM
+  * @throws IOException on failure
+  */
+ @Public
+ @Unstable
+ public static T createRMProxy(Configuration configuration,
+     final Class protocol, SubClusterId subClusterId,
+     UserGroupInformation user, Token extends TokenIdentifier> token)
+     throws IOException {
+   // Work on a copy so the federation overrides stay local to this proxy.
+   final YarnConfiguration federationConf =
+       new YarnConfiguration(configuration);
+   updateConfForFederation(federationConf, subClusterId.getId());
+
+   final String rmName = subClusterId.toString();
+   return createRMProxy(federationConf, protocol, user, token, rmName);
+ }
+
+ /**
+  * Create a proxy for the specified protocol, running the proxy creation as
+  * the given user.
+  *
+  * When a token is supplied, its service is set to the AMRM token service
+  * derived from the configuration, it is added to the user's ugi, and the
+  * authentication mode in the passed-in configuration is switched to TOKEN.
+  * Without a token, neither the ugi nor the configuration is modified.
+  *
+  * @param configuration Configuration to generate {@link ClientRMProxy}
+  * @param protocol Protocol for the proxy
+  * @param user the user on whose behalf the proxy is being created
+  * @param token the auth token to use for connection; may be null
+  * @param rmName name of the RM/cluster for logging purpose
+  * @param <T> Type information of the proxy
+  * @return Proxy to the RM
+  * @throws IOException on failure
+  * @throws YarnRuntimeException if interrupted while creating the proxy
+  */
+ @Public
+ @Unstable
+ public static T createRMProxy(final Configuration configuration,
+     final Class protocol, UserGroupInformation user,
+     final Token extends TokenIdentifier> token, String rmName)
+     throws IOException {
+   try {
+     if (token != null) {
+       LOG.info("Creating RMProxy with a token {} to RM {} for protocol {} for"
+           + " user {}", token, rmName, protocol.getSimpleName(), user);
+       token.setService(ClientRMProxy.getAMRMTokenService(configuration));
+       user.addToken(token);
+       setAuthModeInConf(configuration);
+     } else {
+       LOG.info("Creating RMProxy without a token to RM {} for protocol {} for"
+           + " user {}", rmName, protocol.getSimpleName(), user);
+     }
+     final T proxyConnection = user.doAs(new PrivilegedExceptionAction() {
+       @Override
+       public T run() throws Exception {
+         return ClientRMProxy.createRMProxy(configuration, protocol);
+       }
+     });
+     return proxyConnection;
+
+   } catch (InterruptedException e) {
+     // Restore the interrupt status before translating the exception so the
+     // interruption stays visible to code further up the call stack.
+     Thread.currentThread().interrupt();
+     throw new YarnRuntimeException(e);
+   }
+ }
+
+ /**
+ * Sets the Hadoop security authentication mode in the given configuration
+ * to TOKEN. The configuration is modified in place.
+ *
+ * @param conf the configuration to update
+ */
+ private static void setAuthModeInConf(Configuration conf) {
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
+ }
+
+ /**
+  * Updates the given configuration in place so that RM proxies created from
+  * it target the given sub-cluster through the Federation failover proxy
+  * provider.
+  *
+  * @param conf configuration
+  * @param subClusterId subclusterId for the conf
+  */
+ public static void updateConfForFederation(Configuration conf,
+     String subClusterId) {
+   conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId);
+
+   // In a Federation setting, we will connect to not just the local cluster
+   // RM but also multiple external RMs. The membership information of all
+   // the RMs that are currently participating in Federation is available in
+   // the central FederationStateStore. So we will:
+   // 1. obtain the RM service addresses from FederationStateStore using the
+   //    FederationRMFailoverProxyProvider.
+   // 2. disable traditional HA as that depends on local configuration lookup
+   //    for RMs using indexes.
+   // 3. enable federation failover IF traditional HA is enabled so that the
+   //    appropriate failover RetryPolicy is initialized.
+   conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+   conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+       FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
+
+   // Without traditional HA there is nothing further to translate.
+   if (!HAUtil.isHAEnabled(conf)) {
+     return;
+   }
+   conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
+   conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index e302c70..118cae4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -115,13 +116,15 @@
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.common.base.Strings;
/**
@@ -143,12 +146,21 @@
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
+ private boolean shouldReRegisterNext = false;
+
+ // For unit test synchronization
+ public static Object obj = new Object();
+
public MockResourceManagerFacade(Configuration conf,
int startContainerIndex) {
this.conf = conf;
this.containerIndex.set(startContainerIndex);
}
+ /**
+ * Makes this mock RM behave as if the AM were not registered: while the flag
+ * is set, allocate and finishApplicationMaster throw
+ * ApplicationMasterNotRegisteredException. The flag is cleared again by the
+ * next registerApplicationMaster call.
+ */
+ public void setShouldReRegisterNext() {
+ shouldReRegisterNext = true;
+ }
+
private static String getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
@@ -169,14 +181,31 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
String amrmToken = getAppIdentifier();
LOG.info("Registering application attempt: " + amrmToken);
+ shouldReRegisterNext = false;
+
+ synchronized (obj) {
+ obj.notifyAll();
+ // We reuse the port number to indicate whether the unit test want us to
+ // wait here
+ if (request.getRpcPort() > 1000) {
+ LOG.info("Register call in RM start waiting");
+ try {
+ obj.wait();
+ LOG.info("Register call in RM wait finished");
+ } catch (InterruptedException e) {
+ LOG.info("Register call in RM wait interrupted", e);
+ }
+ }
+ }
+
synchronized (applicationContainerIdMap) {
- Assert.assertFalse(
- "The application id is already registered: " + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
+ if (applicationContainerIdMap.containsKey(amrmToken)) {
+ throw new InvalidApplicationMasterRequestException(
+ YarnServerSecurityUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ }
// Keep track of the containers that are returned to this application
applicationContainerIdMap.put(amrmToken, new ArrayList());
}
-
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
null, request.getHost(), null);
}
@@ -188,6 +217,12 @@ public FinishApplicationMasterResponse finishApplicationMaster(
String amrmToken = getAppIdentifier();
LOG.info("Finishing application attempt: " + amrmToken);
+ if (shouldReRegisterNext) {
+ String message = "AM is not registered, should re-register.";
+ LOG.warn(message);
+ throw new ApplicationMasterNotRegisteredException(message);
+ }
+
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
Assert.assertTrue("The application id is NOT registered: " + amrmToken,
@@ -223,6 +258,13 @@ public AllocateResponse allocate(AllocateRequest request)
}
String amrmToken = getAppIdentifier();
+ LOG.info("Allocate from application attempt: " + amrmToken);
+
+ if (shouldReRegisterNext) {
+ String message = "AM is not registered, should re-register.";
+ LOG.warn(message);
+ throw new ApplicationMasterNotRegisteredException(message);
+ }
ArrayList containerList = new ArrayList();
if (request.getAskList() != null) {
@@ -356,6 +398,31 @@ public SubmitApplicationResponse submitApplication(
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
+ // Mock behavior: drop every registered attempt that belongs to the
+ // requested application and release the containers tracked for it. The
+ // call fails the test if no registered attempt matches.
+ String appId = "";
+ boolean foundApp = false;
+ if (request.getApplicationId() != null) {
+   appId = request.getApplicationId().toString();
+   synchronized (applicationContainerIdMap) {
+     // The map is keyed by attempt id string, not by application id, so
+     // entries must be removed by their own key. Matching keys are collected
+     // first because the map must not be modified while iterating over it.
+     List<String> killedAttempts = new ArrayList<String>();
+     for (Entry<String, List<ContainerId>> entry : applicationContainerIdMap
+         .entrySet()) {
+       ApplicationAttemptId attemptId =
+           ApplicationAttemptId.fromString(entry.getKey());
+       if (attemptId.getApplicationId().equals(request.getApplicationId())) {
+         killedAttempts.add(entry.getKey());
+         foundApp = true;
+       }
+     }
+     // Remove the attempts and the containers that were being tracked
+     for (String attemptKey : killedAttempts) {
+       List<ContainerId> ids = applicationContainerIdMap.remove(attemptKey);
+       if (ids != null) {
+         for (ContainerId c : ids) {
+           allocatedContainerMap.remove(c);
+         }
+       }
+     }
+   }
+ }
+ Assert.assertTrue("The application id is NOT registered: " + appId,
+     foundApp);
+ LOG.info("Force killing application: " + appId);
return KillApplicationResponse.newInstance(true);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
new file mode 100644
index 0000000..651d4ed
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for UnmanagedApplicationManager.
+ */
+public class TestUnmanagedApplicationManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestUnmanagedApplicationManager.class);
+
+ private TestableUnmanagedApplicationManager uam;
+ private Configuration conf = new YarnConfiguration();
+ private CountingCallback callback;
+
+ private ApplicationAttemptId attemptId;
+
+  /**
+   * Prepares the fixtures shared by the test cases: the cluster id in the
+   * configuration, a fresh counting callback, the attempt id and the UAM
+   * under test.
+   */
+  @Before
+  public void setup() {
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
+    callback = new CountingCallback();
+
+    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    attemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+
+    uam = new TestableUnmanagedApplicationManager(conf, attemptId, "submitter",
+        "appNameSuffix");
+  }
+
+  /**
+   * Waits on the callback's monitor until its callback count equals the
+   * expected value, then asserts that its request queue size is zero.
+   *
+   * @param callback callback whose counters are observed; it is presumably
+   *          notified by its own callback method (not visible here)
+   * @param expectCallBackCount number of callbacks to wait for
+   */
+  protected void waitForCallBackCountAndCheckZeroPending(
+      CountingCallback callback, int expectCallBackCount) {
+    synchronized (callback) {
+      while (callback.callBackCount != expectCallBackCount) {
+        try {
+          callback.wait();
+        } catch (InterruptedException e) {
+          // Swallowing the interrupt would send the thread straight back into
+          // wait(), so an interrupted test (e.g. on timeout) would hang here
+          // forever. Restore the flag and fail instead.
+          Thread.currentThread().interrupt();
+          Assert.fail("Interrupted while waiting for " + expectCallBackCount
+              + " allocate callbacks, got " + callback.callBackCount);
+        }
+      }
+      Assert.assertEquals(
+          "Non zero pending requests when number of allocate callbacks reaches "
+              + expectCallBackCount,
+          0, callback.requestQueueSize);
+    }
+  }
+
+  /**
+   * Plain flow: register, send one async allocate, wait for its callback and
+   * finish the application master.
+   */
+  @Test(timeout = 5000)
+  public void testBasicUsage()
+      throws YarnException, IOException, InterruptedException {
+
+    RegisterApplicationMasterRequest registerRequest =
+        RegisterApplicationMasterRequest.newInstance(null, 0, null);
+    createAndRegisterApplicationMaster(registerRequest, attemptId);
+
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0, null, null, null);
+    allocateAsync(allocateRequest, callback, attemptId);
+
+    // Block until the single outstanding allocate has called back
+    waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+    FinishApplicationMasterRequest finishRequest =
+        FinishApplicationMasterRequest.newInstance(null, null, null);
+    finishApplicationMaster(finishRequest, attemptId);
+  }
+
+  /**
+   * Same flow as the basic case, except that setShouldReRegisterNext() is
+   * called on the UAM before the allocate and again before the finish call.
+   */
+  @Test(timeout = 5000)
+  public void testReRegister()
+      throws YarnException, IOException, InterruptedException {
+
+    RegisterApplicationMasterRequest registerRequest =
+        RegisterApplicationMasterRequest.newInstance(null, 0, null);
+    createAndRegisterApplicationMaster(registerRequest, attemptId);
+
+    uam.setShouldReRegisterNext();
+
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0, null, null, null);
+    allocateAsync(allocateRequest, callback, attemptId);
+
+    // Block until the single outstanding allocate has called back
+    waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+    uam.setShouldReRegisterNext();
+
+    FinishApplicationMasterRequest finishRequest =
+        FinishApplicationMasterRequest.newInstance(null, null, null);
+    finishApplicationMaster(finishRequest, attemptId);
+  }
+
+ /**
+ * If register is slow, async allocate requests issued in the meantime should
+ * neither throw nor be dropped.
+ *
+ * The register call runs in its own thread while the main thread sends one
+ * allocate before it completes and one after; both must call back. Two more
+ * allocates sent after finishApplicationMaster must leave the callback count
+ * at two.
+ */
+ @Test(timeout = 5000)
+ public void testSlowRegisterCall()
+ throws YarnException, IOException, InterruptedException {
+
+ // Register with wait() in RM in a separate thread
+ // NOTE(review): presumably the port value 1001 is what makes the mock RM
+ // block inside register - confirm in MockResourceManagerFacade.
+ Thread registerAMThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ createAndRegisterApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 1001, null),
+ attemptId);
+ } catch (Exception e) {
+ // Only logged: a failed register does not fail the test by itself
+ LOG.info("Register thread exception", e);
+ }
+ }
+ });
+
+ // Wait until the register call in the thread has reached the RM, which
+ // then wakes us up. The monitor is taken before the thread is started, so
+ // a notification cannot be issued before this thread is waiting.
+ synchronized (MockResourceManagerFacade.obj) {
+ LOG.info("Starting register thread");
+ registerAMThread.start();
+ try {
+ LOG.info("Test main starts waiting");
+ MockResourceManagerFacade.obj.wait();
+ LOG.info("Test main wait finished");
+ } catch (Exception e) {
+ LOG.info("Test main wait interrupted", e);
+ }
+ }
+
+ // First allocate before register succeeds
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+
+ // Notify the register thread
+ synchronized (MockResourceManagerFacade.obj) {
+ MockResourceManagerFacade.obj.notifyAll();
+ }
+
+ LOG.info("Test main wait for register thread to finish");
+ registerAMThread.join();
+ LOG.info("Register thread finished");
+
+ // Second allocate, normal case
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+
+ // Both allocates sent so far should get a response
+ waitForCallBackCountAndCheckZeroPending(callback, 2);
+
+ finishApplicationMaster(
+ FinishApplicationMasterRequest.newInstance(null, null, null),
+ attemptId);
+
+ // Allocates after finishAM should be ignored
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+
+ Assert.assertEquals(0, callback.requestQueueSize);
+
+ // A short wait just in case the allocates get executed
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // An interrupted sleep is ignored; the count is checked right away
+ }
+
+ // Still two: the allocates sent after finish produced no callback
+ Assert.assertEquals(2, callback.callBackCount);
+ }
+
+  /**
+   * An allocate sent without a preceding register is expected to throw.
+   */
+  @Test(expected = Exception.class)
+  public void testAllocateWithoutRegister()
+      throws YarnException, IOException, InterruptedException {
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0, null, null, null);
+    allocateAsync(allocateRequest, callback, attemptId);
+  }
+
+  /**
+   * A finish sent without a preceding register is expected to throw.
+   */
+  @Test(expected = Exception.class)
+  public void testFinishWithoutRegister()
+      throws YarnException, IOException, InterruptedException {
+    FinishApplicationMasterRequest finishRequest =
+        FinishApplicationMasterRequest.newInstance(null, null, null);
+    finishApplicationMaster(finishRequest, attemptId);
+  }
+
+  /**
+   * The first force kill of a registered application goes through; a second
+   * kill of the same application must be rejected.
+   */
+  @Test
+  public void testForceKill()
+      throws YarnException, IOException, InterruptedException {
+    createAndRegisterApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+    uam.forceKillApplication();
+
+    // Record the outcome and assert after the try block. An Assert.fail()
+    // placed inside it throws an AssertionError that catch (Throwable) would
+    // swallow, so the test could never fail.
+    boolean secondKillRejected = false;
+    try {
+      uam.forceKillApplication();
+    } catch (Throwable t) {
+      // Throwable on purpose, so that a rejection signalled through an
+      // AssertionError is counted as well.
+      LOG.info("Second force kill rejected", t);
+      secondKillRejected = true;
+    }
+    Assert.assertTrue("Should fail because application is already killed",
+        secondKillRejected);
+  }
+
+  /**
+   * Creates a remote user named after the attempt id and attaches an AMRM
+   * token identifier for that attempt to it.
+   *
+   * @param attemptId attempt the user and token identifier are built for
+   * @return the user carrying the token identifier
+   */
+  protected UserGroupInformation getUGIWithToken(
+      ApplicationAttemptId attemptId) {
+    UserGroupInformation user =
+        UserGroupInformation.createRemoteUser(attemptId.toString());
+    user.addTokenIdentifier(new AMRMTokenIdentifier(attemptId, 1));
+    return user;
+  }
+
+  /**
+   * Registers through the UAM while running as a user that carries the AMRM
+   * token identifier of the given attempt.
+   *
+   * @param request register request handed to the UAM
+   * @param attemptId attempt whose token identifier the calling user carries
+   * @return the register response returned by the UAM
+   * @throws YarnException declared by the underlying register call
+   * @throws IOException declared by the underlying register call and by doAs
+   * @throws InterruptedException declared by doAs
+   */
+  protected RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+      final RegisterApplicationMasterRequest request,
+      ApplicationAttemptId attemptId)
+      throws YarnException, IOException, InterruptedException {
+    // The type argument is required: with a raw action, doAs returns Object
+    // and the return statement would not compile.
+    return getUGIWithToken(attemptId).doAs(
+        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+          @Override
+          public RegisterApplicationMasterResponse run()
+              throws YarnException, IOException {
+            return uam.createAndRegisterApplicationMaster(request);
+          }
+        });
+  }
+
+ protected void allocateAsync(final AllocateRequest request,
+ final AsyncCallback callback,
+ ApplicationAttemptId attemptId)
+ throws YarnException, IOException, InterruptedException {
+ getUGIWithToken(attemptId).doAs(new PrivilegedExceptionAction
+ org.apache.hadoop
+ hadoop-yarn-server-common
+ test-jar
+ test
+
+
org.fusesource.leveldbjni
leveldbjni-all
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 22fc8f6..3ba4d20 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -36,7 +36,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -48,6 +47,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request)
}
AllocateResponse allocateResponse = rmClient.allocate(request);
if (allocateResponse.getAMRMToken() != null) {
- updateAMRMToken(allocateResponse.getAMRMToken());
+ YarnServerSecurityUtils.updateAMRMToken(allocateResponse.getAMRMToken(),
+ this.user, getConf());
}
return allocateResponse;
@@ -170,7 +171,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
((DistributedSchedulingAMProtocol)rmClient)
.allocateForDistributedScheduling(request);
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
- updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+ YarnServerSecurityUtils.updateAMRMToken(
+ allocateResponse.getAllocateResponse().getAMRMToken(), this.user,
+ getConf());
}
return allocateResponse;
} else {
@@ -195,18 +198,6 @@ public void setNextInterceptor(RequestInterceptor next) {
+ "Check if the interceptor pipeline configuration is correct");
}
- private void updateAMRMToken(Token token) throws IOException {
- org.apache.hadoop.security.token.Token amrmToken =
- new org.apache.hadoop.security.token.Token(
- token.getIdentifier().array(), token.getPassword().array(),
- new Text(token.getKind()), new Text(token.getService()));
- // Preserve the token service sent by the RM when adding the token
- // to ensure we replace the previous token setup by the RM.
- // Afterwards we can update the service address for the RPC layer.
- user.addToken(amrmToken);
- amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
- }
-
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulingAMProtocol) {
@@ -257,19 +248,12 @@ private static void setAMRMTokenService(final Configuration conf)
for (org.apache.hadoop.security.token.Token extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
- token.setService(getAMRMTokenService(conf));
+ token.setService(ClientRMProxy.getAMRMTokenService(conf));
}
}
}
@InterfaceStability.Unstable
- public static Text getAMRMTokenService(Configuration conf) {
- return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
- }
-
- @InterfaceStability.Unstable
public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 70a46a1..de90c78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -224,8 +224,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
AllocateResponse lastResponse = lock.getAllocateResponse();
if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
- "Application Master is already registered : "
- + appID;
+ YarnServerSecurityUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 9e84010..6d07a83 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -338,9 +339,8 @@ public void testallocateBeforeAMRegistration() throws Exception {
am.registerAppAttempt(false);
Assert.fail();
} catch (Exception e) {
- Assert.assertEquals("Application Master is already registered : "
- + attempt.getAppAttemptId().getApplicationId(),
- e.getMessage());
+ Assert.assertEquals(YarnServerSecurityUtils.APP_ALREADY_REGISTERED_MESSAGE
+ + attempt.getAppAttemptId().getApplicationId(), e.getMessage());
}
// Simulate an AM that was disconnected and app attempt was removed