diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41442363711..a8153da71c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -484,6 +484,11 @@ public void removeQueue(String queueName) throws YarnException { + " does not support removing queues"); } + public void removeQueue(CSQueue queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + @Override public void addQueue(Queue newQueue) throws YarnException, IOException { throw new YarnException(getClass().getSimpleName() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 12ce05f2791..872f304e3c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -154,6 +155,11 @@ // is it a dynamic queue? private boolean dynamicQueue = false; + // When this queue has application submit to? + // This property only applies to dynamic queue, + // and will be used to check when the queue need to be removed. + private long lastSubmittedTimestamp; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1633,4 +1639,47 @@ public void setDynamicQueue(boolean dynamicQueue) { writeLock.unlock(); } } + + public boolean isEligibleForAutoDeletion() { + return false; + } + + public boolean isInactiveDynamicQueue() { + long idleDurationSeconds = + (Time.monotonicNow() - lastSubmittedTimestamp)/1000; + return isDynamicQueue() && isEligibleForAutoDeletion() && + (idleDurationSeconds > this.csContext.getConfiguration(). + getAutoExpiredDeletionTime()); + } + + // "Tab" the queue, so this queue won't be removed because of idle timeout. + public void signalToSubmitToQueue() { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = Time.monotonicNow(); + } finally { + writeLock.unlock(); + } + } + + public long getLastSubmittedTimestamp() { + readLock.lock(); + + try { + return lastSubmittedTimestamp; + } finally { + readLock.unlock(); + } + } + + // just for test + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } finally { + writeLock.unlock(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1bb74a092e6..d5c84e95412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -142,9 +142,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -2058,11 +2058,39 @@ public void handle(SchedulerEvent event) { } } break; + case AUTO_QUEUE_DELETION: + try { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + (AutoCreatedQueueDeletionEvent) event; + removeAutoCreatedQueue(autoCreatedQueueDeletionEvent. + getCheckQueue()); + + }catch (SchedulerDynamicEditException sde) { + LOG.error("Dynamic queue deletion cannot be applied for " + + "queue : ", sde); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + private void removeAutoCreatedQueue(CSQueue checkQueue) + throws SchedulerDynamicEditException{ + // Expired queue, when there are no applications in queue, + // and the last submit time has been expired. + // Delete queue when expired deletion enabled. + writeLock.lock(); + try { + if (checkQueue instanceof AbstractCSQueue + && ((AbstractCSQueue) checkQueue).isInactiveDynamicQueue()) { + removeQueue(checkQueue); + } + } finally { + writeLock.lock(); + } + } + private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { writeLock.lock(); @@ -2492,7 +2520,8 @@ public void removeQueue(String queueName) if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom( q.getClass()))) { throw new SchedulerDynamicEditException( - "The queue that we are asked " + "to remove (" + queueName + "The queue that we are asked " + + "to remove (" + queueName + ") is not a AutoCreatedLeafQueue or ReservationQueue"); } AbstractAutoCreatedLeafQueue disposableLeafQueue = @@ -2516,6 +2545,48 @@ public void removeQueue(String queueName) } } + @Override + public void removeQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + LOG.info("Removing queue: " + queue.getQueuePath()); + if (!((AbstractCSQueue)queue).isDynamicQueue()) { + throw new SchedulerDynamicEditException( + "The queue that we are asked " + "to remove (" + queue.getQueuePath() + + ") is not a DynamicQueue"); + } + + // at this point we should have no more apps + if (queue.getNumApplications() != 0) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + " is not empty " + + queue.getNumApplications() + " apps "); + } + + ParentQueue parentQueue = (ParentQueue)queue.getParent(); + if (parentQueue != null) { + ((ParentQueue) queue.getParent()).removeChildQueue(queue); + } else { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " can't removed normally."); + } + + if (parentQueue.childQueues.contains(queue) || + queueManager.getQueue(queue.getQueuePath()) != null) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " has not been removed normally."); + } + + LOG.info("Removed queue: " + queue.getQueuePath()); + + } finally { + writeLock.unlock(); + } + } + @Override public void addQueue(Queue queue) throws SchedulerDynamicEditException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index abbc2d7875f..1cb78c57a66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2010,11 +2010,11 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { @Private private static final String AUTO_QUEUE_CREATION_V2_PREFIX = - "auto-queue-creation-v2"; + "auto-queue-creation-v2."; @Private public static final String AUTO_QUEUE_CREATION_V2_ENABLED = - AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled"; + AUTO_QUEUE_CREATION_V2_PREFIX + "enabled"; @Private public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false; @@ -2145,7 +2145,7 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { "yarn.resourcemanager.monitor.capacity.queue-management."; /** - * Time in milliseconds between invocations of this policy + * Time in milliseconds between invocations of queuemanagement policy */ @Private public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL = @@ -2155,6 +2155,66 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + /** + * Time in milliseconds between invocations of auto removal policy + */ + @Private + public static final String QUEUE_AUTO_REMOVAL_MONITORING_INTERVAL = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal-monitoring-interval"; + + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = true; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal.enable"; + + // 300s for expired defualt + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-expiration-time"; + + /** + * If true, auto created queue with weight mode + * will be deleted when queue is expired. + * @return true if auto created queue's deletion when expired is enabled + * for child queues else false. Default + * is true + */ + @Private + public boolean isAutoExpiredDeletionEnabled() { + boolean isAutoExpiredDeletionEnabled = getBoolean( + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE); + return isAutoExpiredDeletionEnabled; + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(boolean autoRemovalEnable) { + setBoolean(AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + autoRemovalEnable); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(long time) { + setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime() { + return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME); + } + + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path @@ -2169,8 +2229,9 @@ public String getAutoCreatedQueueManagementPolicy(String queue) { } /** - * Get The policy class configured to manage capacities for auto created leaf - * queues under the specified parent + * Get The policy class configured to manage capacities. + * For auto created leaf queues + * under the specified parent. * * @param queueName The parent queue's name * @return The policy class configured to manage capacities for auto created diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6bf8d0a471a..e749c886dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -603,6 +603,9 @@ public void submitApplication(ApplicationId applicationId, String userName, // Careful! Locking order is important! validateSubmitApplication(applicationId, userName, queue); + // Signal for expired auto deletion. + signalToSubmitToQueue(); + // Inform the parent queue try { getParent().submitApplication(applicationId, userName, queue); @@ -2391,4 +2394,9 @@ boolean removeNonRunnableApp(FiCaSchedulerApp app) { } return appsToReturn; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getNumApplications() == 0; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index b412e8a1dde..03c9d5ed2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -560,6 +560,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) CSQueue newQueue = createNewQueue(childQueuePath, isLeaf); this.childQueues.add(newQueue); + signalToSubmitToQueue(); // Call updateClusterResource // , which will deal with all effectiveMin/MaxResource @@ -573,6 +574,28 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) } } + + // New method to remove child queue + public void removeChildQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Now we can do remove and update + this.childQueues.remove(queue); + this.scheduler.getCapacitySchedulerQueueManager() + .removeQueue(queue.getQueuePath()); + + // Call updateClusterResource, + // which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + } finally { + writeLock.unlock(); + } + } + /** * Check whether this queue supports adding additional child queues * dynamically. @@ -1597,4 +1620,9 @@ void decrementRunnableApps() { Map getEffectiveMinRatioPerResource() { return effectiveMinRatioPerResource; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getChildQueues().size() == 0; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java new file mode 100644 index 00000000000..b0f10b8f83b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Auto deletion policy for auto created queue V2. + * Just for weight based auto created queues. + + * + */ +public class AutoCreatedQueueDeletionPolicy implements SchedulingEditPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(AutoCreatedQueueDeletionPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + + private long monitoringInterval; + + // markedForDeletion: in each interval, + // this set is extended by queues that are eligible for auto deletion. + private Set markedForDeletion = new HashSet<>(); + // sentForDeletion: if in the next interval, + // there is queue, that is eligible for auto deletion, + // and is already marked for deletion, move it to this queue. + private Set sentForDeletion = new HashSet<>(); + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Auto Deletion Policy monitor: {}" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + // The monitor time will equal the + // auto deletion expired time default. + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_AUTO_REMOVAL_MONITORING_INTERVAL, + csConfig.getLong(CapacitySchedulerConfiguration. + AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + CapacitySchedulerConfiguration. + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME)); + + initQueues(); + } + + /** + * Reinitializes queues(Called on scheduler.reinitialize). + * @param config Configuration + * @param context The resourceManager's context + * @param sched The scheduler + */ + public void reinitialize(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + //TODO - Wire with scheduler reinitialize and remove initQueues below? + initQueues(); + } + + private void initQueues() { + Set newMarks = new HashSet<>(); + for (Map.Entry queueEntry : + scheduler.getCapacitySchedulerQueueManager().getQueues().entrySet()) { + String queuePath = queueEntry.getKey(); + CSQueue queue = queueEntry.getValue(); + if (queue instanceof AbstractCSQueue && + ((AbstractCSQueue) queue).isEligibleForAutoDeletion()) { + if (markedForDeletion.contains(queuePath)) { + sentForDeletion.add(queuePath); + markedForDeletion.remove(queuePath); + } else { + newMarks.add(queuePath); + } + } + } + markedForDeletion.clear(); + markedForDeletion.addAll(newMarks); + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + initQueues(); + autoDeletionForExpiredQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + private void autoDeletionForExpiredQueues() { + // Proceed new auto created queues + for (String queueName : sentForDeletion) { + CSQueue checkQueue = + scheduler.getCapacitySchedulerQueueManager(). + getQueue(queueName); + queueAutoDeletionCheck(checkQueue); + } + sentForDeletion.clear(); + } + + private void queueAutoDeletionCheck(CSQueue checkQueue) { + //Scheduler update is asynchronous + if (checkQueue != null) { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + new AutoCreatedQueueDeletionEvent(checkQueue); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + autoCreatedQueueDeletionEvent); + } + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "AutoDeletionForExpiredQueuePolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public RMContext getRmContext() { + return rmContext; + } + + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } + + public Set getMarkedForDeletion() { + return markedForDeletion; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java new file mode 100644 index 00000000000..e51c860a6c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class AutoCreatedQueueDeletionEvent extends SchedulerEvent{ + private CSQueue checkQueue; + public AutoCreatedQueueDeletionEvent(CSQueue checkQueue) { + super(SchedulerEventType.AUTO_QUEUE_DELETION); + this.checkQueue = checkQueue; + } + + public CSQueue getCheckQueue() { + return checkQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 869bf0ed9e4..3b8a1de64e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -55,5 +55,8 @@ MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Auto created queue, auto deletion check + AUTO_QUEUE_DELETION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 48dba821434..e2b163ec638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -18,17 +18,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.AutoCreatedQueueDeletionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -50,6 +55,8 @@ private CapacityScheduler cs; private CapacitySchedulerConfiguration csConf; private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private AutoCreatedQueueDeletionPolicy policy = new + AutoCreatedQueueDeletionPolicy(); /* Create the following structure: @@ -75,6 +82,9 @@ public void setUp() throws Exception { csConf.setAutoQueueCreationV2Enabled("root", true); csConf.setAutoQueueCreationV2Enabled("root.a", true); csConf.setAutoQueueCreationV2Enabled("root.e", true); + // Test for auto deletion when expired + csConf.setAutoExpiredDeletionEnabled(true); + csConf.setAutoExpiredDeletionTime(1); } private void startScheduler() throws Exception { @@ -87,6 +97,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { }; cs = (CapacityScheduler) mockRM.getResourceScheduler(); cs.updatePlacementRules(); + // Policy for new auto created queue's auto deletion when expired + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); mockRM.start(); cs.start(); autoQueueHandler = new CapacitySchedulerAutoQueueHandler( @@ -506,7 +518,7 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { Assert.assertTrue(user0.isDynamicQueue()); Assert.assertTrue(user0 instanceof LeafQueue); - LeafQueue user0LeafQueue = (LeafQueue)user0; + LeafQueue user0LeafQueue = (LeafQueue) user0; // Assert user limit factor is -1 Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1); @@ -517,14 +529,113 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { // Assert AM Resource Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(), - user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6); + user0LeafQueue.getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, + 1e-6); // Assert user limit (no limit) when limit factor is -1 - Assert.assertEquals(MAX_MEMORY*GB, + Assert.assertEquals(MAX_MEMORY * GB, user0LeafQueue.getEffectiveMaxCapacityDown("", user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6); } + @Test + public void testEditScheduleForNewAutoDeletion() throws Exception { + startScheduler(); + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs, USER0, USER0, "root.e"); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0.getNumApplications() == 1); + // Test expired for deletion + user0.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + // Mock signal to parent submit time + e.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + Assert.assertTrue(user0.getNumApplications() == 0); + + // New auto created deletion auto when expired, + // will add to mark deletion at first time. + policy.editSchedule(); + Assert.assertEquals(2, policy.getMarkedForDeletion().size()); + + // Then will send to deletion at second time. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Wait for deletion + Thread.sleep(100); + + // Make sure e1 has been deleted for expired + // expire time is 1s + e1 = (AbstractCSQueue) cs.getQueue( + "root.e.e1"); + Assert.assertNull(e1); + + // Make sure user0 has been deleted for expired + user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + + // Expired for deleted + Assert.assertNull(user0); + + // Parent queue will not be deleted + // because last policy scheduled expired with child queues. + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + + // Now scheduled for parent without child queues + // Mark expired for deletion. + policy.editSchedule(); + Assert.assertEquals(1, policy.getMarkedForDeletion().size()); + + // Now scheduled for parent without child queues + // send to for deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Wait the policy finished + Thread.sleep(100); + + // Parent queue will be deleted + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNull(e); + + // Now there are no dynamic queues marked for auto deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + } + private LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( CSQueueUtils.extractQueuePath(queuePath));