diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index b21fb737f52..5ea11525ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -366,22 +366,20 @@ private RMAppImpl createAndPopulateNewRMApp( String user, boolean isRecovery, long startTime) throws YarnException { ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(rmContext, submissionContext, user); + } catch (YarnException e) { + String msg = + "Failed to place application " + submissionContext.getApplicationId() + + " to queue and specified " + "queue is invalid : " + + submissionContext.getQueue(); + LOG.error(msg, e); + throw e; + } - // We only do queue mapping when it's a new application + // We only replace the queue when it's a new application if (!isRecovery) { - try { - // Do queue mapping - placementContext = placeApplication(rmContext, - submissionContext, user); - replaceQueueFromPlacementContext(placementContext, - submissionContext); - } catch (YarnException e) { - String msg = "Failed to place application " + - submissionContext.getApplicationId() + " to queue and specified " - + "queue is invalid : " + submissionContext.getQueue(); - LOG.error(msg, e); - throw e; - } + replaceQueueFromPlacementContext(placementContext, submissionContext); // fail the submission if configured application timeout value is invalid RMServerUtils.validateApplicationTimeouts( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 4df4cf23d13..9afbdd5957c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -147,7 +147,7 @@ public AbstractCSQueue(CapacitySchedulerContext cs, this.metrics = old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics.forQueue(getQueuePath(), parent, - configuration.getEnableUserMetrics(), cs.getConf()); + cs.getConfiguration().getEnableUserMetrics(), cs.getConf()); this.csContext = cs; this.minimumAllocation = csContext.getMinimumResourceCapability(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8de363140fb..d4f5bd70eef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -650,30 +650,65 @@ public CSQueue getQueue(String queueName) { return this.queueManager.getQueue(queueName); } - private void addApplicationOnRecovery( - ApplicationId applicationId, String queueName, String user, - Priority priority) { + private void addApplicationOnRecovery(ApplicationId applicationId, + String queueName, String user, + Priority priority, ApplicationPlacementContext placementContext) { try { writeLock.lock(); CSQueue queue = getQueue(queueName); if (queue == null) { - //During a restart, this indicates a queue was removed, which is - //not presently supported - if (!YarnConfiguration.shouldRMFailFast(getConfig())) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " - + queueName + " which no longer exists after restart.")); - return; - } else{ - String queueErrorMsg = "Queue named " + queueName - + " missing during application recovery." - + " Queue removal during recovery is not presently " - + "supported by the capacity scheduler, please " - + "restart with all queues configured" - + " which were present before shutdown/restart."; - LOG.fatal(queueErrorMsg); - throw new QueueInvalidException(queueErrorMsg); + //check if the queue needs to be auto-created during recovery + if (placementContext != null && placementContext.hasParentQueue()) { + try { + queue = autoCreateLeafQueue(placementContext); + } catch (YarnException | IOException e) { + LOG.error( + "Could not auto-create leaf queue during recovery due to : ", + e); + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery" + + " as it was submitted to queue " + + queueName + " which could not be auto-created")); + } else{ + String queueErrorMsg = "Queue named " + queueName + " missing " + + "during application recovery." + + " Queue removal during recovery is not presently " + + "supported by the capacity scheduler, please " + + "restart with all queues configured" + " which were " + + "present before shutdown/restart or check that " + + "configuration for auto-creating leaf queue " + queueName + + " " + + CapacitySchedulerConfiguration + .AUTO_CREATE_CHILD_QUEUE_ENABLED + + " is set to true on its " + "parent queue"; + LOG.fatal(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); + } + } + } + + if (queue == null) { + //During a restart, this indicates a queue was removed, which is + //not presently supported + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery as it" + + " was submitted to queue " + queueName + + " which no longer exists after restart.")); + return; + } else{ + String queueErrorMsg = "Queue named " + queueName + " missing " + + "during application recovery." + + " Queue removal during recovery is not presently " + + "supported by the capacity scheduler, please " + + "restart with all queues configured" + + " which were present before shutdown/restart."; + LOG.fatal(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); + } } } if (!(queue instanceof LeafQueue)) { @@ -682,8 +717,8 @@ private void addApplicationOnRecovery( if (!YarnConfiguration.shouldRMFailFast(getConfig())) { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " - + queueName + "Application killed on recovery as it was " + + "submitted to queue " + queueName + " which is no longer a leaf queue after restart.")); return; } else{ @@ -708,8 +743,8 @@ private void addApplicationOnRecovery( SchedulerApplication application = new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queueName); + LOG.info("Accepted application " + applicationId + " from user: " + + user + ", in queue: " + queueName); if (LOG.isDebugEnabled()) { LOG.debug( applicationId + " is recovering. Skip notifying APP_ACCEPTED"); @@ -1534,7 +1569,8 @@ public void handle(SchedulerEvent event) { appAddedEvent.getPlacementContext()); } else { addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, - appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); + appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(), + appAddedEvent.getPlacementContext()); } } } @@ -2058,10 +2094,10 @@ public void addQueue(Queue queue) + " (should be set and be a PlanQueue or ManagedParentQueue)"); } - AbstractManagedParentQueue parentPlan = + AbstractManagedParentQueue parent = (AbstractManagedParentQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); - parentPlan.addChildQueue(newQueue); + parent.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 064e2174e2f..888d18fa22c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -64,6 +64,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .TestCapacitySchedulerAutoCreatedQueueBase; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics; @@ -97,6 +100,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -281,6 +286,18 @@ private Configuration getSchedulerDynamicConfiguration() throws IOException { } } + private CapacitySchedulerConfiguration + getSchedulerAutoCreatedQueueConfiguration() + throws IOException { + CapacitySchedulerConfiguration schedulerConf = + new CapacitySchedulerConfiguration(conf); + TestCapacitySchedulerAutoCreatedQueueBase + .setupQueueConfigurationForSingleAutoCreatedLeafQueue(schedulerConf); + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(schedulerConf, + "c", new int[] { 0, 1 }); + return schedulerConf; + } + // Test work preserving recovery of apps running under reservation. // This involves: // 1. Setting up a dynamic reservable queue, @@ -1532,4 +1549,123 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); } + + // Test work preserving recovery of apps running on auto-created queues. + // This involves: + // 1. Setting up a dynamic auto-created queue, + // 2. Submitting an app to it, + // 3. Failing over RM, + // 4. Validating that the app is recovered post failover, + // 5. Check if all running containers are recovered, + // 6. Verify the scheduler state like attempt info, + // 7. Verify the queue/user metrics for the dynamic auto-created queue. + @Test(timeout = 30000) + public void testDynamicAutoCreatedQueueRecovery() throws Exception { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true); + + // 1. Set up dynamic auto-created queue. + CapacitySchedulerConfiguration schedulerConf = + getSchedulerAutoCreatedQueueConfiguration(); + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + rm1 = new MockRM(schedulerConf); + rm1.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, + rm1.getResourceTrackerService()); + nm1.registerNode(); + // 2. submit app to queue which is auto-created. + RMApp app1 = rm1.submitApp(200, "autoCreatedQApp", USER1, null, USER1); + Resource amResources = app1.getAMResourceRequests().get(0).getCapability(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // 3. Fail over (restart) RM. + rm2 = new MockRM(schedulerConf, rm1.getRMStateStore()); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // 4. Validate app is recovered post failover. + RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get( + app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 1, ContainerState.RUNNING); + NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + + nm1.registerNode( + Arrays.asList(amContainer, runningContainer, completedContainer), null); + + // Wait for RM to settle down on recovering containers. + waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + Set launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); + + // 5. Check RMContainers are re-recreated and the container state is + // correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = Resource.newInstance(nm1.getMemory(), + nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue( + schedulerNode1.isValidContainer(runningContainer.getContainerId())); + assertFalse( + schedulerNode1.isValidContainer(completedContainer.getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getUnallocatedResource()); + assertEquals(usedResources, schedulerNode1.getAllocatedResource()); + // Resource availableResources = Resources.subtract(nmResource, + // usedResources); + + // 6. Verify the scheduler state like attempt info. + Map> sa = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = sa.get( + recoveredApp1.getApplicationId()); + + // 7. Verify the queue/user metrics for the dynamic reservable queue. + if (getSchedulerType() == SchedulerType.CAPACITY) { + checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + + // *********** check appSchedulingInfo state *********** + assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index d6282a17d79..420d4200a49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -100,6 +100,7 @@ public static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; public static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; public static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + public static final String E = CapacitySchedulerConfiguration.ROOT + ".e"; public static final String A1 = A + ".a1"; public static final String A2 = A + ".a2"; public static final String B1 = B + ".b1"; @@ -129,8 +130,8 @@ public static final String USER = "user_"; public static final String USER0 = USER + 0; public static final String USER1 = USER + 1; - public static final String USER3 = USER + 3; public static final String USER2 = USER + 2; + public static final String USER3 = USER + 3; public static final String PARENT_QUEUE = "c"; public static final Set accessibleNodeLabelsOnC = new HashSet<>(); @@ -183,7 +184,7 @@ public void setUp() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, new int[] {0,1,2,3}); dispatcher = new SpyDispatcher(); rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); @@ -225,25 +226,30 @@ protected void setupNodes(MockRM newMockRM) throws Exception { } public static CapacitySchedulerConfiguration setupQueueMappings( - CapacitySchedulerConfiguration conf) { + CapacitySchedulerConfiguration conf, String parentQueue, int[] userIds) { List queuePlacementRules = new ArrayList<>(); queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); conf.setQueuePlacementRules(queuePlacementRules); + List existingMappings = conf + .getQueueMappings(); + //set queue mapping List queueMappings = new ArrayList<>(); - for (int i = 0; i <= 3; i++) { + for (int i = 0; i < userIds.length; i++) { //Set C as parent queue name for auto queue creation UserGroupMappingPlacementRule.QueueMapping userQueueMapping = new UserGroupMappingPlacementRule.QueueMapping( UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, - USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); + USER + userIds[i], getQueueMapping(parentQueue, USER + + userIds[i])); queueMappings.add(userQueueMapping); } - conf.setQueueMappings(queueMappings); + existingMappings.addAll(queueMappings); + conf.setQueueMappings(existingMappings); //override with queue mappings conf.setOverrideWithQueueMappings(true); return conf; @@ -327,6 +333,33 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( return conf; } + public static CapacitySchedulerConfiguration + setupQueueConfigurationForSingleAutoCreatedLeafQueue( + CapacitySchedulerConfiguration conf) { + + + //set up auto created queue configs + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "c", + new int[] {1,2}); + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "c" }); + conf.setCapacity(C, 100f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + return conf; + } + @After public void tearDown() throws Exception { if (mockRM != null) { @@ -395,7 +428,7 @@ protected MockRM setupSchedulerInstance() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, new int[] {0,1,2,3}); RMNodeLabelsManager mgr = setupNodeLabelManager(conf); MockRM newMockRM = new MockRM(conf) {