diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b597164baf7034370be20097bf0665c6ecc56ed..7c34d991d9e939ce2a4fc22da605308ebca7d80e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -142,6 +142,8 @@ @VisibleForTesting Thread updateThread; + private final Object updateThreadMonitor = new Object(); + @VisibleForTesting Thread schedulingThread; // timeout to join when we stop this service @@ -246,6 +248,13 @@ public QueueManager getQueueManager() { return queueMgr; } + // Allows update to issue the update to happen without full wait + void triggerUpdate() { + synchronized (updateThreadMonitor) { + updateThreadMonitor.notify(); + } + } + /** * Thread which calls {@link FairScheduler#update()} every * updateInterval milliseconds. @@ -256,7 +265,13 @@ public QueueManager getQueueManager() { public void run() { while (!Thread.currentThread().isInterrupted()) { try { - Thread.sleep(updateInterval); + synchronized (updateThreadMonitor) { + try { + updateThreadMonitor.wait(updateInterval); + } catch (InterruptedException e) { + // Do Nothing + } + } long start = getClock().getTime(); update(); preemptTasksIfNecessary(); @@ -835,6 +850,8 @@ private synchronized void addNode(RMNode node) { updateRootQueueMetrics(); updateMaximumAllocation(schedulerNode, true); + triggerUpdate(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + @@ -850,6 +867,8 @@ private synchronized void removeNode(RMNode rmNode) { Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); + triggerUpdate(); + // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) {