diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7990421..01697fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -324,6 +324,10 @@ protected void recoverApplication(ApplicationStateData appState, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery) throws YarnException { + // apply queue mapping before creating RMApp + String newQueue = scheduler.getMappedQueue(user, submissionContext); + submissionContext.setQueue(newQueue); + ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ae927f1..d2788f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -685,4 +686,11 @@ protected void refreshMaximumAllocation(Resource newMaxAlloc) { } return null; } + + /** {@inheritDoc} */ + @Override + public String getMappedQueue(String user, + ApplicationSubmissionContext asc) throws YarnException { + return asc.getQueue(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b99b217..f593fcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -40,11 +41,11 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -286,4 +287,14 @@ void setEntitlement(String queue, QueueEntitlement entitlement) * @return an EnumSet containing the resource types */ public EnumSet getSchedulingResourceTypes(); + + /** + * Get queueName of a given {@link ApplicationSubmissionContext} after applied + * queue mapping, etc. + * + * @return queueName after applied queue mapping + * @throws YarnException + */ + public String getMappedQueue(String user, + ApplicationSubmissionContext asc) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 48c7f2f..19e2f6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; @@ -684,31 +684,6 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { private synchronized void addApplication(ApplicationId applicationId, String queueName, String user, boolean isAppRecovering) { - - if (mappings != null && mappings.size() > 0) { - try { - String mappedQueue = getMappedQueue(user); - if (mappedQueue != null) { - // We have a mapping, should we use it? - if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) - || overrideWithQueueMappings) { - LOG.info("Application " + applicationId + " user " + user - + " mapping [" + queueName + "] to [" + mappedQueue - + "] override " + overrideWithQueueMappings); - queueName = mappedQueue; - RMApp rmApp = rmContext.getRMApps().get(applicationId); - rmApp.setQueue(queueName); - } - } - } catch (IOException ioex) { - String message = "Failed to submit application " + applicationId + - " submitted by user " + user + " reason: " + ioex.getMessage(); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; - } - } - // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -1781,4 +1756,37 @@ public SchedulerHealth getSchedulerHealth() { private synchronized void setLastNodeUpdateTime(long time) { this.lastNodeUpdateTime = time; } + + /** {@inheritDoc} */ + @Override + public String getMappedQueue(String user, + ApplicationSubmissionContext asc) throws YarnException{ + // original queue name is what specified in ApplicationSubmissionContext + // from client + String originalQueueName = asc.getQueue(); + ApplicationId applicationId = asc.getApplicationId(); + + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (originalQueueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info("Application " + applicationId + " user " + user + + " mapping [" + originalQueueName + "] to [" + mappedQueue + + "] override " + overrideWithQueueMappings); + return mappedQueue; + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + throw new YarnException(message); + } + } + + // In other case, we will return originalQueueName + return originalQueueName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 3db8b7c..3c45f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -211,6 +211,7 @@ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmConte private TestRMAppManager appMonitor; private ApplicationSubmissionContext asContext; private ApplicationId appId; + private ResourceScheduler scheduler; @SuppressWarnings("deprecation") @Before @@ -218,7 +219,7 @@ public void setUp() { long now = System.currentTimeMillis(); rmContext = mockRMContext(1, now - 10); - ResourceScheduler scheduler = mockResourceScheduler(); + scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); @@ -481,6 +482,27 @@ public void testRMAppSubmit() throws Exception { Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); } + + @Test + public void testRMAppSubmitWithQueueChanged() throws Exception { + when( + scheduler.getMappedQueue(any(String.class), + any(ApplicationSubmissionContext.class))).thenReturn("newQueue"); + asContext.setQueue("oldQueue"); + appMonitor.submitApplication(asContext, "test"); + RMApp app = rmContext.getRMApps().get(appId); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("newQueue", asContext.getQueue()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, + getAppEventType()); + } @Test public void testRMAppSubmitWithInvalidTokens() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index a39f94f..495c5f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -153,9 +154,12 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion.User; public class TestClientRMService { @@ -829,6 +833,15 @@ private void checkTokenCancellation(ClientRMService rmService, @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { YarnScheduler yarnScheduler = mockYarnScheduler(); + doAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + ApplicationSubmissionContext asc = + (ApplicationSubmissionContext) invocation.getArguments()[1]; + return asc.getQueue(); + } + }).when(yarnScheduler).getMappedQueue(any(String.class), + any(ApplicationSubmissionContext.class)); RMContext rmContext = mock(RMContext.class); mockRMContext(yarnScheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class);