From 192f2b6c6bf28b7c37dd5af3b7f5f94bc7ba4543 Mon Sep 17 00:00:00 2001 From: Sunil Date: Thu, 10 Nov 2016 18:33:15 +0530 Subject: [PATCH] YARN-5865 --- .../scheduler/capacity/CapacityScheduler.java | 37 ++++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index af51f3c..8827e2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -145,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; @LimitedPrivate("yarn") @Evolving @@ -2403,6 +2405,7 @@ public void updateApplicationPriority(Priority newPriority, } RMApp rmApp = rmContext.getRMApps().get(applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), rmApp.getQueue(), applicationId); @@ -2410,23 +2413,29 @@ public void updateApplicationPriority(Priority newPriority, return; } - // Update new priority in Submission Context to keep track in HA - rmApp.getApplicationSubmissionContext().setPriority(appPriority); + synchronized (applicationId) { + // Create a future object to capture exceptions from StateStore. + SettableFuture future = SettableFuture.create(); + + // Update new priority in Submission Context to keep track in HA + rmApp.getApplicationSubmissionContext().setPriority(appPriority); - // Update to state store - ApplicationStateData appState = - ApplicationStateData.newInstance(rmApp.getSubmitTime(), - rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), - rmApp.getUser(), rmApp.getCallerContext()); - appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); - rmContext.getStateStore().updateApplicationStateSynchronously(appState, - false, null); + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser(), + rmApp.getCallerContext()); + appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); - // As we use iterator over a TreeSet for OrderingPolicy, once we change - // priority then reinsert back to make order correct. - LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + Futures.get(future, YarnException.class); - queue.updateApplicationPriority(application, appPriority); + // As we use iterator over a TreeSet for OrderingPolicy, once we change + // priority then reinsert back to make order correct. + LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + queue.updateApplicationPriority(application, appPriority); + } // Update the changed application state to timeline server rmContext.getSystemMetricsPublisher().appUpdated(rmApp, -- 2.7.4 (Apple Git-66)