diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6c61b45..99fc328 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -137,8 +137,8 @@ public QueueMetrics getMetrics() {
   void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus,
       RMContainerEventType event) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Container container = rmContainer.getContainer();
       ContainerId containerId = container.getId();
@@ -179,8 +179,8 @@ void containerCompleted(RMContainer rmContainer,
   private void unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Map<ContainerId, RMContainer> reservedContainers = this.reservedContainers.get(
           schedulerKey);
       RMContainer reservedContainer = reservedContainers.remove(
@@ -282,8 +282,8 @@ NodeType getAllowedLocalityLevel(
       return NodeType.OFF_SWITCH;
     }
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Default level is NODE_LOCAL
       if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@@ -352,8 +352,8 @@ NodeType getAllowedLocalityLevelByTime(
       return NodeType.OFF_SWITCH;
     }
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       // default level is NODE_LOCAL
       if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@@ -423,8 +423,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
     RMContainer rmContainer;
     Container container;
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Update allowed locality level
       NodeType allowed = allowedLocalityLevel.get(schedulerKey);
       if (allowed != null) {
@@ -496,8 +496,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
   void resetAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, NodeType level) {
     NodeType old;
+    writeLock.lock();
     try {
-      writeLock.lock();
       old = allowedLocalityLevel.put(schedulerKey, level);
     } finally {
       writeLock.unlock();
@@ -716,8 +716,8 @@ private void setReservation(SchedulerNode node) {
     String rackName =
         node.getRackName() == null ? "NULL" : node.getRackName();
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       Set<String> rackReservations = reservations.get(rackName);
       if (rackReservations == null) {
         rackReservations = new HashSet<>();
@@ -733,8 +733,8 @@ private void clearReservation(SchedulerNode node) {
     String rackName =
         node.getRackName() == null ? "NULL" : node.getRackName();
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       Set<String> rackReservations = reservations.get(rackName);
       if (rackReservations != null) {
         rackReservations.remove(node.getNodeName());
@@ -891,8 +891,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack of off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
+    writeLock.lock();
     try {
-      writeLock.lock();
       // TODO (wandga): All logics in this method should be added to
       // SchedulerPlacement#canDelayTo which is independent from scheduler.
@@ -1286,8 +1286,8 @@ public void updateDemand() {
     Resources.addTo(demand, getCurrentConsumption());
 
     // Add up outstanding resource requests
+    writeLock.lock();
     try {
-      writeLock.lock();
       for (SchedulerRequestKey k : getSchedulerKeys()) {
         PendingAsk pendingAsk = getPendingAsk(k, ResourceRequest.ANY);
         if (pendingAsk.getCount() > 0) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3246778..ff65c1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -350,9 +350,8 @@ public void run() {
    * required resources per job.
    */
   protected void update() {
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       FSQueue rootQueue = queueMgr.getRootQueue();
 
       // Recursively update demands for all queues
@@ -386,8 +385,8 @@ protected void update() {
   }
 
   public ResourceWeights getAppWeight(FSAppAttempt app) {
+    readLock.lock();
     try {
-      readLock.lock();
       double weight = 1.0;
       if (sizeBasedWeight) {
         // Set weight based on current memory demand
@@ -469,8 +468,8 @@ protected void addApplication(ApplicationId applicationId,
       return;
     }
 
+    writeLock.lock();
     try {
-      writeLock.lock();
       RMApp rmApp = rmContext.getRMApps().get(applicationId);
       FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
       if (queue == null) {
@@ -521,8 +520,8 @@ protected void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
       boolean isAttemptRecovering) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication<FSAppAttempt> application = applications.get(
           applicationAttemptId.getApplicationId());
       String user = application.getUser();
@@ -627,8 +626,8 @@ private void removeApplication(ApplicationId applicationId,
   private void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       LOG.info("Application " + applicationAttemptId + " is done." +
           " finalState=" + rmAppAttemptFinalState);
       FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
@@ -694,8 +693,8 @@ private void removeApplicationAttempt(
   protected void completedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Container container = rmContainer.getContainer();
 
       // Get the application for the finished container
@@ -733,8 +732,8 @@ protected void completedContainerInternal(
   private void addNode(List<NMContainerStatus> containerReports,
       RMNode node) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
           usePortForNodeName);
       nodeTracker.addNode(schedulerNode);
@@ -755,8 +754,8 @@ private void addNode(List<NMContainerStatus> containerReports,
   }
 
   private void removeNode(RMNode rmNode) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       NodeId nodeId = rmNode.getNodeID();
       FSSchedulerNode node = nodeTracker.getNode(nodeId);
       if (node == null) {
@@ -884,8 +883,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
   @Override
   protected void nodeUpdate(RMNode nm) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       long start = getClock().getTime();
       eventLog.log("HEARTBEAT", nm.getHostName());
       super.nodeUpdate(nm);
@@ -959,8 +958,8 @@ private boolean shouldContinueAssigning(int containers,
   @VisibleForTesting
   void attemptScheduling(FSSchedulerNode node) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
           .isSchedulerReadyForAllocatingContainers()) {
         return;
@@ -1156,8 +1155,8 @@ public void handle(SchedulerEvent event) {
   private String resolveReservationQueueName(String queueName,
       ApplicationId applicationId, ReservationId reservationID,
       boolean isRecovering) {
+    readLock.lock();
     try {
-      readLock.lock();
       FSQueue queue = queueMgr.getQueue(queueName);
       if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
         return queueName;
@@ -1220,8 +1219,8 @@ public void setRMContext(RMContext rmContext) {
   }
 
   private void initScheduler(Configuration conf) throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.conf = new FairSchedulerConfiguration(conf);
       validateConf(this.conf);
       authorizer = YarnAuthorizationProvider.getInstance(conf);
@@ -1316,8 +1315,8 @@ private void updateReservationThreshold() {
   }
 
   private void startSchedulerThreads() {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Preconditions.checkNotNull(updateThread, "updateThread is null");
       Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
       updateThread.start();
@@ -1349,8 +1348,8 @@ public void serviceStart() throws Exception {
   @Override
   public void serviceStop() throws Exception {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (updateThread != null) {
         updateThread.interrupt();
         updateThread.join(THREAD_JOIN_TIMEOUT_MS);
@@ -1415,8 +1414,8 @@ public int getNumClusterNodes() {
   @Override
   public boolean checkAccess(UserGroupInformation callerUGI,
       QueueACL acl, String queueName) {
+    readLock.lock();
     try {
-      readLock.lock();
       FSQueue queue = getQueueManager().getQueue(queueName);
       if (queue == null) {
         if (LOG.isDebugEnabled()) {
@@ -1520,8 +1519,8 @@ private void applyChildDefaults() {
   @Override
   public String moveApplication(ApplicationId appId,
       String queueName) throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication<FSAppAttempt> app = applications.get(appId);
       if (app == null) {
         throw new YarnException("App to be moved " + appId + " not found.");
@@ -1566,8 +1565,8 @@ public String moveApplication(ApplicationId appId,
   @Override
   public void preValidateMoveApplication(ApplicationId appId,
       String newQueue) throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication<FSAppAttempt> app = applications.get(appId);
       if (app == null) {
         throw new YarnException("App to be moved " + appId + " not found.");
@@ -1698,8 +1697,8 @@ FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
   @Override
   public void updateNodeResource(RMNode nm, ResourceOption resourceOption) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       super.updateNodeResource(nm, resourceOption);
       updateRootQueueMetrics();
       queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
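
Note for reviewers: every hunk above applies the same mechanical change, moving the lock() call out of the try block so that the finally clause can only run once the lock is actually held. With the old shape, a failure in lock() itself (or in any code that might later be added between "try {" and "lock()") would still trigger the finally, and calling unlock() on a lock the thread does not hold throws IllegalMonitorStateException, masking the original error. Lock-before-try is also the idiom shown in the java.util.concurrent.locks.Lock javadoc. A minimal standalone sketch of the before/after shapes (class, field, and method names are illustrative, not taken from the patch):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only -- not a class from this patch.
public class LockBeforeTryExample {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
  private int state;

  // Old shape (what the patch removes): if lock() failed, or code were ever
  // added between "try {" and "lock()" and threw, finally would still run
  // and unlock() would throw IllegalMonitorStateException on a lock this
  // thread never acquired, hiding the original failure.
  public void updateOld(int value) {
    try {
      writeLock.lock();
      state = value;
    } finally {
      writeLock.unlock();
    }
  }

  // New shape (what the patch applies): the lock is held before the try is
  // entered, so finally unlocks only a lock this thread actually owns.
  public void updateNew(int value) {
    writeLock.lock();
    try {
      state = value;
    } finally {
      writeLock.unlock();
    }
  }
}
```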