diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index c32565f63c3..29d061d4d4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -74,15 +74,17 @@ public void run() { while (!Thread.interrupted()) { try { FSAppAttempt starvedApp = context.getStarvedApps().take(); - // Hold the scheduler readlock so this is not concurrent with the - // update thread. - schedulerReadLock.lock(); - try { - preemptContainers(identifyContainersToPreempt(starvedApp)); - } finally { - schedulerReadLock.unlock(); + if (starvedApp != null){ + // Hold the scheduler readlock so this is not concurrent with the + // update thread. + schedulerReadLock.lock(); + try { + preemptContainers(identifyContainersToPreempt(starvedApp)); + } finally { + schedulerReadLock.unlock(); + } + starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); } - starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); Thread.currentThread().interrupt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java index 4f28e41028b..8102e43d7bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java @@ -45,7 +45,7 @@ * Add a starved application if it is not already added. * @param app application to add */ - void addStarvedApp(FSAppAttempt app) { + synchronized void addStarvedApp(FSAppAttempt app) { if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) { appsToProcess.add(app); } @@ -59,13 +59,11 @@ void addStarvedApp(FSAppAttempt app) { * @return starved application to process * @throws InterruptedException if interrupted while waiting */ - FSAppAttempt take() throws InterruptedException { - // Reset appBeingProcessed before the blocking call - appBeingProcessed = null; - - // Blocking call to fetch the next starved application - FSAppAttempt app = appsToProcess.take(); - appBeingProcessed = app; + synchronized FSAppAttempt take() throws InterruptedException { + FSAppAttempt app = appsToProcess.poll(); + if (app != null){ + appBeingProcessed = app; + } return app; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java index 706cdc9034c..48af5bc130b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java @@ -41,9 +41,11 @@ public void run() { while (!Thread.interrupted()) { try { FSAppAttempt app = context.getStarvedApps().take(); - appsAdded.add(app); - totalAppsAdded++; - app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS); + if (app != null){ + appsAdded.add(app); + totalAppsAdded++; + app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS); + } } catch (InterruptedException e) { return; }