diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fd144f23cd3..8542d0424cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -157,8 +157,13 @@ // When this queue has application submit to? // This property only applies to dynamic queue, // and will be used to check when the queue need to be removed. + // Used for new auto created. private long lastSubmittedTimestamp; + // is it a expired dynamic queue? + // Used for old auto created + private boolean expiredQueue = false; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1638,6 +1643,11 @@ public long getLastSubmittedTimestamp() { return lastSubmittedTimestamp; } + // just for test + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } + // "Tab" the queue, so this queue won't be removed because of idle timeout. public void signalToSubmitToQueue() { writeLock.lock(); @@ -1647,4 +1657,24 @@ public void signalToSubmitToQueue() { writeLock.unlock(); } } + + public boolean isExpiredQueue() { + writeLock.lock(); + + try { + return expiredQueue; + } finally { + writeLock.unlock(); + } + } + + public void setExpiredQueue(boolean expiredQueue) { + writeLock.lock(); + + try { + this.expiredQueue = expiredQueue; + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueStore.java index a551de15d8e..e862f7473a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueStore.java @@ -195,6 +195,11 @@ public void remove(CSQueue queue) { */ public void remove(String name) { CSQueue queue = get(name); + // fixme should get by queueName or queuePath + // for remove consistent + if (queue == null) { + queue = getByFullName(name); + } if (queue != null) { remove(queue); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 89c1cf752ea..7a58e051862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -821,7 +821,10 @@ public CSQueue getQueue(String queueName) { if (queueName == null) { return null; } - return this.queueManager.getQueue(queueName); + // fixme should get by queueName or queuePath + return this.queueManager.getQueue(queueName) == null ? + this.queueManager.getQueueByFullName(queueName) : + this.queueManager.getQueue(queueName); } /** @@ -2029,18 +2032,46 @@ public void handle(SchedulerEvent event) { { QueueManagementChangeEvent queueManagementChangeEvent = (QueueManagementChangeEvent) event; - ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); - try { - final List queueManagementChanges = - queueManagementChangeEvent.getQueueManagementChanges(); - ((ManagedParentQueue) parentQueue) - .validateAndApplyQueueManagementChanges(queueManagementChanges); - } catch (SchedulerDynamicEditException sde) { - LOG.error("Queue Management Change event cannot be applied for " - + "parent queue : " + parentQueue.getQueuePath(), sde); - } catch (IOException ioe) { - LOG.error("Queue Management Change event cannot be applied for " - + "parent queue : " + parentQueue.getQueuePath(), ioe); + if (queueManagementChangeEvent.getParentQueue() != null) { + ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); + try { + final List queueManagementChanges = + queueManagementChangeEvent.getQueueManagementChanges(); + ((ManagedParentQueue) parentQueue) + .validateAndApplyQueueManagementChanges(queueManagementChanges); + // Delete queue when it expired. + // For old auto created. + for (CSQueue child : parentQueue.getChildQueues()) { + if ((child instanceof AutoCreatedLeafQueue && conf + .isAutoCreateChildQueueEnabled( + parentQueue.getQueuePath()) + && conf.isAutoExpiredDeletionEnabled( + parentQueue.getQueuePath()))) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) child; + // fixme should get by both queueName or queuePath + // for remove consistent + if (leafQueue.isExpiredQueue()) { + removeQueue(leafQueue.getQueuePath()); + } + } + } + } catch (SchedulerDynamicEditException sde) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + parentQueue.getQueuePath(), sde); + } catch (IOException ioe) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + parentQueue.getQueuePath(), ioe); + } + }else { + try { + if (queueManagementChangeEvent.getLeafQueue() != null) { + computeDynamicLeafQueueChanges + (queueManagementChangeEvent.getLeafQueue()); + } + }catch (SchedulerDynamicEditException sde) { + LOG.error("Dynamic leaf queue changes cannot be applied for " + + "leaf queue : " + queueManagementChangeEvent.getLeafQueue(), sde); + } } } break; @@ -2049,6 +2080,36 @@ public void handle(SchedulerEvent event) { } } + private synchronized void computeDynamicLeafQueueChanges(LeafQueue leafQueue) + throws SchedulerDynamicEditException { + // Expired queue, when there are no running in leafQueue + // and the last submit time has been expired + // Delete queue when expired deletion enabled. + + ParentQueue parentQueue = (ParentQueue) leafQueue.getParent(); + if (parentQueue == null) { + throw new SchedulerDynamicEditException("Parent " + + "queue should not be null for auto deletion!"); + } + long idleDuration = + (System.currentTimeMillis() + - leafQueue.getLastSubmittedTimestamp())/1000; + + if (leafQueue.getAllApplications().size() ==0 + && idleDuration > this.getConfiguration() + .getAutoExpiredDeletionTime(leafQueue.getParent().getQueuePath()) + && this.getConfiguration(). + isAutoExpiredDeletionEnabled(leafQueue.getParent().getQueuePath())){ + + LeafQueue removed = parentQueue. + removeDynamicLeafQueue(leafQueue.getQueuePath()); + if (removed != null) { + this.getCapacitySchedulerQueueManager(). + removeQueue(leafQueue.getQueuePath()); + } + } + } + private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { writeLock.lock(); @@ -2474,6 +2535,8 @@ public void removeQueue(String queueName) writeLock.lock(); try { LOG.info("Removing queue: " + queueName); + // fixme should get by queueName or queuePath + // for remove consistent CSQueue q = this.getQueue(queueName); if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom( q.getClass()))) { @@ -3380,13 +3443,6 @@ private LeafQueue autoCreateLeafQueue( if (!StringUtils.isEmpty(parentQueueName)) { CSQueue parentQueue = getQueue(parentQueueName); - if (parentQueue == null) { - throw new SchedulerDynamicEditException( - "Could not auto-create leaf queue for " + leafQueueName - + ". Queue mapping specifies an invalid parent queue " - + "which does not exist " + parentQueueName); - } - if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) { // Case 1: Handle ManagedParentQueue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index abbc2d7875f..f2812a61331 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2010,11 +2010,11 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { @Private private static final String AUTO_QUEUE_CREATION_V2_PREFIX = - "auto-queue-creation-v2"; + "auto-queue-creation-v2."; @Private public static final String AUTO_QUEUE_CREATION_V2_ENABLED = - AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled"; + AUTO_QUEUE_CREATION_V2_PREFIX + "enabled"; @Private public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false; @@ -2155,6 +2155,68 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = false; + + // TODO we just use v2 for deletion enabled for both v1 and v2 + // Because this is the new feature in v2 update + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = + AUTO_QUEUE_CREATION_V2_PREFIX + "expired-deletion-enabled"; + + // 300s for expired defualt + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = + AUTO_QUEUE_CREATION_V2_PREFIX + "expired-deletion-time"; + + /** + * If true, this queue will be deleted when queue is expired. + * + * @param queuePath The queues path + * @return true if auto created queue's deletion when expired is enabled + * for child queues else false. Default + * is false + */ + @Private + public boolean isAutoExpiredDeletionEnabled(String queuePath) { + boolean isAutoExpiredDeletionEnabled = getBoolean( + getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED); + return isAutoExpiredDeletionEnabled; + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(String queuePath, + boolean autoExpiredDeletionEnabled) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + autoExpiredDeletionEnabled); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(String queuePath, + long time) { + setLong(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, + time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime(String queuePath) { + return getLong(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME); + } + + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index c5ce700eef5..10f8fa2aa36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -191,7 +191,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) .validateQueueHierarchy(queues, newQueues, newConf); } - // Add new queues and delete OldQeueus only after validation. + // Add new queues and delete OldQueues only after validation. updateQueues(queues, newQueues); // Re-configure queues @@ -333,7 +333,23 @@ private void updateQueues(CSQueueStore existingQueues, } for (CSQueue queue : existingQueues.getQueues()) { - if (!((AbstractCSQueue) queue).isDynamicQueue() && newQueues.get( + // should also support for auto created for expired deletion + // 1. handle old auto created deletion for reinitializeQueues + // 2. handle new auto created deletion for reinitializeQueues + if ((queue.getParent() != null && queue instanceof AutoCreatedLeafQueue && + conf.isAutoExpiredDeletionEnabled(queue.getParent().getQueuePath()) + && (newQueues.get(queue.getQueuePath())) == null && + ((AutoCreatedLeafQueue) queue).isExpiredQueue()) + || + (queue.getParent() != null && queue instanceof LeafQueue && + ((LeafQueue) queue).isDynamicQueue() && + conf.isAutoExpiredDeletionEnabled(queue.getParent().getQueuePath()) + && (newQueues.get(queue.getQueuePath())) == null && + ((System.currentTimeMillis() - ((LeafQueue)queue).getLastSubmittedTimestamp()) + > conf.getAutoExpiredDeletionTime(queue.getParent().getQueuePath())) && + ((LeafQueue)queue).getAllApplications().size() == 0) + || + !((AbstractCSQueue) queue).isDynamicQueue() && newQueues.get( queue.getQueuePath()) == null && !( queue instanceof AutoCreatedLeafQueue && conf .isAutoCreateChildQueueEnabled( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 15c321fca0b..918a70fc439 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -603,6 +603,11 @@ public void submitApplication(ApplicationId applicationId, String userName, // Careful! Locking order is important! validateSubmitApplication(applicationId, userName, queue); + // Signal to queue submit time in dynamic queue + if (this.isDynamicQueue()) { + signalToSubmitToQueue(); + } + // Inform the parent queue try { getParent().submitApplication(applicationId, userName, queue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 0a2f0820070..a696afe8586 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -573,6 +573,44 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) } } + public LeafQueue removeDynamicLeafQueue(String queuePath) + throws SchedulerDynamicEditException{ + return (LeafQueue) removeDynamicChildQueue(queuePath, true); + } + + // New method to remove child queue + // TODO if we need to support delete parent queue? + private CSQueue removeDynamicChildQueue(String childQueuePath, boolean isLeaf) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Check if queue exists, if queue not exists, write a warning message (this + // should not happen, since it will be handled before calling this method) + // , but we will move on. + CSQueue queue = + csContext.getCapacitySchedulerQueueManager().getQueueByFullName( + childQueuePath); + + //return null + if (queue == null) { + LOG.warn( + "This should not happen, trying to remove queue=" + childQueuePath + + ", however the queue already not exists"); + } + + this.childQueues.remove(queue); + + // Call updateClusterResource, which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + return queue; + } finally { + writeLock.unlock(); + } + } + /** * Check whether this queue supports adding additional child queues * dynamically. @@ -697,6 +735,11 @@ public void submitApplication(ApplicationId applicationId, String user, // Sanity check validateSubmitApplication(applicationId, user, queue); + // Signal to queue submit time in dynamic queue + if (this.isDynamicQueue()) { + signalToSubmitToQueue(); + } + addApplication(applicationId, user); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java index 1b175d1ff5c..4602228f212 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import java.text.MessageFormat; import java.util.ArrayList; @@ -44,10 +45,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * Queue Management scheduling policy for managed parent queues which enable - * auto child queue creation + * Queue Management scheduling policy for + * 1. Managed parent queues which enable + * auto child queue creation + * + * 2. New dynamic auto created leaf queues + + * */ public class QueueManagementDynamicEditPolicy implements SchedulingEditPolicy { @@ -65,6 +71,10 @@ private long monitoringInterval; private Set managedParentQueues = new HashSet<>(); + // Just should get leaf queue in new Auto created queue + // Because auto created parent will update from leaf. + private Set newDynamicLeafQueues = new HashSet<>(); + /** * Instantiated by CapacitySchedulerConfiguration @@ -130,6 +140,7 @@ public void reinitialize(final Configuration config, final RMContext context, private void initQueues() { managedParentQueues.clear(); + newDynamicLeafQueues.clear(); for (Map.Entry queues : scheduler .getCapacitySchedulerQueueManager() .getQueues().entrySet()) { @@ -140,6 +151,11 @@ private void initQueues() { if ( queue instanceof ManagedParentQueue) { managedParentQueues.add(queueName); } + + if( queue instanceof LeafQueue + && ((LeafQueue) queue).isDynamicQueue()) { + newDynamicLeafQueues.add(queueName); + } } } @@ -150,6 +166,7 @@ public void editSchedule() { initQueues(); manageAutoCreatedLeafQueues(); + if (LOG.isDebugEnabled()) { LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); } @@ -173,8 +190,36 @@ public void editSchedule() { computeQueueManagementChanges (parentQueue)); } + return queueManagementChanges; + } + + // Proceed new auto created queues + if (newDynamicLeafQueues.size() > 0) { + for (String leafQueueName : newDynamicLeafQueues) { + LeafQueue leafQueue = + (LeafQueue)scheduler.getCapacitySchedulerQueueManager(). + getQueue(leafQueueName); + //Void now nothing to return + computeQueueManagementChanges(leafQueue); + } + } + + return new ArrayList<>(); + } + + @VisibleForTesting + void computeQueueManagementChanges + (LeafQueue leafQueue) { + // TODO add refresh dynamic queue capacity all level including parent + // When template config is finished + + //Scheduler update is asynchronous + if (leafQueue != null) { + QueueManagementChangeEvent queueManagementChangeEvent = + new QueueManagementChangeEvent(leafQueue); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + queueManagementChangeEvent); } - return queueManagementChanges; } @@ -258,4 +303,8 @@ public CapacityScheduler getScheduler() { public Set getManagedParentQueues() { return managedParentQueues; } + + public Set getNewDynamicLeafQueues() { + return newDynamicLeafQueues; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index ab99317888d..ba3c3b0e861 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -465,11 +465,13 @@ private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() .getQueue(leafQueueName); - AutoCreatedLeafQueueConfig newTemplate = buildTemplate( - queueCapacities.getValue()); - queueManagementChanges.add( - new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); - + // May be already deleted + if (leafQueue != null) { + AutoCreatedLeafQueueConfig newTemplate = buildTemplate( + queueCapacities.getValue()); + queueManagementChanges.add( + new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); + } } return queueManagementChanges; } finally { @@ -691,6 +693,8 @@ public void commitQueueManagementChanges( AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; + boolean isExpired = true; + for (String nodeLabel : updatedQueueTemplate.getQueueCapacities() .getExistingNodeLabels()) { if (updatedQueueTemplate.getQueueCapacities(). @@ -698,8 +702,10 @@ public void commitQueueManagementChanges( if (isActive(leafQueue, nodeLabel)) { LOG.debug("Queue is already active. Skipping activation : {}", leafQueue.getQueuePath()); + isExpired = false; } else{ activate(leafQueue, nodeLabel); + isExpired = false; } } else{ if (!isActive(leafQueue, nodeLabel)) { @@ -720,9 +726,26 @@ public void commitQueueManagementChanges( this.scheduler.getClusterResource(), updatedQueueTemplate .getQueueCapacities().getCapacity(nodeLabel))); deactivate(leafQueue, nodeLabel); + + } + long lastActive = + getLeafQueueState(leafQueue, nodeLabel).getMostRecentActivationTime(); + long lastDeactive = + getLeafQueueState(leafQueue, nodeLabel).getMostRecentDeactivationTime(); + // Check if need delete when expired. + if (lastActive >= lastDeactive + || (lastDeactive - lastActive)/1000 <= + scheduler.getConfiguration(). + getAutoExpiredDeletionTime(managedParentQueue.getQueuePath()) + || leafQueue.getAllApplications().size() > 0) { + isExpired = false; } } } + if (scheduler.getConfiguration(). + isAutoExpiredDeletionEnabled(managedParentQueue.getQueuePath())) { + leafQueue.setExpiredQueue(isExpired); + } } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java index 926e1be6668..034b2c449d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java @@ -19,6 +19,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .QueueManagementChange; @@ -30,6 +31,7 @@ public class QueueManagementChangeEvent extends SchedulerEvent { private ParentQueue parentQueue; + private LeafQueue leafQueue; private List queueManagementChanges; public QueueManagementChangeEvent(ParentQueue parentQueue, @@ -39,6 +41,13 @@ public QueueManagementChangeEvent(ParentQueue parentQueue, this.queueManagementChanges = queueManagementChanges; } + public QueueManagementChangeEvent(LeafQueue leafQueue) { + super(SchedulerEventType.MANAGE_QUEUE); + this.leafQueue = leafQueue; + } + + public LeafQueue getLeafQueue() { return leafQueue; } + public ParentQueue getParentQueue() { return parentQueue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 1dd639c66bf..e06f7cbcbfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -400,6 +400,7 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setUserLimitFactor(C, 1.0f); conf.setAutoCreateChildQueueEnabled(C, true); + conf.setAutoExpiredDeletionEnabled(C, true); //Setup leaf queue template configs conf.setAutoCreatedLeafQueueConfigCapacity(C, 50.0f); @@ -426,6 +427,7 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setAutoCreateChildQueueEnabled(D, true); conf.setUserLimit(D, 100); conf.setUserLimitFactor(D, 3.0f); + conf.setAutoExpiredDeletionEnabled(D, true); //Setup leaf queue template configs conf.setAutoCreatedLeafQueueConfigCapacity(D, 10.0f); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 300993b9475..2b9db927f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -966,4 +966,116 @@ public RMNodeLabelsManager createNodeLabelManager() { } } } + + @Test + public void testAutoCreatedQueueExpiredDeletion() throws Exception { + + try { + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, + 1, 1); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, + 1); + + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //submit another app3 as USER1 + submitApp(mockRM, parentQueue, USER1, USER1, 3, 2); + + //validate total activated abs capacity remains the same + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue) + .getAutoCreatedQueueManagementPolicy(); + + for (String nodeLabel : accessibleNodeLabelsOnC) { + assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel), + autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON); + } + + //submit user_3 app. This cant be allocated since there is no capacity + // in NO_LABEL, SSD but can be in GPU label + submitApp(mockRM, parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, + 1.0f, 1.0f); + validateCapacitiesByLabel((ManagedParentQueue) parentQueue, + (AutoCreatedLeafQueue) + user3LeafQueue, NODEL_LABEL_GPU); + + assertEquals(0.2f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); + assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU), + EPSILON); + + //Verify that AMs can be allocated + //Node 1 has SSD and default node label expression on C is SSD. + //This validates that the default node label expression with SSD is set + // on the AM attempt + // and app attempt reaches ALLOCATED state for a dynamic queue 'USER1' + mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId), + mockRM, nm1); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + // Set queue expired + AutoCreatedLeafQueue user2Leaf = (AutoCreatedLeafQueue)cs.getQueue(USER2); + user2Leaf.setExpiredQueue(true); + + //Verify if USER_2 can be deactivated since it has no pending apps + List queueManagementChanges = + autoCreatedQueueManagementPolicy.computeQueueManagementChanges(); + + ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue; + managedParentQueue. + validateAndApplyQueueManagementChanges(queueManagementChanges); + + validateDeactivatedQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, queueManagementChanges); + + // Enabled expired deletion for user2 + cs.getConfiguration().setAutoExpiredDeletionEnabled(C , true); + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + + // Assert queue has been deleted + // fixme when reinitialize also should handle consistent deletion for queuename and queuepath + //assertTrue(cs.getCapacitySchedulerQueueManager().getQueue( USER1) == null); + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue(C + USER2) == null); + + //deactivate USER1 queue + cs.killAllAppsInQueue(USER1); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + // Disable expired deletion for user1 + cs.getConfiguration().setAutoExpiredDeletionEnabled(C , false); + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue(USER1) != null); + + // Enabled expired deletion for user1 + cs.getConfiguration().setAutoExpiredDeletionEnabled(C , true); + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + // fixme when reinitialize also should handle consistent deletion for queuename and queuepath + //assertTrue(cs.getCapacitySchedulerQueueManager().getQueue( USER1) == null); + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue(C + USER1) == null); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 25b2f4d0c4a..f580fd8a7cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -18,14 +18,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,6 +51,8 @@ private CapacityScheduler cs; private CapacitySchedulerConfiguration csConf; private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private QueueManagementDynamicEditPolicy policy = new + QueueManagementDynamicEditPolicy(); /* Create the following structure: @@ -68,6 +78,9 @@ public void setUp() throws Exception { csConf.setAutoQueueCreationV2Enabled("root", true); csConf.setAutoQueueCreationV2Enabled("root.a", true); csConf.setAutoQueueCreationV2Enabled("root.e", true); + // Test for auto deletion when expired + csConf.setAutoExpiredDeletionEnabled("root.e", true); + csConf.setAutoExpiredDeletionTime("root.e", 1); } private void startScheduler() throws Exception { @@ -80,6 +93,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { }; cs = (CapacityScheduler) mockRM.getResourceScheduler(); cs.updatePlacementRules(); + // Policy for new auto created queue's auto deletion when expired + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); mockRM.start(); cs.start(); autoQueueHandler = new CapacitySchedulerAutoQueueHandler( @@ -423,6 +438,147 @@ public void testAutoQueueCreationOnAppSubmission() throws Exception { Assert.assertTrue(user0.isDynamicQueue()); } + @Test + public void testEditScheduleForNewAutoDeletion() throws Exception { + + startScheduler(); + + Assert.assertEquals(0, policy.getNewDynamicLeafQueues().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getCapacitySchedulerQueueManager().getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(System.currentTimeMillis() - 10*1000); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs , USER0, USER0, "root.e"); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0.getNumApplications() == 1); + + // Test expired for deletion + user0.setLastSubmittedTimestamp(System.currentTimeMillis() - 10*1000); + + // New auto created deletion auto when expired + policy.editSchedule(); + Assert.assertEquals(2, policy.getNewDynamicLeafQueues().size()); + + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + Assert.assertTrue(user0.getNumApplications() == 0); + + // New auto created deletion auto when expired + policy.editSchedule(); + + // Make sure e1 has been deleted for expired + // expire time is 1s + e1 = (AbstractCSQueue) cs.getQueue( + "root.e.e1"); + Assert.assertNull(e1); + + // Make sure user0 has been deleted for expired + user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + + // Expired for deleted + Assert.assertNull(user0); + + // There are no new dynamic leaf queues + Assert.assertEquals(0, policy.getNewDynamicLeafQueues().size()); + + // Parent queue not deleted + Assert.assertNotNull(e); + + } + + // If policy not enabled we can trigger by Reinitialize update + @Test + public void testReinitializeForNewAutoDeletion() throws Exception { + + startScheduler(); + + // Without trigger policy + Assert.assertEquals(0, policy.getNewDynamicLeafQueues().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getCapacitySchedulerQueueManager().getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(System.currentTimeMillis() - 10*1000); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs , USER0, USER0, "root.e"); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0.getNumApplications() == 1); + + // Test expired for deletion + user0.setLastSubmittedTimestamp(System.currentTimeMillis() - 10*1000); + + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + Assert.assertTrue(user0.getNumApplications() == 0); + + // Will be deleted by reinitialize update + cs.reinitialize(cs.getConfiguration(), cs.getRMContext(), true); + + // Make sure e1 has been deleted for expired + // expire time is 1s + e1 = (AbstractCSQueue) cs.getQueue( + "root.e.e1"); + Assert.assertNull(e1); + + // Make sure user0 has been deleted for expired + user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + + // Expired for deleted + Assert.assertNull(user0); + + // There are no new dynamic leaf queues + Assert.assertEquals(0, policy.getNewDynamicLeafQueues().size()); + + } + private LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( CSQueueUtils.extractQueuePath(queuePath)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java index 733f0417157..f85826a81bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java @@ -31,6 +31,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestQueueManagementDynamicEditPolicy extends TestCapacitySchedulerAutoCreatedQueueBase { @@ -113,6 +114,94 @@ public void testEditSchedule() throws Exception { } } + + @Test + public void testEditScheduleForOldAutoDeletion() throws Exception { + + try { + // Old auto created deletion auto when expired + policy.editSchedule(); + assertEquals(2, policy.getManagedParentQueues().size()); + + // Here we can use queue path to consistent with auto created + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) + parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(0f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); + + //submit app1 as USER1 + ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1, + 1); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, + 1); + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); + + //validate total activated abs capacity + assertEquals(0.2f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); + + //submit user_3 app. This cant be scheduled since there is no capacity + submitApp(mockRM, parentQueue, USER3, USER3, 3, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, + 1.0f, 1.0f); + + assertEquals(autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + //deactivate USER1 queue + cs.killAllAppsInQueue(USER1); + mockRM.waitForState(user1AppId, RMAppState.KILLED); + + policy.editSchedule(); + waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, NO_LABEL, + 1000); + + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f, + 1.0f, 1.0f); + + validateCapacitiesByLabel((ManagedParentQueue) parentQueue, (AutoCreatedLeafQueue) user3LeafQueue, + NODEL_LABEL_GPU); + + // The deletion for queuename and queuepath should be consistent + // Set queue expired + AutoCreatedLeafQueue user2Leaf = (AutoCreatedLeafQueue)cs.getQueue(USER2); + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue("root." + "c." +USER2) != null); + assertTrue(cs.getCapacitySchedulerQueueManager().getQueue(USER2) != null); + user2Leaf.setExpiredQueue(true); + + // update expired deletion + policy.editSchedule(); + + // Assert queue has been deleted + assertEquals(cs.getCapacitySchedulerQueueManager().getQueue("root." + "c." +USER), null); + assertEquals(cs.getCapacitySchedulerQueueManager().getQueue(USER), null); + + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + private void waitForPolicyState(float expectedVal, GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, String nodeLabel, int timesec) throws InterruptedException {