diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
index eb84b31..4d9d256 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
@@ -53,9 +53,11 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
@@ -68,6 +70,7 @@
protected ClientRMProtocol rmClient;
protected InetSocketAddress rmAddress;
+ protected long statePollIntervalMillis;
private static final String ROOT = "root";
@@ -90,6 +93,9 @@ public synchronized void init(Configuration conf) {
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
+ statePollIntervalMillis = conf.getLong(
+ YarnConfiguration.RM_CLIENT_STATE_POLL_INTERVAL,
+ YarnConfiguration.DEFAULT_RM_CLIENT_STATE_POLL_INTERVAL);
super.init(conf);
}
@@ -133,6 +139,20 @@ public GetNewApplicationResponse getNewApplication()
rmClient.submitApplication(request);
LOG.info("Submitted application " + applicationId + " to ResourceManager"
+ " at " + rmAddress);
+
+ while (true) {
+ YarnApplicationState state =
+ getApplicationReport(applicationId).getYarnApplicationState();
+ if (!state.equals(YarnApplicationState.NEW) &&
+ !state.equals(YarnApplicationState.NEW_SAVING)) {
+ break;
+ }
+ try {
+ Thread.sleep(statePollIntervalMillis);
+ } catch (InterruptedException ie) {
+ RPCUtil.getRemoteException(ie);
+ }
+ }
return applicationId;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
index 3d7f120..f379f51 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
@@ -18,10 +18,23 @@
package org.apache.hadoop.yarn.client;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import junit.framework.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestYarnClient {
@@ -43,4 +56,92 @@ public void testClientStop() {
client.start();
client.stop();
}
+
+ @Test (timeout = 30000)
+ public void testSubmitApplication() {
+ Configuration conf = new Configuration();
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+ final ApplicationSubmissionContext context =
+ mock(ApplicationSubmissionContext.class);
+ ApplicationId applicationId = Records.newRecord(ApplicationId.class);
+ applicationId.setClusterTimestamp(System.currentTimeMillis());
+ applicationId.setId(1);
+ when(context.getApplicationId()).thenReturn(applicationId);
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ client.submitApplication(context);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ }
+ };
+
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.NEW);
+ t.start();
+ int maxTries = 20;
+ // app submission doesn't return when YarnApplicationState.NEW
+ assertThreadAlive(t, maxTries, false);
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.NEW_SAVING);
+ // app submission doesn't return when YarnApplicationState.NEW_SAVING
+ assertThreadAlive(t, maxTries, false);
+ ((MockYarnClient) client).setYarnApplicationState(
+ YarnApplicationState.SUBMITTED);
+ // app submission returns when YarnApplicationState.SUBMITTED
+ assertThreadAlive(t, maxTries, true);
+ Assert.assertFalse(t.isAlive());
+
+ client.stop();
+ }
+
+ private static void assertThreadAlive(Thread t, int maxTries, boolean mode) {
+ for (int i = 0; i < maxTries && (mode ? t.isAlive() : true); ++i) {
+ if (!mode) {
+ Assert.assertTrue(t.isAlive());
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ ie.printStackTrace();
+ }
+ }
+ }
+
+ private static class MockYarnClient extends YarnClientImpl {
+ private ApplicationReport mockReport;
+
+ public MockYarnClient() {
+ super();
+ }
+
+ @Override
+ public void start() {
+ rmClient = mock(ClientRMProtocol.class);
+ GetApplicationReportResponse mockResponse =
+ mock(GetApplicationReportResponse.class);
+ mockReport = mock(ApplicationReport.class);
+ try{
+ when(rmClient.getApplicationReport(any(
+ GetApplicationReportRequest.class))).thenReturn(mockResponse);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ when(mockResponse.getApplicationReport()).thenReturn(mockReport);
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ public void setYarnApplicationState(YarnApplicationState state) {
+ when(mockReport.getYarnApplicationState()).thenReturn(state);
+ }
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 786598b..cccc8f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -94,6 +94,14 @@
public static final String DEFAULT_RM_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT;
+ /**
+ * The interval of the yarn client's querying application state after
+ * application submission. The unit is millisecond.
+ */
+ public static final String RM_CLIENT_STATE_POLL_INTERVAL =
+ RM_PREFIX + "client.state-poll-interval";
+ public static final long DEFAULT_RM_CLIENT_STATE_POLL_INTERVAL = 1000;
+
/** The number of threads used to handle applications manager requests.*/
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f873ff9..bcf9e16 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -71,6 +71,13 @@
+ The interval of the yarn client's querying application state
+ after application submission. The unit is millisecond.
+ yarn.resourcemanager.client.state-poll-interval
+ 1000
+
+
+
The number of threads used to handle applications manager requests.
yarn.resourcemanager.client.thread-count
50
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index bcc1f64..714b8ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -72,7 +71,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -83,15 +81,11 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -266,48 +260,26 @@ public SubmitApplicationResponse submitApplication(
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
- String user = submissionContext.getAMContainerSpec().getUser();
- try {
- user = UserGroupInformation.getCurrentUser().getShortUserName();
- if (rmContext.getRMApps().get(applicationId) != null) {
- throw new IOException("Application with id " + applicationId
- + " is already present! Cannot add a duplicate!");
- }
-
- // Safety
- submissionContext.getAMContainerSpec().setUser(user);
-
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
- try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw RPCUtil.getRemoteException(e);
- }
- }
- // This needs to be synchronous as the client can query
- // immediately following the submission to get the application status.
- // So call handle directly and do not send an event.
- rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
- .currentTimeMillis()));
+ try {
+ // call RMAppManager to submit application directly
+ rmAppManager.submitApplication(submissionContext,
+ System.currentTimeMillis(), false);
LOG.info("Application with id " + applicationId.getId() +
- " submitted by user " + user);
- RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
- "ClientRMService", applicationId);
- } catch (IOException ie) {
- LOG.info("Exception in submitting application", ie);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- ie.getMessage(), "ClientRMService",
- "Exception in submitting application", applicationId);
- throw RPCUtil.getRemoteException(ie);
+ " submitted by user " +
+ submissionContext.getAMContainerSpec().getUser());
+ RMAuditLogger.logSuccess(
+ submissionContext.getAMContainerSpec().getUser(),
+ AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId);
+ } catch (YarnRemoteException e) {
+ LOG.info("Exception in submitting application", e);
+ RMAuditLogger.logFailure(
+ submissionContext.getAMContainerSpec().getUser(),
+ AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(),
+ "ClientRMService", "Exception in submitting application",
+ applicationId);
+ throw e;
}
SubmitApplicationResponse response = recordFactory
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 8a92ab1..d782767 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -31,8 +31,10 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -45,8 +47,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* This class manages the list of applications for the resource manager.
@@ -233,10 +239,29 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered) {
+ boolean isRecovered) throws YarnRemoteException {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
try {
+ // Safety
+ submissionContext.getAMContainerSpec().setUser(
+ UserGroupInformation.getCurrentUser().getShortUserName());
+
+ // Sanity check
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + applicationId, e);
+ throw RPCUtil.getRemoteException(e);
+ }
+ }
// Sanity checks
if (submissionContext.getQueue() == null) {
@@ -257,6 +282,9 @@ protected void submitApplication(
submitTime);
// Sanity check - duplicate?
+ // Concurrent app submissions with same applicationId will fail here
+ // Concurrent app submissions with different applicationIds will not
+ // influence each other
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
String message = "Application with id " + applicationId
@@ -290,6 +318,10 @@ protected void submitApplication(
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(applicationId, ie.getMessage()));
}
+ if (!(ie instanceof YarnRemoteException)) {
+ ie = RPCUtil.getRemoteException(ie);
+ }
+ throw (YarnRemoteException) ie;
}
}
@@ -375,14 +407,6 @@ public void handle(RMAppManagerEvent event) {
checkAppNumCompletedLimit();
}
break;
- case APP_SUBMIT:
- {
- ApplicationSubmissionContext submissionContext =
- ((RMAppManagerSubmitEvent)event).getSubmissionContext();
- long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
- submitApplication(submissionContext, submitTime, false);
- }
- break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
index e805ed8..1b6a44c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
@@ -19,6 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
- APP_SUBMIT,
APP_COMPLETED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
deleted file mode 100644
index afcd24d..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-
-public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
-
- private final ApplicationSubmissionContext submissionContext;
- private final long submitTime;
-
- public RMAppManagerSubmitEvent(
- ApplicationSubmissionContext submissionContext, long submitTime) {
- super(submissionContext.getApplicationId(),
- RMAppManagerEventType.APP_SUBMIT);
- this.submissionContext = submissionContext;
- this.submitTime = submitTime;
- }
-
- public ApplicationSubmissionContext getSubmissionContext() {
- return this.submissionContext;
- }
-
- public long getSubmitTime() {
- return this.submitTime;
- }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index f5cc7d3..38e130c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@@ -31,12 +34,15 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -46,11 +52,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -163,9 +167,10 @@ public void setCompletedAppsMax(int max) {
super.setCompletedAppsMax(max);
}
public void submitApplication(
- ApplicationSubmissionContext submissionContext) {
- super.submitApplication(
- submissionContext, System.currentTimeMillis(), false);
+ ApplicationSubmissionContext submissionContext)
+ throws YarnRemoteException {
+ super.submitApplication(submissionContext, System.currentTimeMillis(),
+ false);
}
}
@@ -337,7 +342,7 @@ public void testRMAppSubmit() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@@ -350,10 +355,8 @@ public void testRMAppSubmit() throws Exception {
ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(appID);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
+ context.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ context.setResource(mockResource());
setupDispatcher(rmContext, conf);
appMonitor.submitApplication(context);
@@ -393,7 +396,7 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
ApplicationMasterService masterService =
@@ -405,10 +408,8 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
+ context.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ context.setResource(mockResource());
setupDispatcher(rmContext, conf);
ApplicationId appID = MockApps.newAppID(1);
@@ -438,7 +439,7 @@ public void testRMAppSubmitWithQueueAndName() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@@ -452,12 +453,8 @@ public void testRMAppSubmitWithQueueAndName() throws Exception {
context.setApplicationId(appID);
context.setApplicationName("testApp1");
context.setQueue("testQueue");
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer
- .setApplicationACLs(new HashMap());
- context.setAMContainerSpec(amContainer);
-
+ context.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ context.setResource(mockResource());
setupDispatcher(rmContext, conf);
appMonitor.submitApplication(context);
@@ -479,13 +476,13 @@ public void testRMAppSubmitWithQueueAndName() throws Exception {
((Service)rmContext.getDispatcher()).stop();
}
- @Test
- public void testRMAppSubmitError() throws Exception {
+ @Test (timeout = 30000)
+ public void testRMAppSubmitDuplicateApplicationId() throws Exception {
long now = System.currentTimeMillis();
// specify 1 here and use same appId below so it gets duplicate entry
RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@@ -496,6 +493,8 @@ public void testRMAppSubmitError() throws Exception {
ApplicationId appID = MockApps.newAppID(0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ context.setResource(mockResource());
context.setApplicationId(appID);
context.setApplicationName("testApp1");
context.setQueue("testQueue");
@@ -505,19 +504,91 @@ public void testRMAppSubmitError() throws Exception {
RMApp appOrig = rmContext.getRMApps().get(appID);
Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
- ContainerLaunchContext clc =
- BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
- null, null);
- context.setAMContainerSpec(clc);
// our testApp1 should be rejected and original app with same id should be left in place
- appMonitor.submitApplication(context);
+ try {
+ appMonitor.submitApplication(context);
+ Assert.fail("Exception is expected when applicationId is duplicate.");
+ } catch (YarnRemoteException e) {
+ Assert.assertTrue("The thrown exception is not the expectd one.",
+ e.getMessage().contains("Cannot add a duplicate!"));
+ }
// make sure original app didn't get removed
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
+
+ int timeoutSecs = 0;
+ while ((getAppEventType() == RMAppEventType.KILL) &&
+ timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ }
+ setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
}
+ @Test (timeout = 30000)
+ public void testRMAppSubmitInvalidResourceRequest() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(0, now - 10);
+ ResourceScheduler scheduler = mockResourceScheduler();
+ Configuration conf = new Configuration();
+ ApplicationMasterService masterService =
+ new ApplicationMasterService(rmContext, scheduler);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
+ new ApplicationACLsManager(conf), conf);
+
+ ApplicationId appID = MockApps.newAppID(1);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ ApplicationSubmissionContext context =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(appID);
+ context.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ context.setResource(Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1));
+ setupDispatcher(rmContext, conf);
+
+ // submit an app
+ try {
+ appMonitor.submitApplication(context);
+ Assert.fail("Application submission should fail because resource" +
+ " request is invalid.");
+ } catch (YarnRemoteException e) {
+ // Exception is expected
+ Assert.assertTrue("The thrown exception is not" +
+ " InvalidResourceRequestException",
+ e.getMessage().startsWith("Invalid resource request"));
+ }
+
+ setAppEventType(RMAppEventType.KILL);
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
+ private static ResourceScheduler mockResourceScheduler() {
+ ResourceScheduler scheduler = mock(ResourceScheduler.class);
+ when(scheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(scheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ return scheduler;
+ }
+
+ private static ContainerLaunchContext mockContainerLaunchContext(
+ RecordFactory recordFactory) {
+ ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
+ ContainerLaunchContext.class);
+ amContainer.setApplicationACLs(new HashMap());;
+ return amContainer;
+ }
+
+ private static Resource mockResource() {
+ return Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index aa7af9c..e10774b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -319,46 +319,6 @@ public void run() {
t.join();
}
- @Test (timeout = 30000)
- public void testInvalidResourceRequestWhenSubmittingApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- YarnScheduler yarnScheduler = mock(YarnScheduler.class);
- when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- RMContext rmContext = mock(RMContext.class);
- mockRMContext(yarnScheduler, rmContext);
- RMStateStore stateStore = mock(RMStateStore.class);
- when(rmContext.getStateStore()).thenReturn(stateStore);
- RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
- null, mock(ApplicationACLsManager.class), new Configuration());
-
- final ApplicationId appId = getApplicationId(100);
- final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
- Resource resource = Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
- when(submitRequest.getApplicationSubmissionContext()
- .getResource()).thenReturn(resource);
-
- final ClientRMService rmService =
- new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
-
- // submit an app
- try {
- rmService.submitApplication(submitRequest);
- Assert.fail("Application submission should fail because resource" +
- " request is invalid.");
- } catch (YarnRemoteException e) {
- // Exception is expected
- Assert.assertTrue("The thrown exception is not" +
- " InvalidResourceRequestException",
- e.getMessage().startsWith("Invalid resource request"));
- }
- }
-
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
String user = MockApps.newUserName();
String queue = MockApps.newQueue();