diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
index 20235d0..84c0166 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
@@ -63,7 +63,9 @@
/**
*
- * Submit a new application to YARN.
+ * Submit a new application to YARN. It is a blocking call, such
+ * that it will not return {@link ApplicationId} until the submitted
+ * application has enter SUBMITTED or the states afterwards.
*
*
* @param appContext
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
index eb84b31..fc8d3de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
@@ -53,9 +53,11 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
@@ -68,6 +70,7 @@
protected ClientRMProtocol rmClient;
protected InetSocketAddress rmAddress;
+ protected long statePollIntervalMillis;
private static final String ROOT = "root";
@@ -90,6 +93,9 @@ public synchronized void init(Configuration conf) {
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
+ statePollIntervalMillis = conf.getLong(
+ YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL);
super.init(conf);
}
@@ -133,6 +139,28 @@ public GetNewApplicationResponse getNewApplication()
rmClient.submitApplication(request);
LOG.info("Submitted application " + applicationId + " to ResourceManager"
+ " at " + rmAddress);
+
+ int pollCount = 0;
+ while (true) {
+ YarnApplicationState state =
+ getApplicationReport(applicationId).getYarnApplicationState();
+ if (!state.equals(YarnApplicationState.NEW) &&
+ !state.equals(YarnApplicationState.NEW_SAVING)) {
+ break;
+ }
+ // Notify the client through the log every 10 poll, in case the client
+ // is blocked here too long.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Application submission is not finished, " +
+ "submitted application " + applicationId +
+ " is still in " + state);
+ }
+ try {
+ Thread.sleep(statePollIntervalMillis);
+ } catch (InterruptedException ie) {
+ throw RPCUtil.getRemoteException(ie);
+ }
+ }
return applicationId;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
index 3d7f120..f379f51 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
@@ -18,10 +18,23 @@
package org.apache.hadoop.yarn.client;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import junit.framework.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestYarnClient {
@@ -43,4 +56,92 @@ public void testClientStop() {
client.start();
client.stop();
}
+
+ @Test (timeout = 30000)
+ public void testSubmitApplication() {
+ Configuration conf = new Configuration();
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+ final ApplicationSubmissionContext context =
+ mock(ApplicationSubmissionContext.class);
+ ApplicationId applicationId = Records.newRecord(ApplicationId.class);
+ applicationId.setClusterTimestamp(System.currentTimeMillis());
+ applicationId.setId(1);
+ when(context.getApplicationId()).thenReturn(applicationId);
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ client.submitApplication(context);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ }
+ };
+
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.NEW);
+ t.start();
+ int maxTries = 20;
+ // app submission doesn't return when YarnApplicationState.NEW
+ assertThreadAlive(t, maxTries, false);
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.NEW_SAVING);
+ // app submission doesn't return when YarnApplicationState.NEW_SAVING
+ assertThreadAlive(t, maxTries, false);
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.SUBMITTED);
+ // app submission returns when YarnApplicationState.SUBMITTED
+ assertThreadAlive(t, maxTries, true);
+ Assert.assertFalse(t.isAlive());
+
+ client.stop();
+ }
+
+ private static void assertThreadAlive(Thread t, int maxTries, boolean mode) {
+ for (int i = 0; i < maxTries && (mode ? t.isAlive() : true); ++i) {
+ if (!mode) {
+ Assert.assertTrue(t.isAlive());
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ ie.printStackTrace();
+ }
+ }
+ }
+
+ private static class MockYarnClient extends YarnClientImpl {
+ private ApplicationReport mockReport;
+
+ public MockYarnClient() {
+ super();
+ }
+
+ @Override
+ public void start() {
+ rmClient = mock(ClientRMProtocol.class);
+ GetApplicationReportResponse mockResponse =
+ mock(GetApplicationReportResponse.class);
+ mockReport = mock(ApplicationReport.class);
+ try{
+ when(rmClient.getApplicationReport(any(
+ GetApplicationReportRequest.class))).thenReturn(mockResponse);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ when(mockResponse.getApplicationReport()).thenReturn(mockReport);
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ public void setYarnApplicationState(YarnApplicationState state) {
+ when(mockReport.getYarnApplicationState()).thenReturn(state);
+ }
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 786598b..13f661f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -692,6 +692,19 @@
*/
public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+ ////////////////////////////////
+ // Other Configs
+ ////////////////////////////////
+
+ /**
+ * The interval of the yarn client's querying application state after
+ * application submission. The unit is millisecond.
+ */
+ public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL =
+ YARN_PREFIX + "client.app-submission.poll-interval";
+ public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL =
+ 1000;
+
public YarnConfiguration() {
super();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f873ff9..681e221 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -707,4 +707,12 @@
$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
+
+
+ The interval of the yarn client's querying application state
+ after application submission. The unit is millisecond.
+ yarn.client.app-submission.poll-interval
+ 1000
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index bcc1f64..475d5b1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -72,7 +71,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -83,15 +81,11 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -266,48 +260,54 @@ public SubmitApplicationResponse submitApplication(
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
- String user = submissionContext.getAMContainerSpec().getUser();
+
+ // Validations of the application submission context need to be here, if
+ // they is independent of the RM's configuration. They are only need to be
+ // done once during the newly submission.
+
+ String user = null;
try {
+ // Safety
user = UserGroupInformation.getCurrentUser().getShortUserName();
- if (rmContext.getRMApps().get(applicationId) != null) {
- throw new IOException("Application with id " + applicationId
- + " is already present! Cannot add a duplicate!");
- }
-
- // Safety
submissionContext.getAMContainerSpec().setUser(user);
+ } catch (IOException ie) {
+ LOG.warn("Unable to get the current user.", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
- try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw RPCUtil.getRemoteException(e);
- }
- }
+ // Though duplication will checked again when app is put into rmContext,
+ // but it is good to fail the invalid submission as early as possbile.
+ if (rmContext.getRMApps().get(applicationId) != null) {
+ String message = "Application with id " + applicationId +
+ " is already present! Cannot add a duplicate!";
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ if (submissionContext.getQueue() == null) {
+ submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
+ if (submissionContext.getApplicationName() == null) {
+ submissionContext.setApplicationName(
+ YarnConfiguration.DEFAULT_APPLICATION_NAME);
+ }
- // This needs to be synchronous as the client can query
- // immediately following the submission to get the application status.
- // So call handle directly and do not send an event.
- rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
- .currentTimeMillis()));
+ try {
+ // call RMAppManager to submit application directly
+ rmAppManager.submitApplication(submissionContext,
+ System.currentTimeMillis(), false);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
- } catch (IOException ie) {
- LOG.info("Exception in submitting application", ie);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- ie.getMessage(), "ClientRMService",
+ } catch (YarnRemoteException e) {
+ LOG.info("Exception in submitting application with id " +
+ applicationId.getId(), e);
+ RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+ e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
- throw RPCUtil.getRemoteException(ie);
+ throw e;
}
SubmitApplicationResponse response = recordFactory
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 8a92ab1..f373d95 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -31,8 +31,10 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -45,8 +47,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* This class manages the list of applications for the resource manager.
@@ -233,64 +239,76 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered) {
+ boolean isRecovered) throws YarnRemoteException {
ApplicationId applicationId = submissionContext.getApplicationId();
- RMApp application = null;
- try {
- // Sanity checks
- if (submissionContext.getQueue() == null) {
- submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
- }
- if (submissionContext.getApplicationName() == null) {
- submissionContext.setApplicationName(
- YarnConfiguration.DEFAULT_APPLICATION_NAME);
+ // Validations of the application submission context need to be here, if
+ // they depend on the RM's configuration. They have to be done whether
+ // they are new submission or recovered.
+
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + applicationId, e);
+ throw RPCUtil.getRemoteException(e);
}
+ }
- // Create RMApp
- application =
- new RMAppImpl(applicationId, rmContext, this.conf,
- submissionContext.getApplicationName(),
- submissionContext.getAMContainerSpec().getUser(),
- submissionContext.getQueue(),
- submissionContext, this.scheduler, this.masterService,
- submitTime);
-
- // Sanity check - duplicate?
- if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
- null) {
- String message = "Application with id " + applicationId
- + " is already present! Cannot add a duplicate!";
- LOG.info(message);
- throw RPCUtil.getRemoteException(message);
- }
+ // Create RMApp
+ RMApp application =
+ new RMAppImpl(applicationId, rmContext, this.conf,
+ submissionContext.getApplicationName(),
+ submissionContext.getAMContainerSpec().getUser(),
+ submissionContext.getQueue(),
+ submissionContext, this.scheduler, this.masterService,
+ submitTime);
- // Inform the ACLs Manager
- this.applicationACLsManager.addApplication(applicationId,
- submissionContext.getAMContainerSpec().getApplicationACLs());
+ // Concurrent app submissions with same applicationId will fail here
+ // Concurrent app submissions with different applicationIds will not
+ // influence each other
+ if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
+ null) {
+ String message = "Application with id " + applicationId
+ + " is already present! Cannot add a duplicate!";
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ // Inform the ACLs Manager
+ this.applicationACLsManager.addApplication(applicationId,
+ submissionContext.getAMContainerSpec().getApplicationACLs());
+ try {
// Setup tokens for renewal
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId,parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete()
);
- }
-
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
- RMAppEventType.START));
+ }
} catch (IOException ie) {
- LOG.info("RMAppManager submit application exception", ie);
- if (application != null) {
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- }
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ ie);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, ie.getMessage()));
+ throw RPCUtil.getRemoteException(ie);
}
+
+ // All done, start the RMApp
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
+ RMAppEventType.START));
}
private Credentials parseCredentials(ApplicationSubmissionContext application)
@@ -375,14 +393,6 @@ public void handle(RMAppManagerEvent event) {
checkAppNumCompletedLimit();
}
break;
- case APP_SUBMIT:
- {
- ApplicationSubmissionContext submissionContext =
- ((RMAppManagerSubmitEvent)event).getSubmissionContext();
- long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
- submitApplication(submissionContext, submitTime, false);
- }
- break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
index e805ed8..1b6a44c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
@@ -19,6 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
- APP_SUBMIT,
APP_COMPLETED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
deleted file mode 100644
index afcd24d..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-
-public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
-
- private final ApplicationSubmissionContext submissionContext;
- private final long submitTime;
-
- public RMAppManagerSubmitEvent(
- ApplicationSubmissionContext submissionContext, long submitTime) {
- super(submissionContext.getApplicationId(),
- RMAppManagerEventType.APP_SUBMIT);
- this.submissionContext = submissionContext;
- this.submitTime = submitTime;
- }
-
- public ApplicationSubmissionContext getSubmissionContext() {
- return this.submissionContext;
- }
-
- public long getSubmitTime() {
- return this.submitTime;
- }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index f5cc7d3..73ccc03 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@@ -31,12 +34,15 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -46,11 +52,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -163,9 +169,10 @@ public void setCompletedAppsMax(int max) {
super.setCompletedAppsMax(max);
}
public void submitApplication(
- ApplicationSubmissionContext submissionContext) {
- super.submitApplication(
- submissionContext, System.currentTimeMillis(), false);
+ ApplicationSubmissionContext submissionContext)
+ throws YarnRemoteException {
+ super.submitApplication(submissionContext, System.currentTimeMillis(),
+ false);
}
}
@@ -179,6 +186,40 @@ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmConte
}
}
+ private RMContext rmContext;
+ private TestRMAppManager appMonitor;
+ private ApplicationSubmissionContext asContext;
+ private ApplicationId appId;
+
+ @Before
+ public void setUp() {
+ long now = System.currentTimeMillis();
+
+ rmContext = mockRMContext(1, now - 10);
+ ResourceScheduler scheduler = mockResourceScheduler();
+ Configuration conf = new Configuration();
+ ApplicationMasterService masterService =
+ new ApplicationMasterService(rmContext, scheduler);
+ appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
+ new ApplicationACLsManager(conf), conf);
+
+ appId = MockApps.newAppID(1);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ asContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ asContext.setApplicationId(appId);
+ asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ asContext.setResource(mockResource());
+ setupDispatcher(rmContext, conf);
+ }
+
+ @After
+ public void tearDown() {
+ setAppEventType(RMAppEventType.KILL);
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
@Test
public void testRMAppRetireNone() throws Exception {
long now = System.currentTimeMillis();
@@ -334,38 +375,10 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) {
@Test
public void testRMAppSubmit() throws Exception {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(1);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
- setupDispatcher(rmContext, conf);
-
- appMonitor.submitApplication(context);
- RMApp app = rmContext.getRMApps().get(appID);
+ appMonitor.submitApplication(asContext);
+ RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't match",
- YarnConfiguration.DEFAULT_APPLICATION_NAME,
- app.getName());
- Assert.assertEquals("app queue doesn't match",
- YarnConfiguration.DEFAULT_QUEUE_NAME,
- app.getQueue());
+ Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
// wait for event to be processed
@@ -374,9 +387,8 @@ public void testRMAppSubmit() throws Exception {
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
- Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
- setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
+ Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
+ getAppEventType());
}
@Test (timeout = 30000)
@@ -390,10 +402,7 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
ApplicationMasterService masterService =
@@ -402,21 +411,12 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
- setupDispatcher(rmContext, conf);
-
- ApplicationId appID = MockApps.newAppID(1);
- context.setApplicationId(appID);
+ ApplicationId appID = MockApps.newAppID(i * 4 + j + 1);
+ asContext.setApplicationId(appID);
if (individualMaxAppAttempts[i][j] != 0) {
- context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
+ asContext.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
}
- appMonitor.submitApplication(context);
+ appMonitor.submitApplication(asContext);
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertEquals("max application attempts doesn't match",
expectedNums[i][j], app.getMaxAppAttempts());
@@ -428,96 +428,73 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
Thread.sleep(1000);
}
setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
}
}
}
- @Test (timeout = 3000)
- public void testRMAppSubmitWithQueueAndName() throws Exception {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(10);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- context.setApplicationName("testApp1");
- context.setQueue("testQueue");
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer
- .setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
+ @Test (timeout = 30000)
+ public void testRMAppSubmitDuplicateApplicationId() throws Exception {
+ ApplicationId appId = MockApps.newAppID(0);
+ asContext.setApplicationId(appId);
+ RMApp appOrig = rmContext.getRMApps().get(appId);
+ Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
- setupDispatcher(rmContext, conf);
+ // our testApp1 should be rejected and original app with same id should be left in place
+ try {
+ appMonitor.submitApplication(asContext);
+ Assert.fail("Exception is expected when applicationId is duplicate.");
+ } catch (YarnRemoteException e) {
+ Assert.assertTrue("The thrown exception is not the expectd one.",
+ e.getMessage().contains("Cannot add a duplicate!"));
+ }
- appMonitor.submitApplication(context);
- RMApp app = rmContext.getRMApps().get(appID);
+ // make sure original app didn't get removed
+ RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
- Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
- Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+ Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
+ Assert.assertEquals("app state doesn't match", RMAppState.FINISHED, app.getState());
+ }
- // wait for event to be processed
- int timeoutSecs = 0;
- while ((getAppEventType() == RMAppEventType.KILL) &&
- timeoutSecs++ < 20) {
- Thread.sleep(1000);
+ @Test (timeout = 30000)
+ public void testRMAppSubmitInvalidResourceRequest() throws Exception {
+ asContext.setResource(Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1));
+
+ // submit an app
+ try {
+ appMonitor.submitApplication(asContext);
+ Assert.fail("Application submission should fail because resource" +
+ " request is invalid.");
+ } catch (YarnRemoteException e) {
+ // Exception is expected
+ Assert.assertTrue("The thrown exception is not" +
+ " InvalidResourceRequestException",
+ e.getMessage().startsWith("Invalid resource request"));
}
- Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
- setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
}
- @Test
- public void testRMAppSubmitError() throws Exception {
- long now = System.currentTimeMillis();
-
- // specify 1 here and use same appId below so it gets duplicate entry
- RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(0);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- context.setApplicationName("testApp1");
- context.setQueue("testQueue");
-
- setupDispatcher(rmContext, conf);
-
- RMApp appOrig = rmContext.getRMApps().get(appID);
- Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
+ private static ResourceScheduler mockResourceScheduler() {
+ ResourceScheduler scheduler = mock(ResourceScheduler.class);
+ when(scheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(scheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ return scheduler;
+ }
- ContainerLaunchContext clc =
- BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
- null, null);
- context.setAMContainerSpec(clc);
- // our testApp1 should be rejected and original app with same id should be left in place
- appMonitor.submitApplication(context);
+ private static ContainerLaunchContext mockContainerLaunchContext(
+ RecordFactory recordFactory) {
+ ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
+ ContainerLaunchContext.class);
+ amContainer.setApplicationACLs(new HashMap());;
+ return amContainer;
+ }
- // make sure original app didn't get removed
- RMApp app = rmContext.getRMApps().get(appID);
- Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
- ((Service)rmContext.getDispatcher()).stop();
+ private static Resource mockResource() {
+ return Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index aa7af9c..ae25e89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -250,17 +250,70 @@ private void checkTokenRenewal(UserGroupInformation owner,
rmContext, null, null, null, dtsm);
rmService.renewDelegationToken(request);
}
+
+ @Test (timeout = 30000)
+ @SuppressWarnings ("rawtypes")
+ public void testAppSubmit() throws Exception {
+ YarnScheduler yarnScheduler = mockYarnScheduler();
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(yarnScheduler, rmContext);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+ new EventHandler() {
+ public void handle(Event event) {}
+ });
+ ClientRMService rmService =
+ new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+ // without name and queue
+ ApplicationId appId1 = getApplicationId(100);
+ SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app1 = rmContext.getRMApps().get(appId1);
+ Assert.assertNotNull("app doesn't exist", app1);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+ Assert.assertEquals("app queue doesn't match",
+ YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue());
+
+ // with name and queue
+ String name = MockApps.newAppName();
+ String queue = MockApps.newQueue();
+ ApplicationId appId2 = getApplicationId(101);
+ SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, name, queue);
+ try {
+ rmService.submitApplication(submitRequest2);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app2 = rmContext.getRMApps().get(appId2);
+ Assert.assertNotNull("app doesn't exist", app2);
+ Assert.assertEquals("app name doesn't match", name, app2.getName());
+ Assert.assertEquals("app queue doesn't match", queue, app2.getQueue());
+
+ // duplicate appId
+ try {
+ rmService.submitApplication(submitRequest2);
+ Assert.fail("Exception is expected.");
+ } catch (YarnRemoteException e) {
+ Assert.assertTrue("The thrown exception is not expected.",
+ e.getMessage().contains("Cannot add a duplicate!"));
+ }
+ }
@Test(timeout=4000)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
- YarnScheduler yarnScheduler = mock(YarnScheduler.class);
- when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
@@ -270,8 +323,10 @@ public void testConcurrentAppSubmit()
final ApplicationId appId1 = getApplicationId(100);
final ApplicationId appId2 = getApplicationId(101);
- final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
- final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
+ final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, null, null);
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@@ -319,61 +374,23 @@ public void run() {
t.join();
}
- @Test (timeout = 30000)
- public void testInvalidResourceRequestWhenSubmittingApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- YarnScheduler yarnScheduler = mock(YarnScheduler.class);
- when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- RMContext rmContext = mock(RMContext.class);
- mockRMContext(yarnScheduler, rmContext);
- RMStateStore stateStore = mock(RMStateStore.class);
- when(rmContext.getStateStore()).thenReturn(stateStore);
- RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
- null, mock(ApplicationACLsManager.class), new Configuration());
-
- final ApplicationId appId = getApplicationId(100);
- final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
- Resource resource = Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
- when(submitRequest.getApplicationSubmissionContext()
- .getResource()).thenReturn(resource);
-
- final ClientRMService rmService =
- new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
-
- // submit an app
- try {
- rmService.submitApplication(submitRequest);
- Assert.fail("Application submission should fail because resource" +
- " request is invalid.");
- } catch (YarnRemoteException e) {
- // Exception is expected
- Assert.assertTrue("The thrown exception is not" +
- " InvalidResourceRequestException",
- e.getMessage().startsWith("Invalid resource request"));
- }
- }
-
- private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
+ String name, String queue) {
String user = MockApps.newUserName();
- String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
- when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
- when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user);
- when(submissionContext.getQueue()).thenReturn(queue);
- when(submissionContext.getApplicationId()).thenReturn(appId);
- when(submissionContext.getResource()).thenReturn(resource);
+ ApplicationSubmissionContext submissionContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ submissionContext.setAMContainerSpec(amContainerSpec);
+ submissionContext.getAMContainerSpec().setUser(user);
+ submissionContext.setApplicationName(name);
+ submissionContext.setQueue(queue);
+ submissionContext.setApplicationId(appId);
+ submissionContext.setResource(resource);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@@ -429,4 +446,15 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis());
}
+
+ private static YarnScheduler mockYarnScheduler() {
+ YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+ when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ return yarnScheduler;
+ }
}