diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 640e86e..1cadf85 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2094,6 +2094,9 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
"org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+ "DefaultRequestInterceptor";
+ public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX
+ + "amrmproxy.ha.enable";
+ public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false;
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
@@ -2950,6 +2953,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
+ public static final String YARN_REGISTRY_CLASS =
+ YARN_PREFIX + "registry.class";
+ public static final String DEFAULT_YARN_REGISTRY_CLASS =
+ "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService";
+
+ public static final String FEDERATION_REGISTRY_BASE_KEY =
+ FEDERATION_PREFIX + "registry.base-dir";
+ public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
+ "yarnfederation/";
+
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 43ae3af..cd5195d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -67,6 +67,11 @@
+ org.apache.hadoop
+ hadoop-yarn-registry
+
+
+
com.google.guava
guava
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 08aee77..7118eb2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -44,9 +45,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
@@ -67,7 +68,7 @@
// Map from uamId to UAM instances
private Map unmanagedAppMasterMap;
- private Map attemptIdMap;
+ private Map appIdMap;
private ExecutorService threadpool;
@@ -82,7 +83,7 @@ protected void serviceStart() throws Exception {
this.threadpool = Executors.newCachedThreadPool();
}
this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
- this.attemptIdMap = new ConcurrentHashMap<>();
+ this.appIdMap = new ConcurrentHashMap<>();
super.serviceStart();
}
@@ -114,7 +115,7 @@ protected void serviceStop() throws Exception {
public KillApplicationResponse call() throws Exception {
try {
LOG.info("Force-killing UAM id " + uamId + " for application "
- + attemptIdMap.get(uamId));
+ + appIdMap.get(uamId));
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
@@ -132,7 +133,7 @@ public KillApplicationResponse call() throws Exception {
LOG.error("Failed to kill unmanaged application master", e);
}
}
- this.attemptIdMap.clear();
+ this.appIdMap.clear();
super.serviceStop();
}
@@ -173,28 +174,31 @@ public String createAndRegisterNewUAM(
rmClient = null;
}
- createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
- queueName, submitter, appNameSuffix);
+ // Launch the UAM in RM
+ launchUAM(appId.toString(), conf, appId, queueName, submitter,
+ appNameSuffix);
+
+ // Register the UAM application
+ registerApplicationMaster(appId.toString(), registerRequest);
+
+ // Returns the appId as uamId
return appId.toString();
}
/**
- * Create a new UAM and register the application, using the provided uamId and
- * appId.
+ * Launch a new UAM, using the provided uamId and appId.
*
- * @param uamId identifier for the UAM
- * @param registerRequest RegisterApplicationMasterRequest
+ * @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
- * @return RegisterApplicationMasterResponse
- * @throws YarnException if registerApplicationMaster fails
- * @throws IOException if registerApplicationMaster fails
+ * @return UAM token
+ * @throws YarnException if fails
+ * @throws IOException if fails
*/
- public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
- RegisterApplicationMasterRequest registerRequest, Configuration conf,
+ public Token launchUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix) throws YarnException, IOException {
@@ -207,11 +211,10 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
- RegisterApplicationMasterResponse response = null;
+ Token amrmToken = null;
try {
- LOG.info("Creating and registering UAM id {} for application {}", uamId,
- appId);
- response = uam.createAndRegisterApplicationMaster(registerRequest);
+ LOG.info("Launching UAM id {} for application {}", uamId, appId);
+ amrmToken = uam.launchUAM();
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
@@ -219,8 +222,48 @@ public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
throw e;
}
- this.attemptIdMap.put(uamId, uam.getAttemptId());
- return response;
+ this.appIdMap.put(uamId, uam.getAppId());
+ return amrmToken;
+ }
+
+ /**
+ * Re-attach to an existing UAM, using the provided uamIdentifier.
+ *
+ * @param uamId uam Id
+ * @param conf configuration for this UAM
+ * @param appId application id for the UAM
+ * @param queueName queue of the application
+ * @param submitter submitter name of the UAM
+ * @param appNameSuffix application name suffix for the UAM
+ * @param uamToken UAM token
+ * @throws YarnException if fails
+ * @throws IOException if fails
+ */
+ public void reAttachUAM(String uamId, Configuration conf,
+ ApplicationId appId, String queueName, String submitter,
+ String appNameSuffix, Token uamToken)
+ throws YarnException, IOException {
+
+ if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " already exists");
+ }
+ UnmanagedApplicationManager uam =
+ createUAM(conf, appId, queueName, submitter, appNameSuffix);
+ // Put the UAM into map first before initializing it to avoid additional UAM
+ // for the same uamId being created concurrently
+ this.unmanagedAppMasterMap.put(uamId, uam);
+
+ try {
+ LOG.info("Reattaching UAM id {} for application {}", uamId, appId);
+ uam.reAttachUAM(uamToken);
+ } catch (Exception e) {
+ // Add the map earlier and remove here if register failed because we want
+ // to make sure there is only one uam instance per uamId at any given time
+ this.unmanagedAppMasterMap.remove(uamId);
+ throw e;
+ }
+
+ this.appIdMap.put(uamId, uam.getAppId());
}
/**
@@ -242,9 +285,30 @@ protected UnmanagedApplicationManager createUAM(Configuration conf,
}
/**
+ * Register application master for the UAM.
+ *
+ * @param uamId uam Id
+ * @param registerRequest RegisterApplicationMasterRequest
+ * @return register response
+ * @throws YarnException if register fails
+ * @throws IOException if register fails
+ */
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String uamId, RegisterApplicationMasterRequest registerRequest)
+ throws YarnException, IOException {
+ if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " does not exist");
+ }
+ LOG.info("Registering UAM id {} for application {}", uamId,
+ this.appIdMap.get(uamId));
+ return this.unmanagedAppMasterMap.get(uamId)
+ .registerApplicationMaster(registerRequest);
+ }
+
+ /**
* AllocateAsync to an UAM.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @param request AllocateRequest
* @param callback callback for response
* @throws YarnException if allocate fails
@@ -262,7 +326,7 @@ public void allocateAsync(String uamId, AllocateRequest request,
/**
* Finish an UAM/application.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @param request FinishApplicationMasterRequest
* @return FinishApplicationMasterResponse
* @throws YarnException if finishApplicationMaster call fails
@@ -274,14 +338,15 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
- LOG.info("Finishing application for UAM id {} ", uamId);
+ LOG.info("Finishing UAM id {} for application {}", uamId,
+ this.appIdMap.get(uamId));
FinishApplicationMasterResponse response =
this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
if (response.getIsUnregistered()) {
// Only remove the UAM when the unregister finished
this.unmanagedAppMasterMap.remove(uamId);
- this.attemptIdMap.remove(uamId);
+ this.appIdMap.remove(uamId);
LOG.info("UAM id {} is unregistered", uamId);
}
return response;
@@ -301,7 +366,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
/**
* Return whether an UAM exists.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @return UAM exists or not
*/
public boolean hasUAMId(String uamId) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 6531a75..9a11753 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -50,7 +50,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -90,7 +92,6 @@
private AMRequestHandlerThread handlerThread;
private ApplicationMasterProtocol rmProxy;
private ApplicationId applicationId;
- private ApplicationAttemptId attemptId;
private String submitter;
private String appNameSuffix;
private Configuration conf;
@@ -102,6 +103,14 @@
private long asyncApiPollIntervalMillis;
private RecordFactory recordFactory;
+ /*
+ * This flag is used as an indication that this method launchUAM/reAttachUAM
+ * is called (and perhaps blocked in initializeUnmanagedAM below due to RM
+ * connection/failover issue and not finished yet). Set the flag before
+ * calling the blocking call to RM.
+ */
+ private boolean connectionInitiated;
+
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
String queueName, String submitter, String appNameSuffix) {
Preconditions.checkNotNull(conf, "Configuration cannot be null");
@@ -116,6 +125,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxy = null;
+ this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
this.asyncApiPollIntervalMillis = conf.getLong(
@@ -126,42 +136,77 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
}
/**
+ * Launch a new UAM in the resource manager.
+ *
+ * @return identifier uam identifier
+ * @throws YarnException if fails
+ * @throws IOException if fails
+ */
+ public Token launchUAM() throws YarnException, IOException {
+ this.connectionInitiated = true;
+
+ // Blocking call to RM
+ Token amrmToken =
+ initializeUnmanagedAM(this.applicationId);
+
+ // Creates the UAM connection
+ createUAMProxy(amrmToken);
+ return amrmToken;
+ }
+
+ /**
+ * Re-attach to an existing UAM in the resource manager.
+ *
+ * @param identifier identifying info about the uam
+ * @throws IOException if re-attach fails
+ * @throws YarnException if re-attach fails
+ */
+ public void reAttachUAM(Token amrmToken)
+ throws IOException, YarnException {
+ this.connectionInitiated = true;
+
+ // Creates the UAM connection
+ createUAMProxy(amrmToken);
+ }
+
+ protected void createUAMProxy(Token amrmToken)
+ throws IOException {
+ this.userUgi = UserGroupInformation.createProxyUser(
+ this.applicationId.toString(), UserGroupInformation.getCurrentUser());
+ this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
+ this.userUgi, amrmToken);
+ }
+
+ /**
* Registers this {@link UnmanagedApplicationManager} with the resource
* manager.
*
- * @param request the register request
- * @return the register response
+ * @param request RegisterApplicationMasterRequest
+ * @return register response
* @throws YarnException if register fails
* @throws IOException if register fails
*/
- public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
- // This need to be done first in this method, because it is used as an
- // indication that this method is called (and perhaps blocked due to RM
- // connection and not finished yet)
+ // Save the register request for re-register later
this.registerRequest = request;
- // attemptId will be available after this call
- UnmanagedAMIdentifier identifier =
- initializeUnmanagedAM(this.applicationId);
-
- try {
- this.userUgi = UserGroupInformation.createProxyUser(
- identifier.getAttemptId().toString(),
- UserGroupInformation.getCurrentUser());
- } catch (IOException e) {
- LOG.error("Exception while trying to get current user", e);
- throw new YarnRuntimeException(e);
- }
-
- this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
- this.userUgi, identifier.getToken());
-
- LOG.info("Registering the Unmanaged application master {}", this.attemptId);
+ // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
+ // We do not expect application already registered exception here
+ LOG.info("Registering the Unmanaged application master {}", this.applicationId);
RegisterApplicationMasterResponse response =
this.rmProxy.registerApplicationMaster(this.registerRequest);
+ for (Container container : response.getContainersFromPreviousAttempts()) {
+ LOG.info("RegisterUAM returned existing running container "
+ + container.getId());
+ }
+ for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
+ LOG.info("RegisterUAM returned existing NM token for node "
+ + nmToken.getNodeId());
+ }
+
// Only when register succeed that we start the heartbeat thread
this.handlerThread.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
@@ -187,11 +232,11 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.handlerThread.shutdown();
if (this.rmProxy == null) {
- if (this.registerRequest != null) {
- // This is possible if the async registerApplicationMaster is still
+ if (this.connectionInitiated) {
+ // This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case.
LOG.warn("Unmanaged AM still not successfully launched/registered yet."
- + " Stopping the UAM client thread anyways.");
+ + " Stopping the UAM heartbeat thread anyways.");
return FinishApplicationMasterResponse.newInstance(false);
} else {
throw new YarnException("finishApplicationMaster should not "
@@ -199,7 +244,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
}
}
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
- this.registerRequest, this.attemptId);
+ this.registerRequest, this.applicationId);
}
/**
@@ -212,7 +257,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
public KillApplicationResponse forceKillApplication()
throws IOException, YarnException {
KillApplicationRequest request =
- KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
+ KillApplicationRequest.newInstance(this.applicationId);
this.handlerThread.shutdown();
@@ -240,29 +285,29 @@ public void allocateAsync(AllocateRequest request,
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
// Two possible cases why the UAM is not successfully registered yet:
- // 1. registerApplicationMaster is not called at all. Should throw here.
- // 2. registerApplicationMaster is called but hasn't successfully returned.
+ // 1. launchUAM is not called at all. Should throw here.
+ // 2. launchUAM is called but hasn't successfully returned.
//
// In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost.
if (this.rmProxy == null) {
- if (this.registerRequest != null) {
+ if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later.");
} else {
throw new YarnException(
- "AllocateAsync should not be called before createAndRegister");
+ "AllocateAsync should not be called before launchUAM");
}
}
}
/**
- * Returns the application attempt id of the UAM.
+ * Returns the application id of the UAM.
*
- * @return attempt id of the UAM
+ * @return application id of the UAM
*/
- public ApplicationAttemptId getAttemptId() {
- return this.attemptId;
+ public ApplicationId getAppId() {
+ return this.applicationId;
}
/**
@@ -287,15 +332,15 @@ public ApplicationAttemptId getAttemptId() {
* Launch and initialize an unmanaged AM. First, it creates a new application
* on the RM and negotiates a new attempt id. Then it waits for the RM
* application attempt state to reach YarnApplicationAttemptState.LAUNCHED
- * after which it returns the AM-RM token and the attemptId.
+ * after which it returns the AM-RM token.
*
* @param appId application id
- * @return the UAM identifier
+ * @return the UAM token
* @throws IOException if initialize fails
* @throws YarnException if initialize fails
*/
- protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
- throws IOException, YarnException {
+ protected Token initializeUnmanagedAM(
+ ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
@@ -306,13 +351,12 @@ protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
submitUnmanagedApp(appId);
// Monitor the application attempt to wait for launch state
- ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
+ monitorCurrentAppAttempt(appId,
EnumSet.of(YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
YarnApplicationAttemptState.LAUNCHED);
- this.attemptId = attemptReport.getApplicationAttemptId();
- return getUAMIdentifier();
+ return getUAMToken();
} finally {
this.rmClient = null;
}
@@ -343,6 +387,8 @@ private void submitUnmanagedApp(ApplicationId appId)
submitRequest.setApplicationSubmissionContext(context);
context.setUnmanagedAM(true);
+ // Required for Federation because it need UAM restart support in RM
+ context.setKeepContainersAcrossApplicationAttempts(true);
LOG.info("Submitting unmanaged application {}", appId);
this.rmClient.submitApplication(submitRequest);
@@ -415,25 +461,25 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId,
}
/**
- * Gets the identifier of the unmanaged AM.
+ * Gets the amrmToken of the unmanaged AM.
*
- * @return the identifier of the unmanaged AM.
+ * @return the amrmToken of the unmanaged AM.
* @throws IOException if getApplicationReport fails
* @throws YarnException if getApplicationReport fails
*/
- protected UnmanagedAMIdentifier getUAMIdentifier()
+ protected Token getUAMToken()
throws IOException, YarnException {
Token token = null;
org.apache.hadoop.yarn.api.records.Token amrmToken =
- getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
+ getApplicationReport(this.applicationId).getAMRMToken();
if (amrmToken != null) {
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
} else {
LOG.warn(
"AMRMToken not found in the application report for application: {}",
- this.attemptId.getApplicationId());
+ this.applicationId);
}
- return new UnmanagedAMIdentifier(this.attemptId, token);
+ return token;
}
private ApplicationReport getApplicationReport(ApplicationId appId)
@@ -445,29 +491,6 @@ private ApplicationReport getApplicationReport(ApplicationId appId)
}
/**
- * Data structure that encapsulates the application attempt identifier and the
- * AMRMTokenIdentifier. Make it public because clients with HA need it.
- */
- public static class UnmanagedAMIdentifier {
- private ApplicationAttemptId attemptId;
- private Token token;
-
- public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
- Token token) {
- this.attemptId = attemptId;
- this.token = token;
- }
-
- public ApplicationAttemptId getAttemptId() {
- return this.attemptId;
- }
-
- public Token getToken() {
- return this.token;
- }
- }
-
- /**
* Data structure that encapsulates AllocateRequest and AsyncCallback
* instance.
*/
@@ -549,8 +572,10 @@ public void run() {
}
request.setResponseId(lastResponseId);
+
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
- request, rmProxy, registerRequest, attemptId);
+ request, rmProxy, registerRequest, applicationId);
+
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
@@ -578,18 +603,17 @@ public void run() {
LOG.debug("Interrupted while waiting for queue", ex);
}
} catch (IOException ex) {
- LOG.warn(
- "IO Error occurred while processing heart beat for " + attemptId,
- ex);
+ LOG.warn("IO Error occurred while processing heart beat for "
+ + applicationId, ex);
} catch (Throwable ex) {
LOG.warn(
- "Error occurred while processing heart beat for " + attemptId,
+ "Error occurred while processing heart beat for " + applicationId,
ex);
}
}
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
- + "AMRequestHandlerThread thread is exiting", attemptId);
+ + "AMRequestHandlerThread thread is exiting", applicationId);
}
}
@@ -600,8 +624,8 @@ public void run() {
implements UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Heartbeat thread {} for application attempt {} crashed!",
- t.getName(), attemptId, e);
+ LOG.error("Heartbeat thread {} for application {} crashed!",
+ t.getName(), applicationId, e);
}
}
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
index 7993bd8..3cecdca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -63,16 +63,16 @@ private AMRMClientUtils() {
/**
* Handle ApplicationNotRegistered exception and re-register.
*
- * @param attemptId app attemptId
+ * @param appId application Id
* @param rmProxy RM proxy instance
* @param registerRequest the AM re-register request
* @throws YarnException if re-register fails
*/
public static void handleNotRegisteredExceptionAndReRegister(
- ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
+ ApplicationId appId, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest) throws YarnException {
LOG.info("App attempt {} not registered, most likely due to RM failover. "
- + " Trying to re-register.", attemptId);
+ + " Trying to re-register.", appId);
try {
rmProxy.registerApplicationMaster(registerRequest);
} catch (Exception e) {
@@ -93,25 +93,24 @@ public static void handleNotRegisteredExceptionAndReRegister(
* @param request allocate request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
- * @param attemptId application attempt id
+ * @param appId application id
* @return allocate response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest,
- ApplicationAttemptId attemptId) throws YarnException, IOException {
+ RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+ throws YarnException, IOException {
try {
return rmProxy.allocate(request);
} catch (ApplicationMasterNotRegisteredException e) {
- handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+ handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// reset responseId after re-register
request.setResponseId(0);
// retry allocate
- return allocateWithReRegister(request, rmProxy, registerRequest,
- attemptId);
+ return allocateWithReRegister(request, rmProxy, registerRequest, appId);
}
}
@@ -123,23 +122,22 @@ public static AllocateResponse allocateWithReRegister(AllocateRequest request,
* @param request finishApplicationMaster request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
- * @param attemptId application attempt id
+ * @param appId application id
* @return finishApplicationMaster response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static FinishApplicationMasterResponse finishAMWithReRegister(
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest,
- ApplicationAttemptId attemptId) throws YarnException, IOException {
+ RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+ throws YarnException, IOException {
try {
return rmProxy.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException ex) {
- handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+ handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// retry finishAM after re-register
- return finishAMWithReRegister(request, rmProxy, registerRequest,
- attemptId);
+ return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 628c781..9709deb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -111,6 +111,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -177,10 +179,9 @@
LoggerFactory.getLogger(MockResourceManagerFacade.class);
private HashSet applicationMap = new HashSet<>();
- private HashMap> applicationContainerIdMap =
- new HashMap>();
- private HashMap allocatedContainerMap =
- new HashMap();
+ private HashSet keepContainerOnUams = new HashSet<>();
+ private HashMap> applicationContainerIdMap =
+ new HashMap<>();
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
@@ -221,7 +222,7 @@ public void setRunningMode(boolean mode) {
this.isRunning = mode;
}
- private static String getAppIdentifier() throws IOException {
+ private static ApplicationAttemptId getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
Set tokenIds = remoteUgi.getTokenIdentifiers();
@@ -231,7 +232,8 @@ private static String getAppIdentifier() throws IOException {
break;
}
}
- return result != null ? result.getApplicationAttemptId().toString() : "";
+ return result != null ? result.getApplicationAttemptId()
+ : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
}
private void validateRunning() throws ConnectException {
@@ -246,19 +248,32 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
throws YarnException, IOException {
validateRunning();
-
- String amrmToken = getAppIdentifier();
- LOG.info("Registering application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Registering application attempt: " + attemptId);
shouldReRegisterNext = false;
+ List containersFromPreviousAttempt = null;
+
synchronized (applicationContainerIdMap) {
- if (applicationContainerIdMap.containsKey(amrmToken)) {
- throw new InvalidApplicationMasterRequestException(
- AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ if (applicationContainerIdMap.containsKey(attemptId)) {
+ if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
+ // For UAM with the keepContainersFromPreviousAttempt flag, return all
+ // running containers
+ containersFromPreviousAttempt = new ArrayList<>();
+ for (ContainerId containerId : applicationContainerIdMap
+ .get(attemptId)) {
+ containersFromPreviousAttempt.add(Container.newInstance(containerId,
+ null, null, null, null, null));
+ }
+ } else {
+ throw new InvalidApplicationMasterRequestException(
+ AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ }
+ } else {
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(attemptId, new ArrayList());
}
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken, new ArrayList());
}
// Make sure we wait for certain test cases last in the method
@@ -278,7 +293,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
}
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
- null, request.getHost(), null);
+ containersFromPreviousAttempt, request.getHost(), null);
}
@Override
@@ -288,8 +303,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(
validateRunning();
- String amrmToken = getAppIdentifier();
- LOG.info("Finishing application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Finishing application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -299,12 +314,9 @@ public FinishApplicationMasterResponse finishApplicationMaster(
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
- Assert.assertTrue("The application id is NOT registered: " + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.remove(amrmToken);
- for (ContainerId c : ids) {
- allocatedContainerMap.remove(c);
- }
+ Assert.assertTrue("The application id is NOT registered: " + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ applicationContainerIdMap.remove(attemptId);
}
return FinishApplicationMasterResponse.newInstance(
@@ -334,8 +346,8 @@ public AllocateResponse allocate(AllocateRequest request)
+ "askList and releaseList in the same heartbeat");
}
- String amrmToken = getAppIdentifier();
- LOG.info("Allocate from application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Allocate from application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -367,16 +379,16 @@ public AllocateResponse allocate(AllocateRequest request)
// will need it in future
Assert.assertTrue(
"The application id is Not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.get(amrmToken);
+ + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ List ids = applicationContainerIdMap.get(attemptId);
ids.add(containerId);
- this.allocatedContainerMap.put(containerId, container);
}
}
}
}
+ List completedList = new ArrayList<>();
if (request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size());
@@ -384,9 +396,9 @@ public AllocateResponse allocate(AllocateRequest request)
Assert
.assertTrue(
"The application id is not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.get(amrmToken);
+ + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ List ids = applicationContainerIdMap.get(attemptId);
for (ContainerId id : request.getReleaseList()) {
boolean found = false;
@@ -402,18 +414,8 @@ public AllocateResponse allocate(AllocateRequest request)
+ conf.get("AMRMTOKEN"), found);
ids.remove(id);
-
- // Return the released container back to the AM with new fake Ids. The
- // test case does not care about the IDs. The IDs are faked because
- // otherwise the LRM will throw duplication identifier exception. This
- // returning of fake containers is ONLY done for testing purpose - for
- // the test code to get confirmation that the sub-cluster resource
- // managers received the release request
- ContainerId fakeContainerId = ContainerId.newInstance(
- getApplicationAttemptId(1), containerIndex.incrementAndGet());
- Container fakeContainer = allocatedContainerMap.get(id);
- fakeContainer.setId(fakeContainerId);
- containerList.add(fakeContainer);
+ completedList.add(
+ ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0));
}
}
}
@@ -424,9 +426,9 @@ public AllocateResponse allocate(AllocateRequest request)
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
- return AllocateResponse.newInstance(0, new ArrayList(),
- containerList, new ArrayList(), null, AMCommand.AM_RESYNC,
- 1, null, new ArrayList(), newAMRMToken,
+ return AllocateResponse.newInstance(0, completedList, containerList,
+ new ArrayList(), null, AMCommand.AM_RESYNC, 1, null,
+ new ArrayList(), newAMRMToken,
new ArrayList());
}
@@ -443,6 +445,7 @@ public GetApplicationReportResponse getApplicationReport(
report.setApplicationId(request.getApplicationId());
report.setCurrentApplicationAttemptId(
ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
+ report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
response.setApplicationReport(report);
return response;
}
@@ -486,6 +489,12 @@ public SubmitApplicationResponse submitApplication(
}
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);
+
+ if (request.getApplicationSubmissionContext().getUnmanagedAM()
+ || request.getApplicationSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()) {
+ keepContainerOnUams.add(appId);
+ }
return SubmitApplicationResponse.newInstance();
}
@@ -502,6 +511,7 @@ public KillApplicationResponse forceKillApplication(
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
+ keepContainerOnUams.remove(appId);
}
LOG.info("Force killing application: " + appId);
return KillApplicationResponse.newInstance(true);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 9159cf7..242d945 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -88,7 +88,8 @@ protected void waitForCallBackCountAndCheckZeroPending(
public void testBasicUsage()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
@@ -102,11 +103,48 @@ public void testBasicUsage()
attemptId);
}
+ /*
+ * Test re-attaching of an existing UAM. This is for HA of UAM client.
+ */
+ @Test(timeout = 5000)
+ public void testUAMReAttach()
+ throws YarnException, IOException, InterruptedException {
+
+ launchUAM(attemptId);
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+ // Wait for outstanding async allocate callback
+ waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+ MockResourceManagerFacade rmProxy = uam.getRMProxy();
+ uam = new TestableUnmanagedApplicationManager(conf,
+ attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
+ uam.setRMProxy(rmProxy);
+
+ reAttachUAM(null, attemptId);
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+
+ // Wait for outstanding async allocate callback
+ waitForCallBackCountAndCheckZeroPending(callback, 2);
+
+ finishApplicationMaster(
+ FinishApplicationMasterRequest.newInstance(null, null, null),
+ attemptId);
+ }
+
@Test(timeout = 5000)
public void testReRegister()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.setShouldReRegisterNext();
@@ -137,7 +175,8 @@ public void testSlowRegisterCall()
@Override
public void run() {
try {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null),
attemptId);
} catch (Exception e) {
@@ -221,7 +260,8 @@ public void testFinishWithoutRegister()
@Test
public void testForceKill()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.forceKillApplication();
@@ -241,19 +281,40 @@ protected UserGroupInformation getUGIWithToken(
return ugi;
}
- protected RegisterApplicationMasterResponse
- createAndRegisterApplicationMaster(
- final RegisterApplicationMasterRequest request,
- ApplicationAttemptId appAttemptId)
- throws YarnException, IOException, InterruptedException {
+ protected Token launchUAM(
+ ApplicationAttemptId appAttemptId)
+ throws IOException, InterruptedException {
+ return getUGIWithToken(appAttemptId)
+ .doAs(new PrivilegedExceptionAction>() {
+ @Override
+ public Token run() throws Exception {
+ return uam.launchUAM();
+ }
+ });
+ }
+
+ protected void reAttachUAM(final Token uamToken,
+ ApplicationAttemptId appAttemptId)
+ throws IOException, InterruptedException {
+ getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction