diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 4c830523cb4..820c1e35806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -73,16 +73,19 @@ public void run() { while (!Thread.interrupted()) { try { - FSAppAttempt starvedApp = context.getStarvedApps().take(); - // Hold the scheduler readlock so this is not concurrent with the - // update thread. - schedulerReadLock.lock(); - try { - preemptContainers(identifyContainersToPreempt(starvedApp)); - } finally { - schedulerReadLock.unlock(); + FSAppAttempt starvedApp = context.getStarvedApps().poll(); + if (starvedApp != null){ + // Hold the scheduler readlock so this is not concurrent with the + // update thread. + schedulerReadLock.lock(); + try { + preemptContainers(identifyContainersToPreempt(starvedApp)); + } finally { + schedulerReadLock.unlock(); + } + starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); } - starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); + Thread.sleep(1000); } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); Thread.currentThread().interrupt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java index 4f28e41028b..c7990062bc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java @@ -45,27 +45,23 @@ * Add a starved application if it is not already added. * @param app application to add */ - void addStarvedApp(FSAppAttempt app) { + synchronized void addStarvedApp(FSAppAttempt app) { if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) { appsToProcess.add(app); } } /** - * Blocking call to fetch the next app to process. The returned app is - * tracked until the next call to this method. This tracking assumes a - * single reader. + * Non-blocking call to fetch the next app to process. * * @return starved application to process * @throws InterruptedException if interrupted while waiting */ - FSAppAttempt take() throws InterruptedException { - // Reset appBeingProcessed before the blocking call - appBeingProcessed = null; - - // Blocking call to fetch the next starved application - FSAppAttempt app = appsToProcess.take(); - appBeingProcessed = app; + synchronized FSAppAttempt poll() throws InterruptedException { + FSAppAttempt app = appsToProcess.poll(); + if (app != null){ + appBeingProcessed = app; + } return app; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java index 706cdc9034c..42a5e46f989 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java @@ -40,10 +40,13 @@ private MockPreemptionThread(FairScheduler scheduler) { public void run() { while (!Thread.interrupted()) { try { - FSAppAttempt app = context.getStarvedApps().take(); - appsAdded.add(app); - totalAppsAdded++; - app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS); + FSAppAttempt app = context.getStarvedApps().poll(); + if (app != null){ + appsAdded.add(app); + totalAppsAdded++; + app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS); + } + Thread.sleep(1000); } catch (InterruptedException e) { return; }