diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java new file mode 100644 index 0000000..b4f75c9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +/** + * Generic interface that can be used for calling back when a corresponding + * asynchronous operation completes. + * + * @param parameter type for the callback + */ +public interface AsyncCallback { + /** + * This method is called back when the corresponding asynchronous operation + * completes. + * + * @param response response of the callback + */ + void callback(T response); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationMaster.java new file mode 100644 index 0000000..8b38438 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationMaster.java @@ -0,0 +1,706 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * UnmanagedApplicationMaster is used to register unmanaged application and + * negotiate for resources from resource managers. Allocate calls are handled + * asynchronously using {@link AsyncCallback}. + */ +public class UnmanagedApplicationMaster { + private static final Logger LOG = + LoggerFactory.getLogger(UnmanagedApplicationMaster.class); + private volatile boolean keepRunning; + private BlockingQueue requestQueue; + private CallbackHandlerThread handlerThread; + private ApplicationMasterProtocol rmProxy; + private boolean registerUAMCalled; + private ApplicationAttemptId attemptId; + private String submitter; + private String appNameSuffix; + private Configuration conf; + private String queueName; + private UserGroupInformation userUgi; + private RegisterApplicationMasterRequest registerRequest; + private int lastResponseId; + + public UnmanagedApplicationMaster(Configuration conf, + ApplicationAttemptId attemptId, String submitter, String appNameSuffix) { + Preconditions.checkNotNull(conf, "Configuration cannot be null"); + Preconditions.checkNotNull(attemptId, + "ApplicationAttemptId cannot be null"); + Preconditions.checkNotNull(submitter, "App submitter cannot be null"); + + this.conf = conf; + this.attemptId = attemptId; + this.submitter = submitter; + this.appNameSuffix = appNameSuffix; + this.handlerThread = new CallbackHandlerThread(); + this.requestQueue = new LinkedBlockingQueue(); + this.keepRunning = true; + this.queueName = null; + this.rmProxy = null; + this.registerUAMCalled = false; + this.userUgi = null; + } + + /** + * Registers this {@link UnmanagedApplicationMaster} with the resource + * manager. + * + * @param request the register request + * @return the register response + * @throws YarnException + * @throws IOException + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + this.registerUAMCalled = true; + this.registerRequest = request; + RegisterApplicationMasterResponse response = + registerApplicationMasterInternal(); + + this.handlerThread.setDaemon(true); + this.handlerThread.start(); + + return response; + } + + private RegisterApplicationMasterResponse registerApplicationMasterInternal() + throws YarnException, IOException { + LOG.info("Registering the Unmanaged application master with RM"); + + UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(this.conf, + this.queueName, this.attemptId, this.submitter); + UnmanagedAMIdentifier identifier = + launcher.initializeUnmanagedAM(this.attemptId.getApplicationId()); + + if (this.userUgi == null) { + try { + this.userUgi = UserGroupInformation.createProxyUser( + identifier.getAttemptId().toString(), + UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + LOG.error("Exception while trying to get current user", e); + throw new YarnRuntimeException(e); + } + } + + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, identifier.getToken()); + + RegisterApplicationMasterResponse response = + this.rmProxy.registerApplicationMaster(this.registerRequest); + // reset response id on registration + this.lastResponseId = 0; + return response; + } + + /** + * Unregisters from the resource manager and stops the request handler thread. + * + * @param request the finishApplicationMaster request + * @return the response + * @throws YarnException + * @throws IOException + */ + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + if (this.rmProxy == null) { + if (this.registerUAMCalled) { + // This is possible if the async registerApplicationMaster is still + // blocked and retrying + LOG.warn("Unmanaged AM still not successfully launched/registered yet." + + " Stopping the UAM client thread anyways."); + } else { + throw new YarnException( + "finishApplicationMaster should not be called before register"); + } + } + this.keepRunning = false; + this.handlerThread.interrupt(); + if (this.rmProxy == null) { + // Return a dummy response when register call is still stuck. + return FinishApplicationMasterResponse.newInstance(false); + } else { + return this.rmProxy.finishApplicationMaster(request); + } + } + + /** + * Sends the specified heart beat request to the resource manager and invokes + * the callback asynchronously with the response. + * + * @param request the allocate request + * @param callback the callback method for the request + * @throws YarnException + */ + public void allocateAsync(AllocateRequest request, + AsyncCallback callback) throws YarnException { + try { + this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback)); + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting to put on response queue", ex); + } + } + // Two possible cases why the UAM is not successfully registered yet: + // 1. registerApplicationMaster is not called at all. Should throw here. + // 2. registerApplicationMaster is called but hasn't successfully returned. + // In case 2, we have already save the allocate request above, so if the + // registration succeed later, no request is lost. + if (this.rmProxy == null) { + if (this.registerUAMCalled) { + LOG.warn("Unmanaged AM still not successfully launched/registered yet." + + " Saving the allocate request and send later."); + } else { + throw new YarnException("AllocateAsync should not " + + "be called before registerApplicationMaster"); + } + } + } + + /** + * Set the RM queue name to which this application will be submitted. + * + * @param queueName the queue name to set + */ + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + /** + * Returns the proxy object used by this instance to connect to the resource + * manager. + * + * @return the RM proxy instance + */ + public ApplicationMasterProtocol getRMProxy() { + return this.rmProxy; + } + + /** + * Returns RM proxy for the specified protocol type. Unit test cases can + * override this method and return mock proxy instances. + * + * @param protocol protocal of the proxy + * @param config configuration + * @param user ugi for the proxy connection + * @param token token for the connection + * @return the proxy instance + * @throws IOException + */ + protected T createRMProxy(final Class protocol, Configuration config, + UserGroupInformation user, Token token) + throws IOException { + try { + final Configuration configuration; + if (token != null) { + configuration = new Configuration(config); + LOG.info( + "Creating RMProxy with a AMRMToken for protocol: {} for user: {}", + protocol.getSimpleName(), user); + token.setService(ClientRMProxy.getAMRMTokenService(configuration)); + user.addToken(token); + configuration.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + } else { + configuration = config; + LOG.info( + "Creating RMProxy without a token for protocol: {} for user: {}", + protocol.getSimpleName(), user); + } + final T proxyConnection = user.doAs(new PrivilegedExceptionAction() { + @Override + public T run() throws Exception { + return ClientRMProxy.createRMProxy(configuration, protocol); + } + }); + return proxyConnection; + + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + public static void updateAMRMToken( + org.apache.hadoop.yarn.api.records.Token token, UserGroupInformation user, + Configuration conf) { + Token amrmToken = new Token( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + // Preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. + user.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf)); + } + + /** + * The UnmanagedLauncher is used to launch an unmanaged AM. An unmanagedAM is + * an AM that is not launched and managed by the RM. The client creates a new + * application on the RM and negotiates a new attempt id. Then it waits for + * the RM application state to reach be YarnApplicationState.ACCEPTED after + * which it returns the AM-RM token and the attemptId. + */ + public class UnmanagedAMLauncher { + private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000; + private static final String APP_NAME = "Unmanaged-AM"; + private static final String DEFAULT_QUEUE = "default"; + private static final String QUEUE_NAME = "queue.name"; + + private ApplicationAttemptId attemptId; + private String submitter; + private ApplicationClientProtocol rmClient; + private Configuration conf; + private RecordFactory recordFactory; + private String queueName; + + public UnmanagedAMLauncher(Configuration conf, String queueName, + ApplicationAttemptId attemptId, String submitter) { + this.conf = conf; + this.queueName = queueName; + this.attemptId = attemptId; + this.submitter = submitter; + this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); + } + + public UnmanagedAMIdentifier initializeUnmanagedAM( + ApplicationId applicationId) { + UnmanagedAMIdentifier identifier = null; + try { + LOG.info( + "Creating client for launching Unmanaged AM for applicationid: {}", + applicationId); + + UserGroupInformation appSubmitter = + UserGroupInformation.createRemoteUser(this.submitter); + + this.rmClient = createRMProxy(ApplicationClientProtocol.class, + this.conf, appSubmitter, null); + + ApplicationId appId = submitAppAndGetAppId(applicationId); + ApplicationReport appReport = monitorApplicationSubmission(appId, + EnumSet.of(YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING, YarnApplicationState.KILLED, + YarnApplicationState.FAILED, YarnApplicationState.FINISHED)); + + if (appReport + .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + // Monitor the application attempt to wait for launch state + ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt( + appId, YarnApplicationAttemptState.LAUNCHED); + ApplicationAttemptId appAttemptId = + attemptReport.getApplicationAttemptId(); + LOG.info("Launching UAM with application attempt id {}", + appAttemptId); + + /** + * TODO: we enforce that the attempt id must be the same in all + * subclusters. Need to revisit here once we turn on HA, and that + * attempt id could be larger than one. + */ + if (!appAttemptId.equals(this.attemptId)) { + throw new YarnException("Invalid state. The application attempt " + + "id returned by RM is different. Required id: " + + this.attemptId + " Received id:" + appAttemptId); + } + + identifier = getUAMIdentifier(); + } else { + throw new YarnRuntimeException( + "Received non-accepted application state: " + + appReport.getYarnApplicationState() + + ". Application attempt " + this.attemptId + + " not the first attempt?"); + } + } catch (Exception e) { + throw new YarnRuntimeException("Error launching unmanaged AM", e); + } finally { + this.rmClient = null; + } + return identifier; + } + + /** + * Gets the identifier of the unmanaged AM. + * + * @return the identifier of the unmanaged AM. + * @throws IOException + * @throws YarnException + */ + public UnmanagedAMIdentifier getUAMIdentifier() + throws IOException, YarnException { + + Object amrmToken = getApplicationReport(this.attemptId.getApplicationId()) + .getAMRMToken(); + if (amrmToken != null) { + Token token = ConverterUtils.convertFromYarn( + getApplicationReport(this.attemptId.getApplicationId()) + .getAMRMToken(), + (Text) null); + return new UnmanagedAMIdentifier(this.attemptId, token); + } else { + LOG.warn( + "AMRMToken not found in the application report for attemptId: {}", + this.attemptId); + return new UnmanagedAMIdentifier(this.attemptId, null); + } + } + + private ApplicationId submitAppAndGetAppId(ApplicationId originalAppId) + throws Exception { + this.recordFactory = RecordFactoryProvider.getRecordFactory(this.conf); + SubmitApplicationRequest submitRequest = + this.recordFactory.newRecordInstance(SubmitApplicationRequest.class); + + LOG.info("Setting up unmanaged application submission context: {}", + this.attemptId); + ApplicationSubmissionContext context = this.recordFactory + .newRecordInstance(ApplicationSubmissionContext.class); + + context.setApplicationId(originalAppId); + + // ApplicationName in the secondary SubCluster has the following format + // Unmanaged-AM-{appNameSuffix} + context.setApplicationName(APP_NAME + "-" + appNameSuffix); + + if (StringUtils.isBlank(this.queueName)) { + context.setQueue(this.conf.get(QUEUE_NAME, DEFAULT_QUEUE)); + } else { + context.setQueue(this.queueName); + } + + ContainerLaunchContext amContainer = + this.recordFactory.newRecordInstance(ContainerLaunchContext.class); + Resource resource = BuilderUtils.newResource(1024, 1); + context.setResource(resource); + context.setAMContainerSpec(amContainer); + submitRequest.setApplicationSubmissionContext(context); + + context.setUnmanagedAM(true); + + this.rmClient.submitApplication(submitRequest); + + LOG.info("Submitting unmanaged application: {} ", this.attemptId); + return originalAppId; + } + + /** + * Monitor the submitted application for either acceptance or failure. + * + * @param appId Application Id of application to be monitored + * @return the application report + * @throws YarnException + * @throws IOException + */ + private ApplicationReport monitorApplicationSubmission(ApplicationId appId, + Set finalState) + throws YarnException, IOException { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + ApplicationReport report = getApplicationReport(appId); + + if (LOG.isDebugEnabled()) { + StringBuffer buffer = new StringBuffer(); + buffer.append("Got app report from ASM for"); + buffer.append(", appId=").append(appId.getId()); + buffer.append(", appAttemptId="); + buffer.append(report.getCurrentApplicationAttemptId() != null + ? report.getCurrentApplicationAttemptId() : ""); + buffer.append(", clientToAMToken="); + buffer.append(report.getClientToAMToken() != null + ? report.getClientToAMToken() : ""); + buffer.append(", appDiagnostics=").append( + report.getDiagnostics() != null ? report.getDiagnostics() : ""); + buffer.append(", appMasterHost=") + .append(report.getHost() != null ? report.getHost() : ""); + buffer.append(", appQueue=") + .append(report.getQueue() != null ? report.getQueue() : ""); + buffer.append(", appMasterRpcPort=").append(report.getRpcPort()); + buffer.append(", appStartTime=").append(report.getStartTime()); + buffer.append(", yarnAppState=") + .append(report.getYarnApplicationState().toString()); + buffer.append(", distributedFinalState=") + .append(report.getFinalApplicationStatus().toString()); + buffer.append(", appTrackingUrl=").append( + report.getTrackingUrl() != null ? report.getTrackingUrl() : ""); + buffer.append(", appUser=") + .append(report.getUser() != null ? report.getUser() : ""); + + LOG.debug(buffer.toString()); + } + + YarnApplicationState state = report.getYarnApplicationState(); + if (finalState.contains(state)) { + return report; + } + LOG.info("Current attempt state of {} is {}, will retry later.", appId, + state); + } + } + + private ApplicationAttemptReport monitorCurrentAppAttempt( + ApplicationId appId, YarnApplicationAttemptState attemptState) + throws YarnException, IOException { + long startTime = System.currentTimeMillis(); + ApplicationAttemptId appAttemptId = null; + while (true) { + if (appAttemptId == null) { + appAttemptId = + getApplicationReport(appId).getCurrentApplicationAttemptId(); + } + + ApplicationAttemptReport attemptReport = null; + if (appAttemptId != null) { + GetApplicationAttemptReportRequest req = this.recordFactory + .newRecordInstance(GetApplicationAttemptReportRequest.class); + req.setApplicationAttemptId(appAttemptId); + attemptReport = this.rmClient.getApplicationAttemptReport(req) + .getApplicationAttemptReport(); + if (attemptState + .equals(attemptReport.getYarnApplicationAttemptState())) { + return attemptReport; + } + } + + LOG.info("Current attempt state of " + appId + " is " + + (attemptReport == null ? " N/A " + : attemptReport.getYarnApplicationAttemptState()) + + ", waiting for current attempt to reach " + attemptState); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for current attempt of " + appId + + " to reach " + attemptState); + } + + if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { + String errmsg = "Timeout for waiting current attempt of " + appId + + " to reach " + attemptState; + LOG.error(errmsg); + throw new RuntimeException(errmsg); + } + } + } + + private ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + GetApplicationReportRequest request = this.recordFactory + .newRecordInstance(GetApplicationReportRequest.class); + request.setApplicationId(appId); + return this.rmClient.getApplicationReport(request).getApplicationReport(); + } + } + + /** + * Private structure that encapsulates the application attempt identifier and + * the AMRMTokenIdentifier. + * + */ + public static class UnmanagedAMIdentifier { + private ApplicationAttemptId attemptId; + private Token token; + + public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, + Token token) { + this.attemptId = attemptId; + this.token = token; + } + + public ApplicationAttemptId getAttemptId() { + return this.attemptId; + } + + public Token getToken() { + return this.token; + } + } + + /** + * Data structure that encapsulates AllocateRequest and AsyncCallback + * instance. + */ + public static class AsyncAllocateRequestInfo { + private AllocateRequest request; + private AsyncCallback callback; + + public AsyncAllocateRequestInfo(AllocateRequest request, + AsyncCallback callback) { + Preconditions.checkArgument(request != null, + "AllocateRequest cannot be null"); + Preconditions.checkArgument(callback != null, "Callback cannot be null"); + + this.request = request; + this.callback = callback; + } + + public AsyncCallback getCallback() { + return this.callback; + } + + public AllocateRequest getRequest() { + return this.request; + } + } + + /** + * Extends Thread and provides an implementation that is used for processing + * the AM heart beat request asynchronously and sending back the response + * using the callback method registered with the system. + */ + public class CallbackHandlerThread extends Thread { + public CallbackHandlerThread() { + super("UnmanagedApplicationMaster Callback Handler Thread"); + } + + @Override + public void run() { + while (keepRunning) { + AsyncAllocateRequestInfo requestInfo; + try { + requestInfo = requestQueue.take(); + if (requestInfo == null) { + throw new RuntimeException( + "Null requestInfo taken from request queue"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + + ((requestInfo.getRequest().getAskList() == null) ? " empty" + : requestInfo.getRequest().getAskList().size())); + } + + if (!keepRunning) { + break; + } + + // set the response id before forwarding the allocate request as we + // could have different values for each sub-cluster based on whether + // the particular RM has failover or not in the interim + AllocateRequest request = requestInfo.getRequest(); + if (request == null) { + throw new RuntimeException("Null allocateRequest from requestInfo"); + } + + request.setResponseId(lastResponseId); + AllocateResponse response = rmProxy.allocate(request); + if (response == null) { + throw new RuntimeException("Null allocateResponse from allocate"); + } + + lastResponseId = response.getResponseId(); + // update token if RM has reissued/renewed + if (response.getAMRMToken() != null) { + LOG.debug("Received new AMRMToken"); + updateAMRMToken(response.getAMRMToken(), userUgi, conf); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" + + ((response.getAllocatedContainers() == null) ? " empty" + : response.getAllocatedContainers().size())); + } + + if (requestInfo.getCallback() == null) { + throw new RuntimeException("Null callback from requestInfo"); + } + requestInfo.getCallback().callback(response); + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting for queue", ex); + } + } catch (IOException ex) { + LOG.info("IO Error occurred while processing heart beat", ex); + } catch (ApplicationMasterNotRegisteredException ex) { + LOG.info("Unmanaged ApplicationMaster --> {} not registered. " + + "Trying to re-register.", attemptId); + try { + registerApplicationMasterInternal(); + } catch (Exception e) { + LOG.error("Error trying to re-register UAM", e); + } + } catch (Throwable ex) { + LOG.info("Error occurred while processing heart beat", ex); + } + } + + LOG.info("UnmanagedApplicationMaster has been stopped. " + + "CallbackHandlerThread thread is exiting"); + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java new file mode 100644 index 0000000..0e78094 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.uam; \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/MockResourceManagerFacade.java new file mode 100644 index 0000000..3ab6a0d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/MockResourceManagerFacade.java @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade + implements ApplicationMasterProtocol, ApplicationClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(MockResourceManagerFacade.class); + + private HashMap> applicationContainerIdMap = + new HashMap>(); + private HashMap allocatedContainerMap = + new HashMap(); + + public MockResourceManagerFacade() { + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? result.getApplicationAttemptId().toString() : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + LOG.info("Registering application attempt: " + amrmToken); + + // We reuse the port number to indicate how long the unit test what us to + // sleep here + if (request.getRpcPort() > 0) { + LOG.info("Register call sleeping for {} miliseconds", + request.getRpcPort()); + try { + Thread.sleep(request.getRpcPort()); + LOG.info("Sleep finished"); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + } + + synchronized (applicationContainerIdMap) { + Assert.assertFalse( + "The application id is already registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this + // application + applicationContainerIdMap.put(amrmToken, new ArrayList()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, null, + null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + LOG.info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this + // application + Assert.assertTrue("The application id is NOT registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse.newInstance( + request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED + ? true : false); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + LOG.info("Allocate call from application attempt: " + amrmToken); + + return AllocateResponse.newInstance(0, new ArrayList(), + new ArrayList(), new ArrayList(), null, null, 1, + null, new ArrayList(), null, + new ArrayList()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId( + ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return GetNewApplicationResponse.newInstance(null, null, null); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + String appId = ""; + if (request.getApplicationSubmissionContext() == null && request + .getApplicationSubmissionContext().getApplicationId() != null) { + appId = request.getApplicationSubmissionContext().getApplicationId() + .toString(); + } + LOG.info("Submitting application: " + appId); + return SubmitApplicationResponse.newInstance(); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + String appId = ""; + if (request.getApplicationId() == null) { + appId = request.getApplicationId().toString(); + } + LOG.info("Force killing application: " + appId); + return KillApplicationResponse.newInstance(true); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return GetClusterMetricsResponse.newInstance(null); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(null); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return GetClusterNodesResponse.newInstance(null); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return GetQueueInfoResponse.newInstance(null); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return GetQueueUserAclsInfoResponse.newInstance(null); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return GetDelegationTokenResponse.newInstance(null); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return MoveApplicationAcrossQueuesResponse.newInstance(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return ReservationSubmissionResponse.newInstance(); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return ReservationListResponse + .newInstance(new ArrayList()); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return ReservationUpdateResponse.newInstance(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return ReservationDeleteResponse.newInstance(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return GetNodesToLabelsResponse + .newInstance(new HashMap>()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return GetClusterNodeLabelsResponse.newInstance(new ArrayList()); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return GetLabelsToNodesResponse.newInstance(null); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return GetNewReservationResponse + .newInstance(ReservationId.newInstance(0, 0)); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return FailApplicationAttemptResponse.newInstance(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return UpdateApplicationPriorityResponse.newInstance(null); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return null; + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return UpdateApplicationTimeoutsResponse.newInstance(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationMaster.java new file mode 100644 index 0000000..d645428 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationMaster.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for UnmanagedApplicationMaster. + */ +public class TestUnmanagedApplicationMaster { + private static final Logger LOG = + LoggerFactory.getLogger(UnmanagedApplicationMaster.class); + + private UnmanagedApplicationMaster uam; + private Configuration conf = new YarnConfiguration(); + private AsyncCallback callback; + private int callBackCount; + + @Before + public void setup() { + conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId"); + callback = new CountingCallback(); + callBackCount = 0; + + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + + uam = new TestableUnmanagedApplicationMaster(conf, attemptId, "submitter", + "appNameSuffix"); + } + + @Test + public void testBasicUsage() + throws YarnException, IOException, InterruptedException { + + uam.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null)); + + uam.allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), + callback); + + // Wait for the async allocate callback + Thread.sleep(100); + Assert.assertEquals("Received wrong number of allocate callbacks", 1, + callBackCount); + + uam.finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null)); + } + + /** + * If register is slow, async allocate calls in the meanwhile should not throw + * or be dropped. + */ + @Test + public void testSlowRegisterCall() + throws YarnException, IOException, InterruptedException { + + // Register with delay/sleep 0.5s in a separate thread + Thread registerAMThread = new Thread(new Runnable() { + @Override + public void run() { + try { + uam.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 500, null)); + } catch (Exception e) { + LOG.info("Register thread exception", e); + } + } + }); + registerAMThread.start(); + LOG.info("Register thread started"); + + // A short sleep to let register call in the thread go through + Thread.sleep(100); + + // First allocate before register succeeds + uam.allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), + callback); + + // Wait for register thread to finish + registerAMThread.join(); + + // Second allocate, normal case + uam.allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), + callback); + + // Wait for the async allocate callback + Thread.sleep(100); + Assert.assertEquals("Received wrong number of allocate callbacks", 2, + callBackCount); + + uam.finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null)); + + // Third allocate after finishAM should be ignored + uam.allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), + callback); + + // Wait for the async allocate callback + Thread.sleep(100); + Assert.assertEquals("Received wrong number of allocate callbacks", 2, + callBackCount); + } + + @Test(expected = YarnException.class) + public void testAllocateWithoutRegister() throws YarnException { + uam.allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), + callback); + } + + @Test(expected = YarnException.class) + public void testFinishWithoutRegister() throws YarnException, IOException { + uam.finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null)); + } + + protected class CountingCallback implements AsyncCallback { + @Override + public void callback(AllocateResponse response) { + synchronized (this) { + callBackCount++; + } + } + } + + /** + * Testable UnmanagedApplicationMaster that talks to a mock RM. + */ + public static class TestableUnmanagedApplicationMaster + extends UnmanagedApplicationMaster { + + public TestableUnmanagedApplicationMaster(Configuration conf, + ApplicationAttemptId attemptId, String submitter, + String appNameSuffix) { + super(conf, attemptId, submitter, appNameSuffix); + } + + @SuppressWarnings("unchecked") + @Override + protected T createRMProxy(final Class protocol, Configuration config, + UserGroupInformation user, Token token) { + return (T) new MockResourceManagerFacade(); + } + } + +} \ No newline at end of file