diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 8d22a36d99d..5fcf5ae6f99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -148,6 +148,9 @@ private Map userWeights = new HashMap(); private int maxParallelApps; + // is it a expired dynamic queue? + private boolean expiredQueue = false; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1379,6 +1382,26 @@ public void activeQueue() throws YarnException { } } + public boolean isExpiredQueue() { + writeLock.lock(); + + try { + return expiredQueue; + } finally { + writeLock.unlock(); + } + } + + public void setExpiredQueue(boolean expiredQueue) { + writeLock.lock(); + + try { + this.expiredQueue = expiredQueue; + } finally { + writeLock.unlock(); + } + } + protected void appFinished() { this.writeLock.lock(); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 51df2242be3..84ac9c7146f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1996,6 +1996,20 @@ public void handle(SchedulerEvent event) { queueManagementChangeEvent.getQueueManagementChanges(); ((ManagedParentQueue) parentQueue) .validateAndApplyQueueManagementChanges(queueManagementChanges); + // Delete queue when it expired. + for (CSQueue child: parentQueue.getChildQueues()) { + if ((child instanceof AutoCreatedLeafQueue && conf + .isAutoCreateChildQueueEnabled( + parentQueue.getQueuePath()) && + ((AutoCreatedLeafQueue) child).isExpiredQueue() + && conf.isAutoExpiredDeletionEnabled( + child.getQueuePath()))) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue)child; + if (leafQueue.isExpiredQueue()) { + removeQueue(leafQueue.getQueuePath()); + } + } + } } catch (SchedulerDynamicEditException sde) { LOG.error("Queue Management Change event cannot be applied for " + "parent queue : " + parentQueue.getQueuePath(), sde); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d0ee25df300..202afcd6011 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1952,6 +1952,23 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled"; + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = false; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = + AUTO_CREATE_CHILD_QUEUE_PREFIX + "expired-deletion-enabled"; + + // 300s for expired defualt + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = + AUTO_CREATE_CHILD_QUEUE_PREFIX + "expired-deletion-time"; + @Private public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = "leaf-queue-template"; @@ -1979,6 +1996,22 @@ public boolean isAutoCreateChildQueueEnabled(String queuePath) { return isAutoCreateEnabled; } + /** + * If true, this queue will be deleted when queue is expired. + * + * @param queuePath The queues path + * @return true if auto created queue's deletion when expired is enabled + * for child queues else false. Default + * is false + */ + @Private + public boolean isAutoExpiredDeletionEnabled(String queuePath) { + boolean isAutoExpiredDeletionEnabled = getBoolean( + getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED); + return isAutoExpiredDeletionEnabled; + } + @Private @VisibleForTesting public void setAutoCreateChildQueueEnabled(String queuePath, @@ -1988,6 +2021,34 @@ public void setAutoCreateChildQueueEnabled(String queuePath, autoCreationEnabled); } + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(String queuePath, + boolean autoExpiredDeletionEnabled) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + autoExpiredDeletionEnabled); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(String queuePath, + long time) { + setLong(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, + time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime(String queuePath) { + return getLong(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME); + } + + + /** * Get the auto created leaf queue's template configuration prefix * Leaf queue's template capacities are configured at the parent queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index a44929beed6..5591b934531 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -181,6 +181,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { // Ensure queue hierarchy in the new XML file is proper. + // And check if auto created queue is expired for deletion. CapacitySchedulerConfigValidator .validateQueueHierarchy(queues, newQueues, newConf); } @@ -320,7 +321,13 @@ private void updateQueues(CSQueueStore existingQueues, if (newQueues.get(queue.getQueuePath()) == null && !( queue instanceof AutoCreatedLeafQueue && conf .isAutoCreateChildQueueEnabled( - queue.getParent().getQueuePath()))) { + queue.getParent().getQueuePath())) || + (queue instanceof AutoCreatedLeafQueue && conf + .isAutoCreateChildQueueEnabled( + queue.getParent().getQueuePath()) && + ((AutoCreatedLeafQueue) queue).isExpiredQueue() + && conf.isAutoExpiredDeletionEnabled( + queue.getQueuePath()))) { existingQueues.remove(queue); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index 90cbf4be27e..5c38a464171 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -666,6 +666,7 @@ public void commitQueueManagementChanges( AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; + boolean isExpired = true; for (String nodeLabel : updatedQueueTemplate.getQueueCapacities() .getExistingNodeLabels()) { if (updatedQueueTemplate.getQueueCapacities(). @@ -696,8 +697,23 @@ public void commitQueueManagementChanges( .getQueueCapacities().getCapacity(nodeLabel))); deactivate(leafQueue, nodeLabel); } + long lastActive = + getLeafQueueState(leafQueue, nodeLabel).getMostRecentActivationTime(); + long lastDeactive = + getLeafQueueState(leafQueue, nodeLabel).getMostRecentDeactivationTime(); + // Check if need delete when expired. + if (lastActive >= lastDeactive + || (lastDeactive - lastActive)/1000 <= + scheduler.getConfiguration(). + getAutoExpiredDeletionTime(leafQueue.getQueuePath())) { + isExpired = false; + } } } + if (scheduler.getConfiguration(). + isAutoExpiredDeletionEnabled(leafQueue.getQueuePath())) { + leafQueue.setExpiredQueue(isExpired); + } } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 4757cd79a07..b1892104e85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -400,6 +400,7 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setUserLimitFactor(C, 1.0f); conf.setAutoCreateChildQueueEnabled(C, true); + conf.setAutoExpiredDeletionEnabled(C, true); //Setup leaf queue template configs conf.setAutoCreatedLeafQueueConfigCapacity(C, 50.0f); @@ -424,6 +425,7 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setUserLimitFactor(D, 1.0f); conf.setAutoCreateChildQueueEnabled(D, true); + conf.setAutoExpiredDeletionEnabled(D, true); conf.setUserLimit(D, 100); conf.setUserLimitFactor(D, 3.0f); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 084a177048f..fe351023c0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.sun.source.tree.AssertTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; @@ -660,6 +661,108 @@ public void testAutoCreatedQueueActivationDeactivation() throws Exception { } } + @Test + public void testAutoCreatedQueueExpiredDeletion() throws Exception { + + try { + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, + 1, 1); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, + 1); + + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //submit another app3 as USER1 + submitApp(mockRM, parentQueue, USER1, USER1, 3, 2); + + //validate total activated abs capacity remains the same + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue) + .getAutoCreatedQueueManagementPolicy(); + + for (String nodeLabel : accessibleNodeLabelsOnC) { + assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel), + autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON); + } + + //submit user_3 app. This cant be allocated since there is no capacity + // in NO_LABEL, SSD but can be in GPU label + submitApp(mockRM, parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, + 1.0f, 1.0f); + validateCapacitiesByLabel((ManagedParentQueue) parentQueue, + (AutoCreatedLeafQueue) + user3LeafQueue, NODEL_LABEL_GPU); + + assertEquals(0.2f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); + assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU), + EPSILON); + + //Verify that AMs can be allocated + //Node 1 has SSD and default node label expression on C is SSD. + //This validates that the default node label expression with SSD is set + // on the AM attempt + // and app attempt reaches ALLOCATED state for a dynamic queue 'USER1' + mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId), + mockRM, nm1); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + // Set queue expired + AutoCreatedLeafQueue user2Leaf = (AutoCreatedLeafQueue)cs.getQueue(USER2); + user2Leaf.setExpiredQueue(true); + + //Verify if USER_2 can be deactivated since it has no pending apps + List queueManagementChanges = + autoCreatedQueueManagementPolicy.computeQueueManagementChanges(); + + ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue; + managedParentQueue. + validateAndApplyQueueManagementChanges(queueManagementChanges); + + validateDeactivatedQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, queueManagementChanges); + + // Enabled expired deletion for user2 + cs.getConfiguration().setAutoExpiredDeletionEnabled(C + "."+USER2, true); + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + + // Assert queue has been deleted + assertEquals(cs.getCapacitySchedulerQueueManager().getQueue(USER2), null); + + //deactivate USER1 queue + cs.killAllAppsInQueue(USER1); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + // Disable expired deletion for user1 + cs.getConfiguration().setAutoExpiredDeletionEnabled(C + "."+USER1, false); + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue(USER1) != null); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + @Test public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws Exception {