diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index d8d90518305..4c706f78c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import com.google.common.annotations.VisibleForTesting; + +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -88,6 +91,7 @@ private FileSystem fs; private Listener reloadListener; + private QueueManager queueMgr; @VisibleForTesting long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; @@ -114,6 +118,7 @@ public void serviceInit(Configuration conf) throws Exception { reloadThread = new Thread(() -> { while (running) { try { + queueMgr.removeEmptyDynamicQueues(); long time = clock.getTime(); long lastModified = fs.getFileStatus(allocFile).getModificationTime(); @@ -352,4 +357,8 @@ private ReservationQueueConfiguration createReservationQueueConfig( public interface Listener { void onReload(AllocationConfiguration info) throws IOException; } + + public void setQueueManager(QueueManager queueMgr) { + this.queueMgr = checkNotNull(queueMgr); + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 4babfd5659a..2a99096b930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -83,6 +83,7 @@ private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; + private boolean isDynamic = true; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -585,4 +586,12 @@ public String dumpState() { * @param sb the {code StringBuilder} which holds queue states */ protected abstract void dumpStateInternal(StringBuilder sb); + + public boolean isDynamic() { + return isDynamic; + } + + public void setDynamic(boolean isDynamic) { + this.isDynamic = isDynamic; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1f85814adac..7477ec5d8ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1364,6 +1364,7 @@ private void initScheduler(Configuration conf) throws IOException { } allocsLoader.init(conf); + allocsLoader.setQueueManager(queueMgr); allocsLoader.setReloadListener(new AllocationReloadListener()); // If we fail to load allocations file on initialize, we want to fail // immediately. After a successful load, exceptions on future reloads diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 8734877f608..8405c6a01b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -75,10 +76,13 @@ public void initialize(Configuration conf) throws IOException, // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been // loaded yet. 
rootQueue = new FSParentQueue("root", scheduler, null); + rootQueue.setDynamic(false); queues.put(rootQueue.getName(), rootQueue); // Create the default queue - getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + FSLeafQueue defaultQueue = + getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); + defaultQueue.setDynamic(false); // Recursively reinitialize to propagate queue properties rootQueue.reinit(true); } @@ -396,6 +400,30 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate, return true; } + public void removeEmptyDynamicQueues() { + synchronized (queues) { + Set<FSParentQueue> parentQueuesToCheck = new HashSet<>(); + for (FSQueue queue : getQueues()) { + if (queue.isDynamic() && queue.getChildQueues().isEmpty()) { + boolean removed = removeQueueIfEmpty(queue); + if (removed) { + parentQueuesToCheck.add(queue.getParent()); + } + } + } + while (!parentQueuesToCheck.isEmpty()) { + FSParentQueue queue = parentQueuesToCheck.iterator().next(); + if (!ROOT_QUEUE.equals(queue.getName()) + && queue.isDynamic() + && queue.getChildQueues().isEmpty()) { + removeQueue(queue); + parentQueuesToCheck.add(queue.getParent()); + } + parentQueuesToCheck.remove(queue); + } + } + } + /** * Remove the queue if it and its descendents are all empty. 
* @param queue @@ -504,7 +532,10 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { for (String name : queueConf.getConfiguredQueues().get( FSQueueType.LEAF)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { - getLeafQueue(name, true, false); + FSLeafQueue queue = getLeafQueue(name, true, false); + if (queue != null) { + queue.setDynamic(false); + } } } // At this point all leaves and 'parents with @@ -513,9 +544,17 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { for (String name : queueConf.getConfiguredQueues().get( FSQueueType.PARENT)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { - getParentQueue(name, true, false); + FSParentQueue queue = getParentQueue(name, true, false); + if (queue != null) { + queue.setDynamic(false); + } } } + Set<String> dynamicQueueNames = + getDynamicQueueNames(queueConf.getConfiguredQueues()); + for (String name : dynamicQueueNames) { + queues.get(name).setDynamic(true); + } } // Initialize all queues recursively @@ -524,6 +563,18 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { rootQueue.recomputeSteadyShares(); } + private Set<String> getDynamicQueueNames( + Map<FSQueueType, Set<String>> configuredQueues) { + Set<String> dynamicQueueNames = new HashSet<>(queues.keySet()); + for (Set<String> staticQueueNames : configuredQueues.values()) { + dynamicQueueNames.removeAll(staticQueueNames); + } + dynamicQueueNames.remove(ROOT_QUEUE); + dynamicQueueNames.remove(ROOT_QUEUE + "." + + YarnConfiguration.DEFAULT_QUEUE_NAME); + return dynamicQueueNames; + } + /** * Check whether queue name is valid, * return true if it is valid, otherwise return false. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 4a7461da79b..8ce6414c4bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import org.mockito.Mockito; + import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -137,6 +139,7 @@ public void testReload() throws Exception { allocLoader.reloadIntervalMs = 5; allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); + allocLoader.setQueueManager(Mockito.mock(QueueManager.class)); allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index eb2d402cd25..c5f77243147 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -23,11 +23,13 @@ import java.util.HashSet; import java.util.Set; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import com.google.common.collect.Sets; @@ -305,4 +307,130 @@ public void testCreateParentQueueAndParent() { assertEquals("createQueue() returned wrong queue", "root.queue1.queue2", q2.getName()); } + + @Test + public void testRemovalOfDynamicLeafQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.dynamic1", true); + + assertNotNull("Queue root.dynamic1 was not created", + queueManager.getLeafQueue("root.dynamic1", false)); + assertEquals("createQueue() returned wrong queue", + "root.dynamic1", q1.getName()); + assertTrue("root.dynamic1 is not a dynamic queue", q1.isDynamic()); + + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.dynamic1", false); + + assertNull("Queue root.dynamic1 was not deleted", q1); + } + + @Test + public void testRemovalOfDynamicParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true); + + assertNotNull("Queue root.parent1.dynamic1 was not created", + 
queueManager.getLeafQueue("root.parent1.dynamic1", false)); + assertEquals("createQueue() returned wrong queue", + "root.parent1.dynamic1", q1.getName()); + assertTrue("root.parent1.dynamic1 is not a dynamic queue", q1.isDynamic()); + + FSQueue p1 = queueManager.getParentQueue("root.parent1", false); + assertNotNull("Queue root.parent1 was not created", p1); + assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic()); + + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.parent1.dynamic1", false); + p1 = queueManager.getParentQueue("root.parent1", false); + + assertNull("Queue root.parent1.dynamic1 was not deleted", q1); + assertNull("Queue root.parent1 was not deleted", p1); + } + + @Test + public void testNonEmptyDynamicQueueBecomingStaticQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true); + + assertNotNull("Queue root.leaf1 was not created", + queueManager.getLeafQueue("root.leaf1", false)); + assertEquals("createQueue() returned wrong queue", + "root.leaf1", q1.getName()); + assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic()); + + // pretend that we submitted an app to the queue + notEmptyQueues.add(q1); + + // non-empty queues should not be deleted + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + + // next we add leaf1 under root in the allocation config + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1"); + queueManager.updateAllocationConfiguration(allocConf); + + // application finished now and the queue is empty, but since leaf1 is a + // static queue at this point, hence not affected by + // removeEmptyDynamicQueues() + notEmptyQueues.clear(); + queueManager.removeEmptyDynamicQueues(); + q1 = 
queueManager.getLeafQueue("root.leaf1", false); + assertNotNull("Queue root.leaf1 was deleted", q1); + assertFalse("root.leaf1 is not a static queue", q1.isDynamic()); + } + + @Test + public void testNonEmptyStaticQueueBecomingDynamicQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false); + + assertNotNull("Queue root.test.childA does not exist", q1); + assertEquals("createQueue() returned wrong queue", + "root.test.childA", q1.getName()); + assertFalse("root.queue1 is not a static queue", q1.isDynamic()); + + // we submitted an app to the queue + notEmptyQueues.add(q1); + + // next we remove all queues from the allocation config, + // this should cause all queues to change to dynamic + allocConf.configuredQueues.get(FSQueueType.LEAF).clear(); + allocConf.configuredQueues.get(FSQueueType.PARENT).clear(); + + queueManager.removeEmptyDynamicQueues(); + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + + queueManager.updateAllocationConfiguration(allocConf); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNotNull("Queue root.test.childA was deleted", q1); + assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic()); + + // application finished - the queue does not have runnable app + // the next removeEmptyDynamicQueues() call should remove the queues + notEmptyQueues.remove(q1); + + queueManager.removeEmptyDynamicQueues(); + + q1 = queueManager.getLeafQueue("root.test.childA", false); + assertNull("Queue root.test.childA was not deleted", q1); + + FSParentQueue p1 = queueManager.getParentQueue("root.test", false); + assertNull("Queue root.test was not deleted", p1); +} }