diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index bcc1f64..e049cb7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -83,15 +81,11 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -266,48 +260,54 @@ public SubmitApplicationResponse submitApplication( ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); - String user = submissionContext.getAMContainerSpec().getUser(); + + // Validations of the application submission context need to be here, if + // they is independent of the RM's configuration. They only need to be + // done once during submission. + + String user = null; try { + // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); - if (rmContext.getRMApps().get(applicationId) != null) { - throw new IOException("Application with id " + applicationId - + " is already present! Cannot add a duplicate!"); - } - - // Safety submissionContext.getAMContainerSpec().setUser(user); + } catch (IOException ie) { + LOG.warn("Unable to get the current user.", ie); + throw RPCUtil.getRemoteException(ie); + } - // Check whether AM resource requirements are within required limits - if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); - try { - SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); - } catch (InvalidResourceRequestException e) { - LOG.warn("RM app submission failed in validating AM resource request" - + " for application " + applicationId, e); - throw RPCUtil.getRemoteException(e); - } - } + // Though duplication will checked again when app is put into rmContext, + // but it is good to fail the invalid submission as early as possible. + if (rmContext.getRMApps().get(applicationId) != null) { + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + if (submissionContext.getQueue() == null) { + submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } + if (submissionContext.getApplicationName() == null) { + submissionContext.setApplicationName( + YarnConfiguration.DEFAULT_APPLICATION_NAME); + } - // This needs to be synchronous as the client can query - // immediately following the submission to get the application status. - // So call handle directly and do not send an event. - rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System - .currentTimeMillis())); + try { + // call RMAppManager to submit application directly + rmAppManager.submitApplication(submissionContext, + System.currentTimeMillis(), false); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); - } catch (IOException ie) { - LOG.info("Exception in submitting application", ie); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - ie.getMessage(), "ClientRMService", + } catch (YarnRemoteException e) { + LOG.info("Exception in submitting application with id " + + applicationId.getId(), e); + RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, + e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); - throw RPCUtil.getRemoteException(ie); + throw e; } SubmitApplicationResponse response = recordFactory diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dcb6f2..a8be5a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -31,8 +31,10 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.BuilderUtils; /** * This class manages the list of applications for the resource manager. @@ -233,64 +239,76 @@ protected synchronized void checkAppNumCompletedLimit() { @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, - boolean isRecovered) { + boolean isRecovered) throws YarnRemoteException { ApplicationId applicationId = submissionContext.getApplicationId(); - RMApp application = null; - try { - // Sanity checks - if (submissionContext.getQueue() == null) { - submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); - } - if (submissionContext.getApplicationName() == null) { - submissionContext.setApplicationName( - YarnConfiguration.DEFAULT_APPLICATION_NAME); + // Validations of the application submission context need to be here, if + // they depend on the RM's configuration. They have to be done whether + // they are new submission or recovered. + + // Check whether AM resource requirements are within required limits + if (!submissionContext.getUnmanagedAM()) { + ResourceRequest amReq = BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + try { + SchedulerUtils.validateResourceRequest(amReq, + scheduler.getMaximumResourceCapability()); + } catch (InvalidResourceRequestException e) { + LOG.warn("RM app submission failed in validating AM resource request" + + " for application " + applicationId, e); + throw RPCUtil.getRemoteException(e); } + } - // Create RMApp - application = - new RMAppImpl(applicationId, rmContext, this.conf, - submissionContext.getApplicationName(), - submissionContext.getAMContainerSpec().getUser(), - submissionContext.getQueue(), - submissionContext, this.scheduler, this.masterService, - submitTime); - - // Sanity check - duplicate? - if (rmContext.getRMApps().putIfAbsent(applicationId, application) != - null) { - String message = "Application with id " + applicationId - + " is already present! Cannot add a duplicate!"; - LOG.info(message); - throw RPCUtil.getRemoteException(message); - } + // Create RMApp + RMApp application = + new RMAppImpl(applicationId, rmContext, this.conf, + submissionContext.getApplicationName(), + submissionContext.getAMContainerSpec().getUser(), + submissionContext.getQueue(), + submissionContext, this.scheduler, this.masterService, + submitTime); - // Inform the ACLs Manager - this.applicationACLsManager.addApplication(applicationId, - submissionContext.getAMContainerSpec().getApplicationACLs()); + // Concurrent app submissions with same applicationId will fail here + // Concurrent app submissions with different applicationIds will not + // influence each other + if (rmContext.getRMApps().putIfAbsent(applicationId, application) != + null) { + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + // Inform the ACLs Manager + this.applicationACLsManager.addApplication(applicationId, + submissionContext.getAMContainerSpec().getApplicationACLs()); + try { // Setup tokens for renewal if (UserGroupInformation.isSecurityEnabled()) { this.rmContext.getDelegationTokenRenewer().addApplication( applicationId,parseCredentials(submissionContext), submissionContext.getCancelTokensWhenComplete() ); - } - - // All done, start the RMApp - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: - RMAppEventType.START)); + } } catch (IOException ie) { - LOG.info("RMAppManager submit application exception", ie); - if (application != null) { - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we havne't yet informed the - // Scheduler about the existence of the application - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, ie.getMessage())); - } + LOG.warn( + "Unable to add the application to the delegation token renewer.", + ie); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, ie.getMessage())); + throw RPCUtil.getRemoteException(ie); } + + // All done, start the RMApp + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: + RMAppEventType.START)); } private Credentials parseCredentials(ApplicationSubmissionContext application) @@ -377,14 +395,6 @@ public void handle(RMAppManagerEvent event) { checkAppNumCompletedLimit(); } break; - case APP_SUBMIT: - { - ApplicationSubmissionContext submissionContext = - ((RMAppManagerSubmitEvent)event).getSubmissionContext(); - long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime(); - submitApplication(submissionContext, submitTime, false); - } - break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java index e805ed8..1b6a44c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java @@ -19,6 +19,5 @@ package org.apache.hadoop.yarn.server.resourcemanager; public enum RMAppManagerEventType { - APP_SUBMIT, APP_COMPLETED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java deleted file mode 100644 index afcd24d..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; - -public class RMAppManagerSubmitEvent extends RMAppManagerEvent { - - private final ApplicationSubmissionContext submissionContext; - private final long submitTime; - - public RMAppManagerSubmitEvent( - ApplicationSubmissionContext submissionContext, long submitTime) { - super(submissionContext.getApplicationId(), - RMAppManagerEventType.APP_SUBMIT); - this.submissionContext = submissionContext; - this.submitTime = submitTime; - } - - public ApplicationSubmissionContext getSubmissionContext() { - return this.submissionContext; - } - - public long getSubmitTime() { - return this.submitTime; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index f5cc7d3..73ccc03 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -31,12 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -46,11 +52,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.service.Service; -import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import com.google.common.collect.Lists; @@ -163,9 +169,10 @@ public void setCompletedAppsMax(int max) { super.setCompletedAppsMax(max); } public void submitApplication( - ApplicationSubmissionContext submissionContext) { - super.submitApplication( - submissionContext, System.currentTimeMillis(), false); + ApplicationSubmissionContext submissionContext) + throws YarnRemoteException { + super.submitApplication(submissionContext, System.currentTimeMillis(), + false); } } @@ -179,6 +186,40 @@ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmConte } } + private RMContext rmContext; + private TestRMAppManager appMonitor; + private ApplicationSubmissionContext asContext; + private ApplicationId appId; + + @Before + public void setUp() { + long now = System.currentTimeMillis(); + + rmContext = mockRMContext(1, now - 10); + ResourceScheduler scheduler = mockResourceScheduler(); + Configuration conf = new Configuration(); + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + appMonitor = new TestRMAppManager(rmContext, + new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, + new ApplicationACLsManager(conf), conf); + + appId = MockApps.newAppID(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + asContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + asContext.setApplicationId(appId); + asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + asContext.setResource(mockResource()); + setupDispatcher(rmContext, conf); + } + + @After + public void tearDown() { + setAppEventType(RMAppEventType.KILL); + ((Service)rmContext.getDispatcher()).stop(); + } + @Test public void testRMAppRetireNone() throws Exception { long now = System.currentTimeMillis(); @@ -334,38 +375,10 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) { @Test public void testRMAppSubmit() throws Exception { - long now = System.currentTimeMillis(); - - RMContext rmContext = mockRMContext(0, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); - Configuration conf = new Configuration(); - ApplicationMasterService masterService = - new ApplicationMasterService(rmContext, scheduler); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, - new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, - new ApplicationACLsManager(conf), conf); - - ApplicationId appID = MockApps.newAppID(1); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - ApplicationSubmissionContext context = - recordFactory.newRecordInstance(ApplicationSubmissionContext.class); - context.setApplicationId(appID); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer.setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); - setupDispatcher(rmContext, conf); - - appMonitor.submitApplication(context); - RMApp app = rmContext.getRMApps().get(appID); + appMonitor.submitApplication(asContext); + RMApp app = rmContext.getRMApps().get(appId); Assert.assertNotNull("app is null", app); - Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); - Assert.assertEquals("app name doesn't match", - YarnConfiguration.DEFAULT_APPLICATION_NAME, - app.getName()); - Assert.assertEquals("app queue doesn't match", - YarnConfiguration.DEFAULT_QUEUE_NAME, - app.getQueue()); + Assert.assertEquals("app id doesn't match", appId, app.getApplicationId()); Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); // wait for event to be processed @@ -374,9 +387,8 @@ public void testRMAppSubmit() throws Exception { timeoutSecs++ < 20) { Thread.sleep(1000); } - Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); - setAppEventType(RMAppEventType.KILL); - ((Service)rmContext.getDispatcher()).stop(); + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, + getAppEventType()); } @Test (timeout = 30000) @@ -390,10 +402,7 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { new int[]{ 1, 1, 1, 1 }}; for (int i = 0; i < globalMaxAppAttempts.length; ++i) { for (int j = 0; j < individualMaxAppAttempts.length; ++j) { - long now = System.currentTimeMillis(); - - RMContext rmContext = mockRMContext(0, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]); ApplicationMasterService masterService = @@ -402,21 +411,12 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, new ApplicationACLsManager(conf), conf); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - ApplicationSubmissionContext context = - recordFactory.newRecordInstance(ApplicationSubmissionContext.class); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer.setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); - setupDispatcher(rmContext, conf); - - ApplicationId appID = MockApps.newAppID(1); - context.setApplicationId(appID); + ApplicationId appID = MockApps.newAppID(i * 4 + j + 1); + asContext.setApplicationId(appID); if (individualMaxAppAttempts[i][j] != 0) { - context.setMaxAppAttempts(individualMaxAppAttempts[i][j]); + asContext.setMaxAppAttempts(individualMaxAppAttempts[i][j]); } - appMonitor.submitApplication(context); + appMonitor.submitApplication(asContext); RMApp app = rmContext.getRMApps().get(appID); Assert.assertEquals("max application attempts doesn't match", expectedNums[i][j], app.getMaxAppAttempts()); @@ -428,96 +428,73 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { Thread.sleep(1000); } setAppEventType(RMAppEventType.KILL); - ((Service)rmContext.getDispatcher()).stop(); } } } - @Test (timeout = 3000) - public void testRMAppSubmitWithQueueAndName() throws Exception { - long now = System.currentTimeMillis(); - - RMContext rmContext = mockRMContext(1, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); - Configuration conf = new Configuration(); - ApplicationMasterService masterService = - new ApplicationMasterService(rmContext, scheduler); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, - new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, - new ApplicationACLsManager(conf), conf); - - ApplicationId appID = MockApps.newAppID(10); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); - context.setApplicationId(appID); - context.setApplicationName("testApp1"); - context.setQueue("testQueue"); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer - .setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); + @Test (timeout = 30000) + public void testRMAppSubmitDuplicateApplicationId() throws Exception { + ApplicationId appId = MockApps.newAppID(0); + asContext.setApplicationId(appId); + RMApp appOrig = rmContext.getRMApps().get(appId); + Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName()); - setupDispatcher(rmContext, conf); + // our testApp1 should be rejected and original app with same id should be left in place + try { + appMonitor.submitApplication(asContext); + Assert.fail("Exception is expected when applicationId is duplicate."); + } catch (YarnRemoteException e) { + Assert.assertTrue("The thrown exception is not the expectd one.", + e.getMessage().contains("Cannot add a duplicate!")); + } - appMonitor.submitApplication(context); - RMApp app = rmContext.getRMApps().get(appID); + // make sure original app didn't get removed + RMApp app = rmContext.getRMApps().get(appId); Assert.assertNotNull("app is null", app); - Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); - Assert.assertEquals("app name doesn't match", "testApp1", app.getName()); - Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue()); - Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); + Assert.assertEquals("app id doesn't match", appId, app.getApplicationId()); + Assert.assertEquals("app state doesn't match", RMAppState.FINISHED, app.getState()); + } - // wait for event to be processed - int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && - timeoutSecs++ < 20) { - Thread.sleep(1000); + @Test (timeout = 30000) + public void testRMAppSubmitInvalidResourceRequest() throws Exception { + asContext.setResource(Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1)); + + // submit an app + try { + appMonitor.submitApplication(asContext); + Assert.fail("Application submission should fail because resource" + + " request is invalid."); + } catch (YarnRemoteException e) { + // Exception is expected + Assert.assertTrue("The thrown exception is not" + + " InvalidResourceRequestException", + e.getMessage().startsWith("Invalid resource request")); } - Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); - setAppEventType(RMAppEventType.KILL); - ((Service)rmContext.getDispatcher()).stop(); } - @Test - public void testRMAppSubmitError() throws Exception { - long now = System.currentTimeMillis(); - - // specify 1 here and use same appId below so it gets duplicate entry - RMContext rmContext = mockRMContext(1, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); - Configuration conf = new Configuration(); - ApplicationMasterService masterService = - new ApplicationMasterService(rmContext, scheduler); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, - new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, - new ApplicationACLsManager(conf), conf); - - ApplicationId appID = MockApps.newAppID(0); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); - context.setApplicationId(appID); - context.setApplicationName("testApp1"); - context.setQueue("testQueue"); - - setupDispatcher(rmContext, conf); - - RMApp appOrig = rmContext.getRMApps().get(appID); - Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName()); + private static ResourceScheduler mockResourceScheduler() { + ResourceScheduler scheduler = mock(ResourceScheduler.class); + when(scheduler.getMinimumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + when(scheduler.getMaximumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + return scheduler; + } - ContainerLaunchContext clc = - BuilderUtils.newContainerLaunchContext(null, null, null, null, null, - null, null); - context.setAMContainerSpec(clc); - // our testApp1 should be rejected and original app with same id should be left in place - appMonitor.submitApplication(context); + private static ContainerLaunchContext mockContainerLaunchContext( + RecordFactory recordFactory) { + ContainerLaunchContext amContainer = recordFactory.newRecordInstance( + ContainerLaunchContext.class); + amContainer.setApplicationACLs(new HashMap());; + return amContainer; + } - // make sure original app didn't get removed - RMApp app = rmContext.getRMApps().get(appID); - Assert.assertNotNull("app is null", app); - Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); - Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName()); - ((Service)rmContext.getDispatcher()).stop(); + private static Resource mockResource() { + return Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index aa7af9c..ae25e89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -250,17 +250,70 @@ private void checkTokenRenewal(UserGroupInformation owner, rmContext, null, null, null, dtsm); rmService.renewDelegationToken(request); } + + @Test (timeout = 30000) + @SuppressWarnings ("rawtypes") + public void testAppSubmit() throws Exception { + YarnScheduler yarnScheduler = mockYarnScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) {} + }); + ClientRMService rmService = + new ClientRMService(rmContext, yarnScheduler, appManager, null, null); + + // without name and queue + ApplicationId appId1 = getApplicationId(100); + SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( + appId1, null, null); + try { + rmService.submitApplication(submitRequest1); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + RMApp app1 = rmContext.getRMApps().get(appId1); + Assert.assertNotNull("app doesn't exist", app1); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); + Assert.assertEquals("app queue doesn't match", + YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue()); + + // with name and queue + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + ApplicationId appId2 = getApplicationId(101); + SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( + appId2, name, queue); + try { + rmService.submitApplication(submitRequest2); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + RMApp app2 = rmContext.getRMApps().get(appId2); + Assert.assertNotNull("app doesn't exist", app2); + Assert.assertEquals("app name doesn't match", name, app2.getName()); + Assert.assertEquals("app queue doesn't match", queue, app2.getQueue()); + + // duplicate appId + try { + rmService.submitApplication(submitRequest2); + Assert.fail("Exception is expected."); + } catch (YarnRemoteException e) { + Assert.assertTrue("The thrown exception is not expected.", + e.getMessage().contains("Cannot add a duplicate!")); + } + } @Test(timeout=4000) public void testConcurrentAppSubmit() throws IOException, InterruptedException, BrokenBarrierException { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); - when(yarnScheduler.getMinimumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - when(yarnScheduler.getMaximumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + YarnScheduler yarnScheduler = mockYarnScheduler(); RMContext rmContext = mock(RMContext.class); mockRMContext(yarnScheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); @@ -270,8 +323,10 @@ public void testConcurrentAppSubmit() final ApplicationId appId1 = getApplicationId(100); final ApplicationId appId2 = getApplicationId(101); - final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1); - final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2); + final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( + appId1, null, null); + final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( + appId2, null, null); final CyclicBarrier startBarrier = new CyclicBarrier(2); final CyclicBarrier endBarrier = new CyclicBarrier(2); @@ -319,61 +374,23 @@ public void run() { t.join(); } - @Test (timeout = 30000) - public void testInvalidResourceRequestWhenSubmittingApplication() - throws IOException, InterruptedException, BrokenBarrierException { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); - when(yarnScheduler.getMinimumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - when(yarnScheduler.getMaximumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); - RMStateStore stateStore = mock(RMStateStore.class); - when(rmContext.getStateStore()).thenReturn(stateStore); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, - null, mock(ApplicationACLsManager.class), new Configuration()); - - final ApplicationId appId = getApplicationId(100); - final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId); - Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1); - when(submitRequest.getApplicationSubmissionContext() - .getResource()).thenReturn(resource); - - final ClientRMService rmService = - new ClientRMService(rmContext, yarnScheduler, appManager, null, null); - - // submit an app - try { - rmService.submitApplication(submitRequest); - Assert.fail("Application submission should fail because resource" + - " request is invalid."); - } catch (YarnRemoteException e) { - // Exception is expected - Assert.assertTrue("The thrown exception is not" + - " InvalidResourceRequestException", - e.getMessage().startsWith("Invalid resource request")); - } - } - - private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) { + private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, + String name, String queue) { String user = MockApps.newUserName(); - String queue = MockApps.newQueue(); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); Resource resource = Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); - when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec); - when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user); - when(submissionContext.getQueue()).thenReturn(queue); - when(submissionContext.getApplicationId()).thenReturn(appId); - when(submissionContext.getResource()).thenReturn(resource); + ApplicationSubmissionContext submissionContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + submissionContext.setAMContainerSpec(amContainerSpec); + submissionContext.getAMContainerSpec().setUser(user); + submissionContext.setApplicationName(name); + submissionContext.setQueue(queue); + submissionContext.setApplicationId(appId); + submissionContext.setResource(resource); SubmitApplicationRequest submitRequest = recordFactory.newRecordInstance(SubmitApplicationRequest.class); @@ -429,4 +446,15 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, queueName, asContext, yarnScheduler, null , System .currentTimeMillis()); } + + private static YarnScheduler mockYarnScheduler() { + YarnScheduler yarnScheduler = mock(YarnScheduler.class); + when(yarnScheduler.getMinimumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + when(yarnScheduler.getMaximumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + return yarnScheduler; + } }