diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3eefb8f..59bab83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1413,6 +1413,9 @@ public void serviceStop() throws Exception { schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); } } + Thread queueMonitor = queueMgr.getQueueMonitor(); + queueMonitor.interrupt(); + queueMonitor.join(THREAD_JOIN_TIMEOUT_MS); if (allocsLoader != null) { allocsLoader.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 6556717..cc2cde5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import javax.xml.parsers.ParserConfigurationException; @@ -56,9 +58,16 @@ new CopyOnWriteArrayList(); private final Map queues = new HashMap(); private FSParentQueue rootQueue; + private final Set confQueueSet = new HashSet<>(); + private final Set confLeafQueueSet = new HashSet<>(); + private volatile boolean confChanged = false; + private final Thread queueMonitor; public QueueManager(FairScheduler scheduler) { this.scheduler = scheduler; + this.queueMonitor = new Thread(new QueueRemoveHandler(), "Queue-monitor"); + this.queueMonitor.setDaemon(true); + this.queueMonitor.start(); } public FSParentQueue getRootQueue() { @@ -369,45 +378,143 @@ private String ensureRootPrefix(String name) { } return name; } - - public void updateAllocationConfiguration(AllocationConfiguration queueConf) { - // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist - for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { - getLeafQueue(name, true); + + private void addAllQueue(String queuePath, Set queueSet, boolean leafQueue) { + queuePath = ensureRootPrefix(queuePath); + queueSet.add(queuePath); + int end = queuePath.lastIndexOf("."); + if(!leafQueue && end > 0) { + LOG.debug("Add parent queue " + queuePath.substring(0, end)); + } + while(end > 0) { + if(queueSet.contains(queuePath.substring(0,end))) { + break; } + LOG.debug("Add parent queue " + queuePath.substring(0, end)); + queueSet.add(queuePath.substring(0,end)); + end = queuePath.lastIndexOf(".", end - 1); } + } - // At this point all leaves and 'parents with at least one child' would have been created. - // Now create parents with no configured leaf. - for (String name : queueConf.getConfiguredQueues().get( - FSQueueType.PARENT)) { - if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { - getParentQueue(name, true); - } + public void updateAllocationConfiguration(AllocationConfiguration queueConf) { + Set queueSet = new HashSet<>(); + queueSet.add(ROOT_QUEUE); + queueSet.add(ensureRootPrefix(YarnConfiguration.DEFAULT_QUEUE_NAME)); + synchronized (confLeafQueueSet) { + confLeafQueueSet.clear(); + confLeafQueueSet.addAll(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)); } - - for (FSQueue queue : queues.values()) { - // Update queue metrics - FSQueueMetrics queueMetrics = queue.getMetrics(); - queueMetrics.setMinShare(queue.getMinShare()); - queueMetrics.setMaxShare(queue.getMaxShare()); - // Set scheduling policies - try { - SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); - policy.initialize(scheduler.getClusterResource()); - queue.setPolicy(policy); - } catch (AllocationConfigurationException ex) { - LOG.warn("Cannot apply configured scheduling policy to queue " - + queue.getName(), ex); + + synchronized (queues) { + // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist + for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { + getLeafQueue(name, true); + } + addAllQueue(name, queueSet, true); + } + + // At this point all leaves and 'parents with at least one child' would have been created. + // Now create parents with no configured leaf. + for (String name : queueConf.getConfiguredQueues().get( + FSQueueType.PARENT)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { + getParentQueue(name, true); + } + addAllQueue(name, queueSet, false); } + for (FSQueue queue : queues.values()) { + // Update queue metrics + FSQueueMetrics queueMetrics = queue.getMetrics(); + queueMetrics.setMinShare(queue.getMinShare()); + queueMetrics.setMaxShare(queue.getMaxShare()); + // Set scheduling policies + try { + SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); + policy.initialize(scheduler.getClusterResource()); + queue.setPolicy(policy); + } catch (AllocationConfigurationException ex) { + LOG.warn("Cannot apply configured scheduling policy to queue " + + queue.getName(), ex); + } + } + } + + synchronized (this.confQueueSet) { + this.confQueueSet.clear(); + this.confQueueSet.addAll(queueSet); } + confChanged = true; + synchronized (rootQueue) { + // Update steady fair shares for all queues + rootQueue.recomputeSteadyShares(); + // Update the fair share preemption timeouts and preemption for all queues + // recursively + rootQueue.updatePreemptionVariables(); + } + } + + public Thread getQueueMonitor() { + return this.queueMonitor; + } - // Update steady fair shares for all queues - rootQueue.recomputeSteadyShares(); - // Update the fair share preemption timeouts and preemption for all queues - // recursively - rootQueue.updatePreemptionVariables(); + class QueueRemoveHandler implements Runnable { + private boolean hasRemovableQueue = false; + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + if (confChanged || hasRemovableQueue) { + LOG.info("fair-schedule.xml changed: " + confChanged + ", or some queue should be removed: " + hasRemovableQueue); + confChanged = false; + hasRemovableQueue = false; + synchronized (queues) { + synchronized (confQueueSet) { + Set removable = new HashSet<>(); + for (String name : queues.keySet()) { + if (!confQueueSet.contains(name)) { + removable.add(name); + } + } + boolean queueChanged = false; + for (String name : removable) { + LOG.debug("queue " + name + " will be deleted"); + // maybe has been deleted in the early loop. + if (exists(name)) { + boolean flag = removeQueueIfEmpty(queues.get(name)); + hasRemovableQueue |= !flag; // delete fail, should check and try remove again + queueChanged |= flag; // queue changed, check if parent should by leaf node + } + } + + if (queueChanged) { + synchronized (confLeafQueueSet) { + for (String name : confLeafQueueSet) { + name = ensureRootPrefix(name); + if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { + getLeafQueue(name, true); + } + } + } + synchronized (rootQueue) { + // Update steady fair shares for all queues + rootQueue.recomputeSteadyShares(); + // Update the fair share preemption timeouts and preemption for all queues + // recursively + rootQueue.updatePreemptionVariables(); + } + } + } + } + } + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + LOG.warn("QueueRemoveHandler thread interrupted. Exiting.", e); + return; + } + } + } } /**