diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index eb84b31..4d9d256 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -53,9 +53,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.Records; @@ -68,6 +70,7 @@ protected ClientRMProtocol rmClient; protected InetSocketAddress rmAddress; + protected long statePollIntervalMillis; private static final String ROOT = "root"; @@ -90,6 +93,9 @@ public synchronized void init(Configuration conf) { if (this.rmAddress == null) { this.rmAddress = getRmAddress(conf); } + statePollIntervalMillis = conf.getLong( + YarnConfiguration.RM_CLIENT_STATE_POLL_INTERVAL, + YarnConfiguration.DEFAULT_RM_CLIENT_STATE_POLL_INTERVAL); super.init(conf); } @@ -133,6 +139,20 @@ public GetNewApplicationResponse getNewApplication() rmClient.submitApplication(request); LOG.info("Submitted application " + applicationId + " to ResourceManager" + " at " + rmAddress); + + while (true) { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + break; + } + try { + Thread.sleep(statePollIntervalMillis); + } catch (InterruptedException ie) { + RPCUtil.getRemoteException(ie); + } + } return applicationId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java index 3d7f120..f379f51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java @@ -18,10 +18,23 @@ package org.apache.hadoop.yarn.client; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import junit.framework.Assert; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.client.YarnClient; -import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; public class TestYarnClient { @@ -43,4 +56,92 @@ public void testClientStop() { client.start(); client.stop(); } + + @Test (timeout = 30000) + public void testSubmitApplication() { + Configuration conf = new Configuration(); + final YarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + final ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + ApplicationId applicationId = Records.newRecord(ApplicationId.class); + applicationId.setClusterTimestamp(System.currentTimeMillis()); + applicationId.setId(1); + when(context.getApplicationId()).thenReturn(applicationId); + + Thread t = new Thread() { + @Override + public void run() { + try { + client.submitApplication(context); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + } + }; + + ((MockYarnClient) client).setYarnApplicationState( + YarnApplicationState.NEW); + t.start(); + int maxTries = 20; + // app submission doesn't return when YarnApplicationState.NEW + assertThreadAlive(t, maxTries, false); + ((MockYarnClient) client).setYarnApplicationState( + YarnApplicationState.NEW_SAVING); + // app submission doesn't return when YarnApplicationState.NEW_SAVING + assertThreadAlive(t, maxTries, false); + ((MockYarnClient) client).setYarnApplicationState( + YarnApplicationState.SUBMITTED); + // app submission returns when YarnApplicationState.SUBMITTED + assertThreadAlive(t, maxTries, true); + Assert.assertFalse(t.isAlive()); + + client.stop(); + } + + private static void assertThreadAlive(Thread t, int maxTries, boolean mode) { + for (int i = 0; i < maxTries && (mode ? t.isAlive() : true); ++i) { + if (!mode) { + Assert.assertTrue(t.isAlive()); + } + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + } + } + + private static class MockYarnClient extends YarnClientImpl { + private ApplicationReport mockReport; + + public MockYarnClient() { + super(); + } + + @Override + public void start() { + rmClient = mock(ClientRMProtocol.class); + GetApplicationReportResponse mockResponse = + mock(GetApplicationReportResponse.class); + mockReport = mock(ApplicationReport.class); + try{ + when(rmClient.getApplicationReport(any( + GetApplicationReportRequest.class))).thenReturn(mockResponse); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + when(mockResponse.getApplicationReport()).thenReturn(mockReport); + } + + @Override + public void stop() { + } + + public void setYarnApplicationState(YarnApplicationState state) { + when(mockReport.getYarnApplicationState()).thenReturn(state); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 786598b..cccc8f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -94,6 +94,14 @@ public static final String DEFAULT_RM_ADDRESS = "0.0.0.0:" + DEFAULT_RM_PORT; + /** + * The interval of the yarn client's querying application state after + * application submission. The unit is millisecond. + */ + public static final String RM_CLIENT_STATE_POLL_INTERVAL = + RM_PREFIX + "client.state-poll-interval"; + public static final long DEFAULT_RM_CLIENT_STATE_POLL_INTERVAL = 1000; + /** The number of threads used to handle applications manager requests.*/ public static final String RM_CLIENT_THREAD_COUNT = RM_PREFIX + "client.thread-count"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f873ff9..bcf9e16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -71,6 +71,13 @@ + The interval of the yarn client's querying application state + after application submission. The unit is millisecond. + yarn.resourcemanager.client.state-poll-interval + 1000 + + + The number of threads used to handle applications manager requests. yarn.resourcemanager.client.thread-count 50 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index bcc1f64..714b8ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -83,15 +81,11 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -266,48 +260,26 @@ public SubmitApplicationResponse submitApplication( ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); - String user = submissionContext.getAMContainerSpec().getUser(); - try { - user = UserGroupInformation.getCurrentUser().getShortUserName(); - if (rmContext.getRMApps().get(applicationId) != null) { - throw new IOException("Application with id " + applicationId - + " is already present! Cannot add a duplicate!"); - } - - // Safety - submissionContext.getAMContainerSpec().setUser(user); - - // Check whether AM resource requirements are within required limits - if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); - try { - SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); - } catch (InvalidResourceRequestException e) { - LOG.warn("RM app submission failed in validating AM resource request" - + " for application " + applicationId, e); - throw RPCUtil.getRemoteException(e); - } - } - // This needs to be synchronous as the client can query - // immediately following the submission to get the application status. - // So call handle directly and do not send an event. - rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System - .currentTimeMillis())); + try { + // call RMAppManager to submit application directly + rmAppManager.submitApplication(submissionContext, + System.currentTimeMillis(), false); LOG.info("Application with id " + applicationId.getId() + - " submitted by user " + user); - RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, - "ClientRMService", applicationId); - } catch (IOException ie) { - LOG.info("Exception in submitting application", ie); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - ie.getMessage(), "ClientRMService", - "Exception in submitting application", applicationId); - throw RPCUtil.getRemoteException(ie); + " submitted by user " + + submissionContext.getAMContainerSpec().getUser()); + RMAuditLogger.logSuccess( + submissionContext.getAMContainerSpec().getUser(), + AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); + } catch (YarnRemoteException e) { + LOG.info("Exception in submitting application", e); + RMAuditLogger.logFailure( + submissionContext.getAMContainerSpec().getUser(), + AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), + "ClientRMService", "Exception in submitting application", + applicationId); + throw e; } SubmitApplicationResponse response = recordFactory diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 8a92ab1..d782767 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -31,8 +31,10 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.BuilderUtils; /** * This class manages the list of applications for the resource manager. @@ -233,10 +239,29 @@ protected synchronized void checkAppNumCompletedLimit() { @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, - boolean isRecovered) { + boolean isRecovered) throws YarnRemoteException { ApplicationId applicationId = submissionContext.getApplicationId(); RMApp application = null; try { + // Safety + submissionContext.getAMContainerSpec().setUser( + UserGroupInformation.getCurrentUser().getShortUserName()); + + // Sanity check + // Check whether AM resource requirements are within required limits + if (!submissionContext.getUnmanagedAM()) { + ResourceRequest amReq = BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + try { + SchedulerUtils.validateResourceRequest(amReq, + scheduler.getMaximumResourceCapability()); + } catch (InvalidResourceRequestException e) { + LOG.warn("RM app submission failed in validating AM resource request" + + " for application " + applicationId, e); + throw RPCUtil.getRemoteException(e); + } + } // Sanity checks if (submissionContext.getQueue() == null) { @@ -257,6 +282,9 @@ protected void submitApplication( submitTime); // Sanity check - duplicate? + // Concurrent app submissions with same applicationId will fail here + // Concurrent app submissions with different applicationIds will not + // influence each other if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { String message = "Application with id " + applicationId @@ -290,6 +318,10 @@ protected void submitApplication( this.rmContext.getDispatcher().getEventHandler().handle( new RMAppRejectedEvent(applicationId, ie.getMessage())); } + if (!(ie instanceof YarnRemoteException)) { + ie = RPCUtil.getRemoteException(ie); + } + throw (YarnRemoteException) ie; } } @@ -375,14 +407,6 @@ public void handle(RMAppManagerEvent event) { checkAppNumCompletedLimit(); } break; - case APP_SUBMIT: - { - ApplicationSubmissionContext submissionContext = - ((RMAppManagerSubmitEvent)event).getSubmissionContext(); - long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime(); - submitApplication(submissionContext, submitTime, false); - } - break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java index e805ed8..1b6a44c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java @@ -19,6 +19,5 @@ package org.apache.hadoop.yarn.server.resourcemanager; public enum RMAppManagerEventType { - APP_SUBMIT, APP_COMPLETED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java deleted file mode 100644 index afcd24d..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; - -public class RMAppManagerSubmitEvent extends RMAppManagerEvent { - - private final ApplicationSubmissionContext submissionContext; - private final long submitTime; - - public RMAppManagerSubmitEvent( - ApplicationSubmissionContext submissionContext, long submitTime) { - super(submissionContext.getApplicationId(), - RMAppManagerEventType.APP_SUBMIT); - this.submissionContext = submissionContext; - this.submitTime = submitTime; - } - - public ApplicationSubmissionContext getSubmissionContext() { - return this.submissionContext; - } - - public long getSubmitTime() { - return this.submitTime; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index f5cc7d3..38e130c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -31,12 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -46,11 +52,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.service.Service; -import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; import com.google.common.collect.Lists; @@ -163,9 +167,10 @@ public void setCompletedAppsMax(int max) { super.setCompletedAppsMax(max); } public void submitApplication( - ApplicationSubmissionContext submissionContext) { - super.submitApplication( - submissionContext, System.currentTimeMillis(), false); + ApplicationSubmissionContext submissionContext) + throws YarnRemoteException { + super.submitApplication(submissionContext, System.currentTimeMillis(), + false); } } @@ -337,7 +342,7 @@ public void testRMAppSubmit() throws Exception { long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(0, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); @@ -350,10 +355,8 @@ public void testRMAppSubmit() throws Exception { ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); context.setApplicationId(appID); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer.setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); + context.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + context.setResource(mockResource()); setupDispatcher(rmContext, conf); appMonitor.submitApplication(context); @@ -393,7 +396,7 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(0, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]); ApplicationMasterService masterService = @@ -405,10 +408,8 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer.setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); + context.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + context.setResource(mockResource()); setupDispatcher(rmContext, conf); ApplicationId appID = MockApps.newAppID(1); @@ -438,7 +439,7 @@ public void testRMAppSubmitWithQueueAndName() throws Exception { long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(1, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); @@ -452,12 +453,8 @@ public void testRMAppSubmitWithQueueAndName() throws Exception { context.setApplicationId(appID); context.setApplicationName("testApp1"); context.setQueue("testQueue"); - ContainerLaunchContext amContainer = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - amContainer - .setApplicationACLs(new HashMap()); - context.setAMContainerSpec(amContainer); - + context.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + context.setResource(mockResource()); setupDispatcher(rmContext, conf); appMonitor.submitApplication(context); @@ -479,13 +476,13 @@ public void testRMAppSubmitWithQueueAndName() throws Exception { ((Service)rmContext.getDispatcher()).stop(); } - @Test - public void testRMAppSubmitError() throws Exception { + @Test (timeout = 30000) + public void testRMAppSubmitDuplicateApplicationId() throws Exception { long now = System.currentTimeMillis(); // specify 1 here and use same appId below so it gets duplicate entry RMContext rmContext = mockRMContext(1, now - 10); - ResourceScheduler scheduler = new CapacityScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); @@ -496,6 +493,8 @@ public void testRMAppSubmitError() throws Exception { ApplicationId appID = MockApps.newAppID(0); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + context.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + context.setResource(mockResource()); context.setApplicationId(appID); context.setApplicationName("testApp1"); context.setQueue("testQueue"); @@ -505,19 +504,91 @@ public void testRMAppSubmitError() throws Exception { RMApp appOrig = rmContext.getRMApps().get(appID); Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName()); - ContainerLaunchContext clc = - BuilderUtils.newContainerLaunchContext(null, null, null, null, null, - null, null); - context.setAMContainerSpec(clc); // our testApp1 should be rejected and original app with same id should be left in place - appMonitor.submitApplication(context); + try { + appMonitor.submitApplication(context); + Assert.fail("Exception is expected when applicationId is duplicate."); + } catch (YarnRemoteException e) { + Assert.assertTrue("The thrown exception is not the expectd one.", + e.getMessage().contains("Cannot add a duplicate!")); + } // make sure original app didn't get removed RMApp app = rmContext.getRMApps().get(appID); Assert.assertNotNull("app is null", app); Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName()); + + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + setAppEventType(RMAppEventType.KILL); ((Service)rmContext.getDispatcher()).stop(); } + @Test (timeout = 30000) + public void testRMAppSubmitInvalidResourceRequest() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(0, now - 10); + ResourceScheduler scheduler = mockResourceScheduler(); + Configuration conf = new Configuration(); + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, + new ClientToAMTokenSecretManagerInRM(), scheduler, masterService, + new ApplicationACLsManager(conf), conf); + + ApplicationId appID = MockApps.newAppID(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + ApplicationSubmissionContext context = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + context.setApplicationId(appID); + context.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + context.setResource(Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1)); + setupDispatcher(rmContext, conf); + + // submit an app + try { + appMonitor.submitApplication(context); + Assert.fail("Application submission should fail because resource" + + " request is invalid."); + } catch (YarnRemoteException e) { + // Exception is expected + Assert.assertTrue("The thrown exception is not" + + " InvalidResourceRequestException", + e.getMessage().startsWith("Invalid resource request")); + } + + setAppEventType(RMAppEventType.KILL); + ((Service)rmContext.getDispatcher()).stop(); + } + + private static ResourceScheduler mockResourceScheduler() { + ResourceScheduler scheduler = mock(ResourceScheduler.class); + when(scheduler.getMinimumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + when(scheduler.getMaximumResourceCapability()).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + return scheduler; + } + + private static ContainerLaunchContext mockContainerLaunchContext( + RecordFactory recordFactory) { + ContainerLaunchContext amContainer = recordFactory.newRecordInstance( + ContainerLaunchContext.class); + amContainer.setApplicationACLs(new HashMap());; + return amContainer; + } + + private static Resource mockResource() { + return Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index aa7af9c..e10774b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -319,46 +319,6 @@ public void run() { t.join(); } - @Test (timeout = 30000) - public void testInvalidResourceRequestWhenSubmittingApplication() - throws IOException, InterruptedException, BrokenBarrierException { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); - when(yarnScheduler.getMinimumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - when(yarnScheduler.getMaximumResourceCapability()).thenReturn( - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); - RMStateStore stateStore = mock(RMStateStore.class); - when(rmContext.getStateStore()).thenReturn(stateStore); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, - null, mock(ApplicationACLsManager.class), new Configuration()); - - final ApplicationId appId = getApplicationId(100); - final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId); - Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1); - when(submitRequest.getApplicationSubmissionContext() - .getResource()).thenReturn(resource); - - final ClientRMService rmService = - new ClientRMService(rmContext, yarnScheduler, appManager, null, null); - - // submit an app - try { - rmService.submitApplication(submitRequest); - Assert.fail("Application submission should fail because resource" + - " request is invalid."); - } catch (YarnRemoteException e) { - // Exception is expected - Assert.assertTrue("The thrown exception is not" + - " InvalidResourceRequestException", - e.getMessage().startsWith("Invalid resource request")); - } - } - private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) { String user = MockApps.newUserName(); String queue = MockApps.newQueue();