diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index d8d90518305..da8c22cc0b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -114,6 +114,7 @@ public void serviceInit(Configuration conf) throws Exception { reloadThread = new Thread(() -> { while (running) { try { + reloadListener.onCheck(); long time = clock.getTime(); long lastModified = fs.getFileStatus(allocFile).getModificationTime(); @@ -351,5 +352,7 @@ private ReservationQueueConfiguration createReservationQueueConfig( public interface Listener { void onReload(AllocationConfiguration info) throws IOException; + + void onCheck(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49d216694db..e7da16f59ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -34,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,6 +59,8 @@ // apps that are runnable private final List runnableApps = new ArrayList<>(); private final List nonRunnableApps = new ArrayList<>(); + // assignedApps keeps track of applications that have no appAttempts + private final Set assignedApps = new HashSet<>(); // get a lock with fair distribution for app list updates private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); private final Lock readLock = rwl.readLock(); @@ -89,6 +94,9 @@ void addApp(FSAppAttempt app, boolean runnable) { } else { nonRunnableApps.add(app); } + // when an appAttempt is created for an application, we'd like to move + // it over from assignedApps to either runnableApps or nonRunnableApps + assignedApps.remove(app.getApplicationId()); incUsedResource(app.getResourceUsage()); } finally { writeLock.unlock(); @@ -440,6 +448,15 @@ public int getNumPendingApps() { return numPendingApps; } + public int getNumAssignedApps() { + readLock.lock(); + try { + return assignedApps.size(); + } finally { + readLock.unlock(); + } + } + /** * TODO: Based on how frequently this is called, we might want to club * counting pending and active apps in the same method. @@ -609,4 +626,18 @@ protected void dumpStateInternal(StringBuilder sb) { ", LastTimeAtMinShare: " + lastTimeAtMinShare + "}"); } + + /** + * This method is called when an application is assigned to this queue + * for book-keeping purposes (to be able to determine if the queue is empty). + * @param applicationId the application's id + */ + public void addAssignedApp(ApplicationId applicationId) { + writeLock.lock(); + try { + assignedApps.add(applicationId); + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 4babfd5659a..6b88a329fa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -83,6 +83,7 @@ private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; + private boolean isDynamic = true; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -585,4 +586,12 @@ public String dumpState() { * @param sb the {code StringBuilder} which holds queue states */ protected abstract void dumpStateInternal(StringBuilder sb); + + public boolean isDynamic() { + return isDynamic; + } + + public void setDynamic(boolean dynamic) { + this.isDynamic = dynamic; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1c4bd51473d..123f7110cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -99,6 +99,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashSet; @@ -516,6 +517,7 @@ protected void addApplication(ApplicationId applicationId, new SchedulerApplication(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); + queue.addAssignedApp(applicationId); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queue.getName() @@ -1589,6 +1591,7 @@ public void onReload(AllocationConfiguration queueInfo) // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. + Set removedStaticQueues = getRemovedStaticQueues(queueInfo); writeLock.lock(); try { if (queueInfo == null) { @@ -1599,6 +1602,7 @@ public void onReload(AllocationConfiguration queueInfo) setQueueAcls(allocConf.getQueueAcls()); allocConf.getDefaultSchedulingPolicy().initialize(getContext()); queueMgr.updateAllocationConfiguration(allocConf); + queueMgr.setQueuesToDynamic(removedStaticQueues); applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); } @@ -1606,6 +1610,27 @@ public void onReload(AllocationConfiguration queueInfo) writeLock.unlock(); } } + + private Set getRemovedStaticQueues( + AllocationConfiguration queueInfo) { + if (queueInfo == null || allocConf == null) { + return Collections.emptySet(); + } + Set removedStaticQueues = new HashSet<>(); + for (Set queues : allocConf.getConfiguredQueues().values()) { + removedStaticQueues.addAll(queues); + } + for (Set queues : queueInfo.getConfiguredQueues().values()) { + removedStaticQueues.removeAll(queues); + } + return removedStaticQueues; + } + + @Override + public void onCheck() { + queueMgr.removeEmptyDynamicQueues(); + queueMgr.removePendingIncompatibleQueues(); + } } private void setQueueAcls( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 8734877f608..96deae5a3b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -29,6 +30,8 @@ import javax.xml.parsers.ParserConfigurationException; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,6 +55,34 @@ public static final Log LOG = LogFactory.getLog( QueueManager.class.getName()); + private class IncompatibleQueueRemovalTask { + + private final String queueToCreate; + private final FSQueueType queueType; + + private IncompatibleQueueRemovalTask(String queueToCreate, + FSQueueType queueType) { + this.queueToCreate = queueToCreate; + this.queueType = queueType; + } + + private void execute() { + Boolean removed = + removeEmptyIncompatibleQueues(queueToCreate, queueType); + if (Boolean.TRUE.equals(removed)) { + FSQueue queue = getQueue(queueToCreate, true, queueType, false); + if (queue != null && + // if queueToCreate is present in the allocation config, set it + // to static + scheduler.allocConf.configuredQueues.values().stream() + .anyMatch(s -> s.contains(queueToCreate))) { + queue.setDynamic(false); + } + incompatibleQueuesPendingRemoval.remove(this); + } + } + } + public static final String ROOT_QUEUE = "root"; private final FairScheduler scheduler; @@ -59,6 +90,8 @@ private final Collection leafQueues = new CopyOnWriteArrayList(); private final Map queues = new HashMap(); + private Set incompatibleQueuesPendingRemoval = + new HashSet<>(); private FSParentQueue rootQueue; public QueueManager(FairScheduler scheduler) { @@ -75,10 +108,13 @@ public void initialize(Configuration conf) throws IOException, // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been // loaded yet. rootQueue = new FSParentQueue("root", scheduler, null); + rootQueue.setDynamic(false); queues.put(rootQueue.getName(), rootQueue); // Create the default queue - getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + FSLeafQueue defaultQueue = + getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + defaultQueue.setDynamic(false); // Recursively reinitialize to propagate queue properties rootQueue.reinit(true); } @@ -121,7 +157,8 @@ public FSLeafQueue getLeafQueue( */ public boolean removeLeafQueue(String name) { name = ensureRootPrefix(name); - return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT); + return Boolean.TRUE.equals( + removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)); } @@ -346,9 +383,11 @@ void setChildResourceLimits(FSParentQueue parent, FSQueue child, * * We will never remove the root queue or the default queue in this way. * - * @return true if we can create queueToCreate or it already exists. + * @return true if there was an incompatible queue that has been removed, + * false it there was an incompatible queue that have not be removed, + * null if there is no incompatible queue. */ - private boolean removeEmptyIncompatibleQueues(String queueToCreate, + private Boolean removeEmptyIncompatibleQueues(String queueToCreate, FSQueueType queueType) { queueToCreate = ensureRootPrefix(queueToCreate); @@ -357,7 +396,7 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate, if (queueToCreate.equals(ROOT_QUEUE) || queueToCreate.startsWith( ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { - return false; + return null; } FSQueue queue = queues.get(queueToCreate); @@ -365,15 +404,14 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate, if (queue != null) { if (queue instanceof FSLeafQueue) { if (queueType == FSQueueType.LEAF) { - // if queue is already a leaf then return true - return true; + return null; } // remove incompatibility since queue is a leaf currently // needs to change to a parent. return removeQueueIfEmpty(queue); } else { if (queueType == FSQueueType.PARENT) { - return true; + return null; } // If it's an existing parent queue and needs to change to leaf, // remove it if it's empty. @@ -393,7 +431,40 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate, } sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); } - return true; + return null; + } + + public void removeEmptyDynamicQueues() { + synchronized (queues) { + Set parentQueuesToCheck = new HashSet<>(); + for (FSQueue queue : getQueues()) { + if (queue.isDynamic() && queue.getChildQueues().isEmpty()) { + boolean removed = removeQueueIfEmpty(queue); + if (removed && queue.getParent().isDynamic()) { + parentQueuesToCheck.add(queue.getParent()); + } + } + } + while (!parentQueuesToCheck.isEmpty()) { + FSParentQueue queue = parentQueuesToCheck.iterator().next(); + if (queue.getChildQueues().isEmpty()) { + removeQueue(queue); + if (queue.getParent().isDynamic()) { + parentQueuesToCheck.add(queue.getParent()); + } + } + parentQueuesToCheck.remove(queue); + } + } + } + + public void removePendingIncompatibleQueues() { + synchronized (queues) { + for (IncompatibleQueueRemovalTask removalTask : + ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) { + removalTask.execute(); + } + } } /** @@ -435,7 +506,8 @@ protected boolean isEmpty(FSQueue queue) { if (queue instanceof FSLeafQueue) { FSLeafQueue leafQueue = (FSLeafQueue)queue; return queue.getNumRunnableApps() == 0 && - leafQueue.getNumNonRunnableApps() == 0; + leafQueue.getNumNonRunnableApps() == 0 && + leafQueue.getNumAssignedApps() == 0; } else { for (FSQueue child : queue.getChildQueues()) { if (!isEmpty(child)) { @@ -501,21 +573,13 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { LOG.error("Setting scheduling policies for existing queues failed!"); } - for (String name : queueConf.getConfiguredQueues().get( - FSQueueType.LEAF)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { - getLeafQueue(name, true, false); - } - } + ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF); + // At this point all leaves and 'parents with // at least one child' would have been created. // Now create parents with no configured leaf. - for (String name : queueConf.getConfiguredQueues().get( - FSQueueType.PARENT)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { - getParentQueue(name, true, false); - } - } + ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, + FSQueueType.PARENT); } // Initialize all queues recursively @@ -524,6 +588,34 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { rootQueue.recomputeSteadyShares(); } + private void ensureQueueExistsAndIsCompatibleAndIsStatic( + AllocationConfiguration queueConf, FSQueueType queueType) { + for (String name : queueConf.getConfiguredQueues().get(queueType)) { + Boolean removed = removeEmptyIncompatibleQueues(name, queueType); + if (Boolean.FALSE.equals(removed)) { + incompatibleQueuesPendingRemoval.add( + new IncompatibleQueueRemovalTask(name, queueType)); + } else { + FSQueue queue = getQueue(name, true, queueType, false); + if (queue != null) { + queue.setDynamic(false); + } + } + } + } + + /** + * Setting a set of queues to dynamic. + * @param queueNames The names of the queues to be set to dynamic + */ + protected void setQueuesToDynamic(Set queueNames) { + synchronized (queues) { + for (String queueName : queueNames) { + queues.get(queueName).setDynamic(true); + } + } + } + /** * Check whether queue name is valid, * return true if it is valid, otherwise return false. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 8591d67ab7c..c5b1793ee78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import org.mockito.Mockito; + import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -867,5 +869,9 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue() public void onReload(AllocationConfiguration info) { allocConf = info; } + + @Override + public void onCheck() { + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index eb2d402cd25..3674ffb40b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -20,15 +20,22 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; public class TestQueueManager { @@ -305,4 +312,334 @@ public void testCreateParentQueueAndParent() { assertEquals("createQueue() returned wrong queue", "root.queue1.queue2", q2.getName()); } + + @Test + public void testRemovalOfDynamicLeafQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", true); + + assertNotNull("Queue root.test.childB.dynamic1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childB.dynamic1", q1.getName()); + assertTrue("root.test.childB.dynamic1 is not a dynamic queue", + q1.isDynamic()); + + // an application is submitted to root.test.childB.dynamic1 + notEmptyQueues.add(q1); + + // root.test.childB.dynamic1 is not empty and should not be removed + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false); + assertNotNull("Queue root.test.childB.dynamic1 was deleted", q1); + + // the application finishes, the next removeEmptyDynamicQueues() should + // clean root.test.childB.dynamic1 up, but keep its static parent + notEmptyQueues.remove(q1); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false); + assertNull("Queue root.test.childB.dynamic1 was not deleted", q1); + assertNotNull("The static parent of root.test.childB.dynamic1 was deleted", + queueManager.getParentQueue("root.test.childB", false)); + } + + @Test + public void testRemovalOfDynamicParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true); + + assertNotNull("Queue root.parent1.dynamic1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.parent1.dynamic1", q1.getName()); + assertTrue("root.parent1.dynamic1 is not a dynamic queue", q1.isDynamic()); + + FSQueue p1 = queueManager.getParentQueue("root.parent1", false); + assertNotNull("Queue root.parent1 was not created", p1); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.parent1.dynamic1", false); + p1 = queueManager.getParentQueue("root.parent1", false); + + assertNull("Queue root.parent1.dynamic1 was not deleted", q1); + assertNull("Queue root.parent1 was not deleted", p1); + } + + @Test + public void testNonEmptyDynamicQueueBecomingStaticQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true); + + assertNotNull("Queue root.leaf1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.leaf1", q1.getName()); + assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic()); + + // pretend that we submitted an app to the queue + notEmptyQueues.add(q1); + + // non-empty queues should not be deleted + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + + // next we add leaf1 under root in the allocation config + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1"); + queueManager.updateAllocationConfiguration(allocConf); + + // updateAllocationConfiguration() should make root.leaf1 a dynamic queue + assertFalse("root.leaf1 is not a static queue", q1.isDynamic()); + + // application finished now and the queue is empty, but since leaf1 is a + // static queue at this point, hence not affected by + // removeEmptyDynamicQueues() + notEmptyQueues.clear(); + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + assertFalse("root.leaf1 is not a static queue", q1.isDynamic()); + } + + @Test + public void testNonEmptyStaticQueueBecomingDynamicQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false); + + assertNotNull("Queue root.test.childA does not exist", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childA", q1.getName()); + assertFalse("root.test.childA is not a static queue", q1.isDynamic()); + + // we submitted an app to the queue + notEmptyQueues.add(q1); + + // the next removeEmptyDynamicQueues() call should not modify + // root.test.childA + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + assertFalse("root.test.childA is not a dynamic queue", q1.isDynamic()); + + // next we remove all queues from the allocation config, + // this causes all queues to change to dynamic + for (Set queueNames : allocConf.configuredQueues.values()) { + queueManager.setQueuesToDynamic(queueNames); + queueNames.clear(); + } + queueManager.updateAllocationConfiguration(allocConf); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic()); + + // application finished - the queue does not have runnable app + // the next removeEmptyDynamicQueues() call should remove the queues + notEmptyQueues.remove(q1); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNull("Queue root.test.childA was not deleted", q1); + + FSParentQueue p1 = queueManager.getParentQueue("root.test", false); + assertNull("Queue root.test was not deleted", p1); + } + + @Test + public void testRemovalOfChildlessParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false); + + assertNotNull("Queue root.test.childB was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childB", q1.getName()); + assertFalse("root.test.childB is a dynamic queue", q1.isDynamic()); + + // static queues should not be deleted + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getParentQueue("root.test.childB", false); + assertNotNull("Queue root.test.childB was deleted", q1); + + // next we remove root.test.childB from the allocation config + allocConf.configuredQueues.get(FSQueueType.PARENT) + .remove("root.test.childB"); + queueManager.updateAllocationConfiguration(allocConf); + queueManager.setQueuesToDynamic(Collections.singleton("root.test.childB")); + + // the next removeEmptyDynamicQueues() call should clean + // root.test.childB up + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getParentQueue("root.leaf1", false); + assertNull("Queue root.leaf1 was not deleted", q1); + } + + @Test + public void testQueueTypeChange() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.parent1.leaf1", true); + assertNotNull("Queue root.parent1.leaf1 was not created", q1); + assertEquals("createQueue() returned wrong queue", + "root.parent1.leaf1", q1.getName()); + assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic()); + + FSQueue p1 = queueManager.getParentQueue("root.parent1", false); + assertNotNull("Queue root.parent1 was not created", p1); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + + // adding root.parent1.leaf1 and root.parent1 to the allocation config + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.parent1"); + allocConf.configuredQueues.get(FSQueueType.LEAF) + .add("root.parent1.leaf1"); + + // updateAllocationConfiguration() should change both queues over to static + queueManager.updateAllocationConfiguration(allocConf); + q1 = queueManager.getLeafQueue("root.parent1.leaf1", false); + assertFalse("root.parent1.leaf1 is not a static queue", q1.isDynamic()); + p1 = queueManager.getParentQueue("root.parent1", false); + assertFalse("root.parent1 is not a static queue", p1.isDynamic()); + + // removing root.parent1.leaf1 and root.parent1 from the allocation + // config + allocConf.configuredQueues.get(FSQueueType.PARENT).remove("root.parent1"); + allocConf.configuredQueues.get(FSQueueType.LEAF) + .remove("root.parent1.leaf1"); + + // updateAllocationConfiguration() should change both queues + // to dynamic + queueManager.updateAllocationConfiguration(allocConf); + queueManager.setQueuesToDynamic( + ImmutableSet.of("root.parent1", "root.parent1.leaf1")); + q1 = queueManager.getLeafQueue("root.parent1.leaf1", false); + assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic()); + p1 = queueManager.getParentQueue("root.parent1", false); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + } + + @Test + public void testApplicationAssignmentPreventsRemovalOfDynamicQueue() + throws Exception { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + queueManager = new QueueManager(scheduler); + queueManager.initialize(conf); + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true); + assertNotNull("root.leaf1 does not exist", q); + assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q)); + + // assigning an application (without an appAttempt so far) to the queue + // removeEmptyDynamicQueues() should not remove the queue + ApplicationId applicationId = ApplicationId.newInstance(1L, 0); + q.addAssignedApp(applicationId); + q = queueManager.getLeafQueue("root.leaf1", false); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("root.leaf1 has been removed", q); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + ActiveUsersManager activeUsersManager = + Mockito.mock(ActiveUsersManager.class); + RMContext rmContext = Mockito.mock(RMContext.class); + + // the appAttempt is created + // removeEmptyDynamicQueues() should not remove the queue + FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId, + "a_user", q, activeUsersManager, rmContext); + q.addApp(appAttempt, true); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("root.leaf1 has been removed", q); + assertFalse("root.leaf1 is empty", queueManager.isEmpty(q)); + + // the appAttempt finished, the queue should be empty + q.removeApp(appAttempt); + q = queueManager.getLeafQueue("root.leaf1", false); + assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q)); + + // removeEmptyDynamicQueues() should remove the queue + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.leaf1", false); + assertNull("root.leaf1 has not been removed", q); + } + + @Test + public void testRemovalOfIncompatibleNonEmptyQueue() + throws Exception { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a"); + scheduler.allocConf = allocConf; + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q = queueManager.getLeafQueue("root.a", true); + assertNotNull("root.a does not exist", q); + assertTrue("root.a is not empty", queueManager.isEmpty(q)); + + // we start to run an application on root.a + notEmptyQueues.add(q); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a does not exist", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // root.a should not be removed by removeEmptyDynamicQueues or by + // removePendingIncompatibleQueues + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a does not exist", q); + + // let's introduce queue incompatibility + allocConf.configuredQueues.get(FSQueueType.LEAF).remove("root.a"); + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.a"); + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a.b"); + queueManager.updateAllocationConfiguration(allocConf); + + // since root.a has running applications, it should be still a leaf queue + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a has been removed", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // removePendingIncompatibleQueues should still keep root.a as a leaf queue + queueManager.removePendingIncompatibleQueues(); + q = queueManager.getLeafQueue("root.a", false); + assertNotNull("root.a has been removed", q); + assertFalse("root.a is empty", queueManager.isEmpty(q)); + + // when the application finishes, root.a should be a parent queue + notEmptyQueues.clear(); + queueManager.removePendingIncompatibleQueues(); + queueManager.removeEmptyDynamicQueues(); + FSParentQueue p = queueManager.getParentQueue("root.a", false); + assertNotNull("root.a does not exist", p); + } + }