diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index f95b30b1e5a..0ba6d3a566e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -490,6 +490,11 @@ public void removeQueue(String queueName) throws YarnException { + " does not support removing queues"); } + public void removeQueue(CSQueue queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + @Override public void addQueue(Queue newQueue) throws YarnException, IOException { throw new YarnException(getClass().getSimpleName() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 12ce05f2791..7e55b4bb759 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -154,6 +155,11 @@ // is it a dynamic queue? private boolean dynamicQueue = false; + // When this queue has application submit to? + // This property only applies to dynamic queue, + // and will be used to check when the queue need to be removed. + private long lastSubmittedTimestamp; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1633,4 +1639,45 @@ public void setDynamicQueue(boolean dynamicQueue) { writeLock.unlock(); } } + + public boolean isEligibleForAutoDeletion() { return false; } + + public boolean isInactiveDynamicQueue() { + long idleDurationSeconds = + (Time.monotonicNow() - getLastSubmittedTimestamp())/1000; + return isDynamicQueue() && isEligibleForAutoDeletion() && + (idleDurationSeconds > this.csContext.getConfiguration(). + getAutoExpiredDeletionTime()); + } + + // "Tab" the queue, so this queue won't be removed because of idle timeout. + public void signalToSubmitToQueue() { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = Time.monotonicNow(); + } finally { + writeLock.unlock(); + } + } + + public long getLastSubmittedTimestamp() { + readLock.lock(); + + try { + return lastSubmittedTimestamp; + } finally { + readLock.unlock(); + } + } + + // just for test + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } finally { + writeLock.unlock(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 158c9cd7daa..f53d0565993 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -143,9 +143,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -2092,11 +2092,40 @@ public void handle(SchedulerEvent event) { } } break; + case AUTO_QUEUE_DELETION: + try { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + (AutoCreatedQueueDeletionEvent) event; + if (autoCreatedQueueDeletionEvent != null) { + removeAutoCreatedQueue + (autoCreatedQueueDeletionEvent.getCheckQueue()); + } + }catch (SchedulerDynamicEditException sde) { + LOG.error("Dynamic queue deletion cannot be applied for " + + "queue : " , sde); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + private void removeAutoCreatedQueue(CSQueue checkQueue) + throws SchedulerDynamicEditException{ + // Expired queue, when there are no applications in queue, + // and the last submit time has been expired. + // Delete queue when expired deletion enabled. + writeLock.lock(); + try { + if (checkQueue instanceof AbstractCSQueue + && ((AbstractCSQueue) checkQueue).isInactiveDynamicQueue()) { + removeQueue(checkQueue); + } + } finally { + writeLock.lock(); + } + } + private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { writeLock.lock(); @@ -2550,6 +2579,48 @@ public void removeQueue(String queueName) } } + @Override + public void removeQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + LOG.info("Removing queue: " + queue.getQueuePath()); + if (!((AbstractCSQueue)queue).isDynamicQueue()) { + throw new SchedulerDynamicEditException( + "The queue that we are asked " + + "to remove (" + queue.getQueuePath() + + ") is not a DynamicQueue"); + } + + if (!((AbstractCSQueue) queue).isEligibleForAutoDeletion()) { + LOG.warn("Queue " + queue.getQueuePath() + + " is marked for deletion, but not eligible for deletion"); + return; + } + + ParentQueue parentQueue = (ParentQueue)queue.getParent(); + if (parentQueue != null) { + ((ParentQueue) queue.getParent()).removeChildQueue(queue); + } else { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " can't removed normally."); + } + + if (parentQueue.childQueues.contains(queue) || + queueManager.getQueue(queue.getQueuePath()) != null) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " has not been removed normally."); + } + + LOG.info("Removed queue: " + queue.getQueuePath()); + + } finally { + writeLock.unlock(); + } + } + @Override public void addQueue(Queue queue) throws SchedulerDynamicEditException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index abbc2d7875f..c0f1bf47000 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2010,11 +2010,11 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { @Private private static final String AUTO_QUEUE_CREATION_V2_PREFIX = - "auto-queue-creation-v2"; + "auto-queue-creation-v2."; @Private public static final String AUTO_QUEUE_CREATION_V2_ENABLED = - AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled"; + AUTO_QUEUE_CREATION_V2_PREFIX + "enabled"; @Private public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false; @@ -2155,6 +2155,62 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = true; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal.enable"; + + // 300s for expired defualt + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-expiration-time"; + + /** + * If true, auto created queue with weight mode + * will be deleted when queue is expired. + * @return true if auto created queue's deletion when expired is enabled + * else false. Default + * is true. + */ + @Private + public boolean isAutoExpiredDeletionEnabled(String queuePath) { + boolean isAutoExpiredDeletionEnabled = getBoolean( + getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE); + return isAutoExpiredDeletionEnabled; + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(String queuePath, + boolean autoRemovalEnable) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + autoRemovalEnable); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(long time) { + setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime() { + return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME); + } + + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6bf8d0a471a..3533db4842c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -603,6 +603,9 @@ public void submitApplication(ApplicationId applicationId, String userName, // Careful! Locking order is important! validateSubmitApplication(applicationId, userName, queue); + // Signal for expired auto deletion. + signalToSubmitToQueue(); + // Inform the parent queue try { getParent().submitApplication(applicationId, userName, queue); @@ -2391,4 +2394,11 @@ boolean removeNonRunnableApp(FiCaSchedulerApp app) { } return appsToReturn; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getNumApplications() == 0 + && csContext.getConfiguration(). + isAutoExpiredDeletionEnabled(this.getQueuePath()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index b412e8a1dde..b944131ed39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -560,6 +560,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) CSQueue newQueue = createNewQueue(childQueuePath, isLeaf); this.childQueues.add(newQueue); + signalToSubmitToQueue(); // Call updateClusterResource // , which will deal with all effectiveMin/MaxResource @@ -573,6 +574,28 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) } } + + // New method to remove child queue + public void removeChildQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Now we can do remove and update + this.childQueues.remove(queue); + this.scheduler.getCapacitySchedulerQueueManager() + .removeQueue(queue.getQueuePath()); + + // Call updateClusterResource, + // which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + } finally { + writeLock.unlock(); + } + } + /** * Check whether this queue supports adding additional child queues * dynamically. @@ -1597,4 +1620,11 @@ void decrementRunnableApps() { Map getEffectiveMinRatioPerResource() { return effectiveMinRatioPerResource; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getChildQueues().size() == 0 && + csContext.getConfiguration(). + isAutoExpiredDeletionEnabled(this.getQueuePath()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java new file mode 100644 index 00000000000..ad2b3bf022b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/AutoCreatedQueueDeletionPolicy.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Auto deletion policy for auto created queue V2. + * Just for weight based auto created queues. + + * + */ +public class AutoCreatedQueueDeletionPolicy implements SchedulingEditPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(AutoCreatedQueueDeletionPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + + private long monitoringInterval; + + // markedForDeletion: in each interval, + // this set is extended by queues that are eligible for auto deletion. + private Set markedForDeletion = new HashSet<>(); + // sentForDeletion: if in the next interval, + // there is queue, that is eligible for auto deletion, + // and is already marked for deletion, move it to this queue. + private Set sentForDeletion = new HashSet<>(); + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Auto Deletion Policy monitor: {}" + this. + getClass().getCanonicalName()); + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + // The monitor time will equal the + // auto deletion expired time default. + monitoringInterval = + csConfig.getLong(CapacitySchedulerConfiguration. + AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + CapacitySchedulerConfiguration. + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME); + + prepareForAutoDeletion(); + } + + private void prepareForAutoDeletion() { + Set newMarks = new HashSet<>(); + for (Map.Entry queueEntry : + scheduler.getCapacitySchedulerQueueManager().getQueues().entrySet()) { + String queuePath = queueEntry.getKey(); + CSQueue queue = queueEntry.getValue(); + if (queue instanceof AbstractCSQueue && + ((AbstractCSQueue) queue).isEligibleForAutoDeletion()) { + if (markedForDeletion.contains(queuePath)) { + sentForDeletion.add(queuePath); + markedForDeletion.remove(queuePath); + } else { + newMarks.add(queuePath); + } + } + } + markedForDeletion.clear(); + markedForDeletion.addAll(newMarks); + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + prepareForAutoDeletion(); + triggerAutoDeletionForExpiredQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + private void triggerAutoDeletionForExpiredQueues() { + // Proceed new auto created queues + for (String queueName : sentForDeletion) { + CSQueue checkQueue = + scheduler.getCapacitySchedulerQueueManager(). + getQueue(queueName); + queueAutoDeletion(checkQueue); + } + sentForDeletion.clear(); + } + + private void queueAutoDeletion(CSQueue checkQueue) { + //Scheduler update is asynchronous + if (checkQueue != null) { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + new AutoCreatedQueueDeletionEvent(checkQueue); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + autoCreatedQueueDeletionEvent); + } + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return AutoCreatedQueueDeletionPolicy.class.getCanonicalName(); + } + + @VisibleForTesting + public Set getMarkedForDeletion() { + return markedForDeletion; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java new file mode 100644 index 00000000000..68b86dda408 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class AutoCreatedQueueDeletionEvent extends SchedulerEvent{ + private CSQueue checkQueue; + public AutoCreatedQueueDeletionEvent(CSQueue checkQueue) { + super(SchedulerEventType.AUTO_QUEUE_DELETION); + this.checkQueue = checkQueue; + } + + public CSQueue getCheckQueue() { + return checkQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 869bf0ed9e4..3b8a1de64e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -55,5 +55,8 @@ MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Auto created queue, auto deletion check + AUTO_QUEUE_DELETION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java index 84126c72877..66ac332c535 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.AutoCreatedQueueDeletionPolicy; import org.junit.Test; import java.util.HashSet; @@ -92,4 +93,44 @@ public void testRMUpdateSchedulingEditPolicy() throws Exception { cs.reinitialize(conf, rm.getRMContext()); assertTrue(smm.isRSMEmpty()); } + + @Test(timeout = 10000) + public void testRMUpdateAutoCreatedQueueDeletionPolicy() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + SchedulingMonitorManager smm = cs.getSchedulingMonitorManager(); + + // runningSchedulingMonitors should not be empty when initialize RM + // scheduler monitor + cs.reinitialize(conf, rm.getRMContext()); + assertFalse(smm.isRSMEmpty()); + + // make sure runningSchedulingPolicies contains all the configured policy + // in YARNConfiguration + String[] configuredPolicies = conf.getStrings( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES); + Set configurePoliciesSet = new HashSet<>(); + for (String s : configuredPolicies) { + configurePoliciesSet.add(s); + } + assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet)); + + // make sure the running monitor contains AutoCreatedQueueDeletionPolicy + assertTrue(configurePoliciesSet. + contains(AutoCreatedQueueDeletionPolicy.class.getCanonicalName())); + + // disable RM scheduler monitor + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + cs.reinitialize(conf, rm.getRMContext()); + assertTrue(smm.isRSMEmpty()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 48dba821434..06084d55ceb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -26,12 +27,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.AutoCreatedQueueDeletionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -50,6 +55,8 @@ private CapacityScheduler cs; private CapacitySchedulerConfiguration csConf; private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private AutoCreatedQueueDeletionPolicy policy = new + AutoCreatedQueueDeletionPolicy(); /* Create the following structure: @@ -75,6 +82,8 @@ public void setUp() throws Exception { csConf.setAutoQueueCreationV2Enabled("root", true); csConf.setAutoQueueCreationV2Enabled("root.a", true); csConf.setAutoQueueCreationV2Enabled("root.e", true); + // Test for auto deletion when expired + csConf.setAutoExpiredDeletionTime(1); } private void startScheduler() throws Exception { @@ -87,6 +96,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { }; cs = (CapacityScheduler) mockRM.getResourceScheduler(); cs.updatePlacementRules(); + // Policy for new auto created queue's auto deletion when expired + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); mockRM.start(); cs.start(); autoQueueHandler = new CapacitySchedulerAutoQueueHandler( @@ -506,7 +517,7 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { Assert.assertTrue(user0.isDynamicQueue()); Assert.assertTrue(user0 instanceof LeafQueue); - LeafQueue user0LeafQueue = (LeafQueue)user0; + LeafQueue user0LeafQueue = (LeafQueue) user0; // Assert user limit factor is -1 Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1); @@ -517,14 +528,163 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception { // Assert AM Resource Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(), - user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6); + user0LeafQueue.getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, 1e-6); // Assert user limit (no limit) when limit factor is -1 - Assert.assertEquals(MAX_MEMORY*GB, + Assert.assertEquals(MAX_MEMORY * GB, user0LeafQueue.getEffectiveMaxCapacityDown("", user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6); } + @Test + public void testEditScheduleForNewAutoDeletion() throws Exception { + startScheduler(); + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs , USER0, USER0, "root.e"); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0.getNumApplications() == 1); + // Test expired for deletion + user0.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + // Mock signal to parent submit time + e.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + Assert.assertTrue(user0.getNumApplications() == 0); + + // New auto created deletion auto when expired, + // will add to mark deletion at first time. + policy.editSchedule(); + Assert.assertEquals(2, policy.getMarkedForDeletion().size()); + + // Then will send to deletion at second time. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Wait for deletion + Thread.sleep(100); + + // Make sure e1 has been deleted for expired + // expire time is 1s + e1 = (AbstractCSQueue) cs.getQueue( + "root.e.e1"); + Assert.assertNull(e1); + + // Make sure user0 has been deleted for expired + user0 = (AbstractCSQueue) cs.getQueue( + "root.e." + USER0); + + // Expired for deleted + Assert.assertNull(user0); + + // Parent queue will not be deleted + // because last policy scheduled expired with child queues. + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + + // Now scheduled for parent without child queues + // Mark expired for deletion. + policy.editSchedule(); + Assert.assertEquals(1, policy.getMarkedForDeletion().size()); + + // Now scheduled for parent without child queues + // send to for deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Wait the policy finished + Thread.sleep(100); + + // Parent queue will be deleted + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNull(e); + + // Now there are no dynamic queues marked for auto deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + } + + @Test + public void testEditScheduleWhenNewAutoDeletionDisabled() throws Exception { + // Test for disabled auto deletion + csConf.setAutoExpiredDeletionEnabled("root.e.e1", + false); + startScheduler(); + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Make e as a dynamic parent + createQueue("root.e.e1"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs.getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + + // signal it because of without submit created + e1.setLastSubmittedTimestamp(Time.monotonicNow() - 10*1000); + Assert.assertFalse(csConf. + isAutoExpiredDeletionEnabled("root.e.e1")); + + // Now there are no dynamic queues marked for auto deletion. + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + Assert.assertNotNull(e1); + + // Enabled now + csConf.setAutoExpiredDeletionEnabled("root.e.e1", + true); + cs.reinitialize(csConf, cs.getRMContext()); + Assert.assertTrue(csConf. + isAutoExpiredDeletionEnabled("root.e.e1")); + + policy.editSchedule(); + Assert.assertEquals(1, policy.getMarkedForDeletion().size()); + + policy.editSchedule(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + + // Wait the policy finished + Thread.sleep(100); + + e1 = (AbstractCSQueue)cs.getQueue("root.e.e1"); + Assert.assertNull(e1); + } + private LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( CSQueueUtils.extractQueuePath(queuePath));