diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41442363711..a8153da71c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -484,6 +484,11 @@ public void removeQueue(String queueName) throws YarnException { + " does not support removing queues"); } + public void removeQueue(CSQueue queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + @Override public void addQueue(Queue newQueue) throws YarnException, IOException { throw new YarnException(getClass().getSimpleName() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 12ce05f2791..f4b14e8fd9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -154,6 +155,11 @@ // is it a dynamic queue? private boolean dynamicQueue = false; + // When this queue has application submit to? + // This property only applies to dynamic queue, + // and will be used to check when the queue need to be removed. + private long lastSubmittedTimestamp; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1633,4 +1639,35 @@ public void setDynamicQueue(boolean dynamicQueue) { writeLock.unlock(); } } + + // "Tab" the queue, so this queue won't be removed because of idle timeout. + public void signalToSubmitToQueue() { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = Time.monotonicNow(); + } finally { + writeLock.unlock(); + } + } + + public long getLastSubmittedTimestamp() { + readLock.lock(); + + try { + return lastSubmittedTimestamp; + } finally { + readLock.unlock(); + } + } + + // just for test + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } finally { + writeLock.unlock(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 204fa73b45f..d5c5af99bf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -130,24 +131,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event - .QueueManagementChangeEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; @@ -2044,11 +2028,67 @@ public void handle(SchedulerEvent event) { } } break; + case AUTO_QUEUE_DELETION: + try { + AutoCreatedQueueDeletionCheckEvent autoCreatedQueueDeletionCheckEvent = + (AutoCreatedQueueDeletionCheckEvent) event; + if (autoCreatedQueueDeletionCheckEvent != null) { + AutoCreatedQueueDeletion + (autoCreatedQueueDeletionCheckEvent.getCheckQueue()); + } + }catch (SchedulerDynamicEditException sde) { + LOG.error("Dynamic queue deletion cannot be applied for " + + "queue : " , sde); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + private void AutoCreatedQueueDeletion(CSQueue checkQueue) + throws SchedulerDynamicEditException{ + // Expired queue, when there are no applications in queue, + // and the last submit time has been expired. + // Delete queue when expired deletion enabled. + + writeLock.lock(); + try { + if (checkQueue instanceof ParentQueue && + ((ParentQueue) checkQueue).isDynamicQueue()) { + long idleDuration = (Time.monotonicNow() - + ((ParentQueue) checkQueue).getLastSubmittedTimestamp()); + if ((idleDuration / 1000) > conf. + getAutoExpiredDeletionTime() + && (checkQueue.getNumApplications() == 0)) { + // Double check no leaf queue in parent + if (checkQueue.getChildQueues().size() != 0) { + throw new SchedulerDynamicEditException( + "The queue " + checkQueue.getQueuePath() + " still have " + + checkQueue.getChildQueues().size() + + " child queues "); + } + removeQueue(checkQueue); + } + } + + if (checkQueue instanceof LeafQueue && + ((LeafQueue) checkQueue).isDynamicQueue()) { + long idleDuration = (Time.monotonicNow() - + ((LeafQueue) checkQueue).getLastSubmittedTimestamp()); + if ((idleDuration / 1000) > conf. + getAutoExpiredDeletionTime() + && (checkQueue.getNumApplications() == 0)) { + removeQueue(checkQueue); + } + } + + } finally { + writeLock.lock(); + } + + } + private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { writeLock.lock(); @@ -2502,6 +2542,48 @@ public void removeQueue(String queueName) } } + @Override + public void removeQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + LOG.info("Removing queue: " + queue.getQueuePath()); + if (!((AbstractCSQueue)queue).isDynamicQueue()) { + throw new SchedulerDynamicEditException( + "The queue that we are asked " + "to remove (" + queue.getQueuePath() + + ") is not a DynamicQueue"); + } + + // at this point we should have no more apps + if (queue.getNumApplications() != 0) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + " is not empty " + + queue.getNumApplications() + " apps "); + } + + ParentQueue parentQueue = (ParentQueue)queue.getParent(); + if (parentQueue != null) { + ((ParentQueue) queue.getParent()).removeChildQueue(queue); + } else { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " can't removed normally."); + } + + if (parentQueue.childQueues.contains(queue) || + queueManager.getQueue(queue.getQueuePath()) != null) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " has not been removed normally."); + } + + LOG.info("Removed queue: " + queue.getQueuePath()); + + } finally { + writeLock.unlock(); + } + } + @Override public void addQueue(Queue queue) throws SchedulerDynamicEditException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index abbc2d7875f..315f1298e00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2010,11 +2010,11 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { @Private private static final String AUTO_QUEUE_CREATION_V2_PREFIX = - "auto-queue-creation-v2"; + "auto-queue-creation-v2."; @Private public static final String AUTO_QUEUE_CREATION_V2_ENABLED = - AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled"; + AUTO_QUEUE_CREATION_V2_PREFIX + "enabled"; @Private public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false; @@ -2155,6 +2155,59 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = true; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED = + AUTO_QUEUE_CREATION_V2_PREFIX + "expired-deletion-enabled"; + + // 300s for expired defualt + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME = + AUTO_QUEUE_CREATION_V2_PREFIX + "expired-deletion-time"; + + /** + * If true, auto created queue with weight mode + * will be deleted when queue is expired. + * @return true if auto created queue's deletion when expired is enabled + * for child queues else false. Default + * is true + */ + @Private + public boolean isAutoExpiredDeletionEnabled() { + boolean isAutoExpiredDeletionEnabled = getBoolean( + AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED); + return isAutoExpiredDeletionEnabled; + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(boolean autoExpiredDeletionEnabled) { + setBoolean(AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_ENABLED, + autoExpiredDeletionEnabled); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(long time) { + setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime() { + return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_DELETION_TIME); + } + + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6bf8d0a471a..513dbf45b85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -603,6 +603,11 @@ public void submitApplication(ApplicationId applicationId, String userName, // Careful! Locking order is important! validateSubmitApplication(applicationId, userName, queue); + // Signal to queue submit time in dynamic queue + if (this.isDynamicQueue()) { + signalToSubmitToQueue(); + } + // Inform the parent queue try { getParent().submitApplication(applicationId, userName, queue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index b412e8a1dde..7aaf3a676f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -573,6 +573,27 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) } } + + // New method to remove child queue + public void removeChildQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Now we can do remove and update + this.childQueues.remove(queue); + this.scheduler.getCapacitySchedulerQueueManager() + .removeQueue(queue.getQueuePath()); + + // Call updateClusterResource, which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + } finally { + writeLock.unlock(); + } + } + /** * Check whether this queue supports adding additional child queues * dynamically. @@ -697,6 +718,11 @@ public void submitApplication(ApplicationId applicationId, String user, // Sanity check validateSubmitApplication(applicationId, user, queue); + // Signal to queue submit time in dynamic queue + if (this.isDynamicQueue()) { + signalToSubmitToQueue(); + } + addApplication(applicationId, user); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoDeletionForExpiredQueuePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoDeletionForExpiredQueuePolicy.java new file mode 100644 index 00000000000..fd2d7ecd435 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoDeletionForExpiredQueuePolicy.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionCheckEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.QueueManagementChangeEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Auto deletion policy for auto created queue V2. + * Just for weight based auto created queues. + + * + */ +public class AutoDeletionForExpiredQueuePolicy implements SchedulingEditPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(AutoDeletionForExpiredQueuePolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + + private long monitoringInterval; + + // Just should get leaf queue in new Auto created queue + // Because auto created parent will update from leaf. + private Set newDynamicQueuesForAutoDeletion = new HashSet<>(); + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Auto Deletion Policy monitor: {}" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_MANAGEMENT_MONITORING_INTERVAL, + CapacitySchedulerConfiguration. + DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL); + + initQueues(); + } + + /** + * Reinitializes queues(Called on scheduler.reinitialize) + * @param config Configuration + * @param context The resourceManager's context + * @param sched The scheduler + */ + public void reinitialize(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + //TODO - Wire with scheduler reinitialize and remove initQueues below? + initQueues(); + } + + private void initQueues() { + newDynamicQueuesForAutoDeletion.clear(); + for (Map.Entry queues : scheduler + .getCapacitySchedulerQueueManager() + .getQueues().entrySet()) { + + String queueName = queues.getKey(); + CSQueue queue = queues.getValue(); + + // check for leaf queue + // and the parent queue without child + if ((queue instanceof LeafQueue && + ((LeafQueue) queue).isDynamicQueue()) + || (queue instanceof ParentQueue && + queue.getChildQueues().size() == 0 && + ((ParentQueue) queue).isDynamicQueue())) { + newDynamicQueuesForAutoDeletion.add(queueName); + } + } + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + initQueues(); + AutoDeletionForExpiredQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + @VisibleForTesting + private void AutoDeletionForExpiredQueues() { + // Proceed new auto created queues + if (newDynamicQueuesForAutoDeletion.size() > 0) { + for (String queueName : newDynamicQueuesForAutoDeletion) { + CSQueue checkQueue = + scheduler.getCapacitySchedulerQueueManager(). + getQueue(queueName); + //Void now nothing to return + queueAutoDeletionCheck(checkQueue); + } + } + + } + + @VisibleForTesting + private void queueAutoDeletionCheck(CSQueue checkQueue) { + //Scheduler update is asynchronous + if (checkQueue != null) { + AutoCreatedQueueDeletionCheckEvent autoCreatedQueueDeletionCheckEvent = + new AutoCreatedQueueDeletionCheckEvent(checkQueue); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + autoCreatedQueueDeletionCheckEvent); + } + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "AutoDeletionForExpiredQueuePolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public RMContext getRmContext() { + return rmContext; + } + + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } + + public Set getNewDynamicQueuesForAutoDeletion() { + return newDynamicQueuesForAutoDeletion; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionCheckEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionCheckEvent.java new file mode 100644 index 00000000000..5da9887d44a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionCheckEvent.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class AutoCreatedQueueDeletionCheckEvent extends SchedulerEvent{ + private CSQueue checkQueue; + public AutoCreatedQueueDeletionCheckEvent(CSQueue checkQueue) { + super(SchedulerEventType.AUTO_QUEUE_DELETION); + this.checkQueue = checkQueue; + } + + public CSQueue getCheckQueue() { + return checkQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 869bf0ed9e4..3b8a1de64e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -55,5 +55,8 @@ MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Auto created queue, auto deletion check + AUTO_QUEUE_DELETION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 0c5375e83ef..20b49dafdd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -18,14 +18,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.AutoDeletionForExpiredQueuePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,6 +54,8 @@ private CapacityScheduler cs; private CapacitySchedulerConfiguration csConf; private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private AutoDeletionForExpiredQueuePolicy policy = new + AutoDeletionForExpiredQueuePolicy(); /* Create the following structure: @@ -68,6 +81,9 @@ public void setUp() throws Exception { csConf.setAutoQueueCreationV2Enabled("root", true); csConf.setAutoQueueCreationV2Enabled("root.a", true); csConf.setAutoQueueCreationV2Enabled("root.e", true); + // Test for auto deletion when expired + csConf.setAutoExpiredDeletionEnabled(true); + csConf.setAutoExpiredDeletionTime(1); } private void startScheduler() throws Exception { @@ -80,6 +96,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { }; cs = (CapacityScheduler) mockRM.getResourceScheduler(); cs.updatePlacementRules(); + // Policy for new auto created queue's auto deletion when expired + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); mockRM.start(); cs.start(); autoQueueHandler = new CapacitySchedulerAutoQueueHandler( @@ -461,7 +479,8 @@ public void testChildlessParentQueueWhenAutoQueueCreationEnabled() "for auto queue creation", ((ParentQueue)empty).isEligibleForAutoQueueCreation()); } - + + @Test public void testAutoCreateQueueUserLimitDisabled() throws Exception { startScheduler(); createBasicQueueStructureAndValidate(); @@ -478,7 +497,7 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { Assert.assertTrue(user0.isDynamicQueue()); Assert.assertTrue(user0 instanceof LeafQueue); - LeafQueue user0LeafQueue = (LeafQueue)user0; + LeafQueue user0LeafQueue = (LeafQueue) user0; // Assert user limit factor is -1 Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1); @@ -489,14 +508,100 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { // Assert AM Resource Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(), - user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6); + user0LeafQueue.getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, 1e-6); // Assert user limit (no limit) when limit factor is -1 - Assert.assertEquals(MAX_MEMORY*GB, + Assert.assertEquals(MAX_MEMORY * GB, user0LeafQueue.getEffectiveMaxCapacityDown("", user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6); } + @Test + public void testEditScheduleForNewAutoDeletion() throws Exception { + startScheduler(); + policy.editSchedule(); + Assert.assertEquals(0, policy.getNewDynamicQueuesForAutoDeletion().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs , USER0, USER0, "root.e"); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0.getNumApplications() == 1); + // Test expired for deletion + user0.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + // Mock signal to parent submit time + e.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + Assert.assertTrue(user0.getNumApplications() == 0); + + // New auto created deletion auto when expired + policy.editSchedule(); + Assert.assertEquals(2, policy.getNewDynamicQueuesForAutoDeletion().size()); + // Wait the policy finished + Thread.sleep(500); + + // Make sure e1 has been deleted for expired + // expire time is 1s + e1 = (AbstractCSQueue) cs.getQueue( + "root.e.e1"); + Assert.assertNull(e1); + + // Make sure user0 has been deleted for expired + user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + + // Expired for deleted + Assert.assertNull(user0); + + // Parent queue will not be deleted + // because last policy scheduled expired with child queues. + Assert.assertNotNull(e); + + // Now scheduled for parent without child queues + // Expired for deletion. + policy.editSchedule(); + // Wait the policy finished + Thread.sleep(500); + + // There are one parent queue + Assert.assertEquals(1, policy.getNewDynamicQueuesForAutoDeletion().size()); + + // Parent queue will be deleted + Assert.assertNotNull(e); + + // Now there are no dynamic queues for auto deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getNewDynamicQueuesForAutoDeletion().size()); + + } + private LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( CSQueueUtils.extractQueuePath(queuePath));