From c9925c7ac5829483eec48b59f96a3272b85d641e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=B7=A8=E4=B8=B0?= <920347627@qq.com> Date: Fri, 6 Nov 2020 13:37:33 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E4=BF=AE=E6=94=B9RM=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E7=9A=84=E6=89=80=E6=9C=89ReadWriteLock=E4=B8=BAsynchronized?= =?UTF-8?q?=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../capacity/FifoCandidatesSelector.java | 5 +- .../IntraQueueCandidatesSelector.java | 5 +- .../ProportionalCapacityPreemptionPolicy.java | 8 +- .../AbstractAutoCreatedLeafQueue.java | 7 +- .../scheduler/capacity/AbstractCSQueue.java | 90 ++----------- .../capacity/AbstractManagedParentQueue.java | 43 +----- .../capacity/AutoCreatedLeafQueue.java | 16 +-- .../scheduler/capacity/CSQueue.java | 5 - .../capacity/ManagedParentQueue.java | 46 ++----- .../scheduler/capacity/ParentQueue.java | 126 +++--------------- .../scheduler/capacity/PlanQueue.java | 7 +- .../scheduler/capacity/ReservationQueue.java | 7 +- 12 files changed, 57 insertions(+), 308 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index c2735f1..c97d0ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -96,8 +96,7 @@ .getResToObtainByPartitionForLeafQueue(preemptionContext, queueName, clusterResource); - try { - leafQueue.getReadLock().lock(); + synchronized (this){ // go through all ignore-partition-exclusivity containers first to make // sure such containers will be preemptionCandidates first Map> ignorePartitionExclusivityContainers = @@ -155,8 +154,6 @@ preemptAMContainers(clusterResource, selectedCandidates, curCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, totalPreemptionAllowed); - } finally { - leafQueue.getReadLock().unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index c52fd95..d287c70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -178,15 +178,12 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { // 7. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. - try { - leafQueue.getReadLock().lock(); + synchronized (this){ for (FiCaSchedulerApp app : apps) { preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates, curCandidates, clusterResource, totalPreemptedResourceAllowed, resToObtainByPartition, rollingResourceUsagePerUser); } - } finally { - leafQueue.getReadLock().unlock(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1d49408..d88ad7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -562,11 +562,7 @@ public String getPolicyName() { private TempQueuePerPartition cloneQueues(CSQueue curQueue, Resource partitionResource, String partitionToLookAt) { TempQueuePerPartition ret; - ReadLock readLock = curQueue.getReadLock(); - try { - // Acquire a read lock from Parent/LeafQueue. - readLock.lock(); - + synchronized (this){ String queueName = curQueue.getQueueName(); QueueCapacities qc = curQueue.getQueueCapacities(); float absCap = qc.getAbsoluteCapacity(partitionToLookAt); @@ -626,8 +622,6 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, subq.parent = ret; } } - } finally { - readLock.unlock(); } addTempQueuePartition(ret); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java index ac97d72..d5de538 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -75,10 +75,8 @@ public void setEntitlement(QueueEntitlement entitlement) * maxCapacity, etc..) * @throws SchedulerDynamicEditException */ - public void setEntitlement(String nodeLabel, QueueEntitlement entitlement) + public synchronized void setEntitlement(String nodeLabel, QueueEntitlement entitlement) throws SchedulerDynamicEditException { - try { - writeLock.lock(); float capacity = entitlement.getCapacity(); if (capacity < 0 || capacity > 1.0f) { throw new SchedulerDynamicEditException( @@ -101,9 +99,6 @@ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement) CSQueueUtils.updateQueueStatistics(resourceCalculator, csContext.getClusterResource(), this, labelManager, nodeLabel); - } finally { - writeLock.unlock(); - } } protected void setupConfigurableCapacities(QueueCapacities queueCapacities) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 67b676b..e5a3581 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -123,9 +123,6 @@ protected ActivitiesManager activitiesManager; - protected ReentrantReadWriteLock.ReadLock readLock; - protected ReentrantReadWriteLock.WriteLock writeLock; - volatile Priority priority = Priority.newInstance(0); private Map userWeights = new HashMap(); @@ -165,9 +162,6 @@ public AbstractCSQueue(CapacitySchedulerContext cs, // initialize queueResourceQuotas queueResourceQuotas = new QueueResourceQuotas(); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); } protected void setupConfigurableCapacities() { @@ -272,9 +266,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { * Set maximum capacity - used only for testing. * @param maximumCapacity new max capacity */ - void setMaxCapacity(float maximumCapacity) { - try { - writeLock.lock(); + synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), queueCapacities.getCapacity(), maximumCapacity); @@ -285,18 +277,13 @@ void setMaxCapacity(float maximumCapacity) { queueCapacities.setMaximumCapacity(maximumCapacity); queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); - } finally { - writeLock.unlock(); - } } /** * Set maximum capacity * @param maximumCapacity new max capacity */ - void setMaxCapacity(String nodeLabel, float maximumCapacity) { - try { - writeLock.lock(); + synchronized void setMaxCapacity(String nodeLabel, float maximumCapacity) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), queueCapacities.getCapacity(nodeLabel), maximumCapacity); @@ -307,9 +294,6 @@ void setMaxCapacity(String nodeLabel, float maximumCapacity) { queueCapacities.setMaximumCapacity(maximumCapacity); queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); - } finally { - writeLock.unlock(); - } } @@ -323,12 +307,10 @@ void setupQueueConfigs(Resource clusterResource) setupQueueConfigs(clusterResource, csContext.getConfiguration()); } - protected void setupQueueConfigs(Resource clusterResource, + protected synchronized void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration configuration) throws IOException { - try { - writeLock.lock(); // get labels this.accessibleLabels = configuration.getAccessibleNodeLabels(getQueuePath()); @@ -415,9 +397,6 @@ protected void setupQueueConfigs(Resource clusterResource, getQueuePath()); this.userWeights = getUserWeightsFromHierarchy(configuration); - } finally { - writeLock.unlock(); - } } private Map getUserWeightsFromHierarchy @@ -692,35 +671,25 @@ public Resource getMaximumAllocation() { public Resource getMinimumAllocation() { return minimumAllocation; } - - void allocateResource(Resource clusterResource, + + synchronized void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { - try { - writeLock.lock(); queueUsage.incUsed(nodePartition, resource); ++numContainers; CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, nodePartition); - } finally { - writeLock.unlock(); - } } - protected void releaseResource(Resource clusterResource, + protected synchronized void releaseResource(Resource clusterResource, Resource resource, String nodePartition) { - try { - writeLock.lock(); queueUsage.decUsed(nodePartition, resource); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, nodePartition); --numContainers; - } finally { - writeLock.unlock(); - } } @Private @@ -729,13 +698,8 @@ public boolean getReservationContinueLooking() { } @Private - public Map getACLs() { - try { - readLock.lock(); + public synchronized Map getACLs() { return acls; - } finally { - readLock.unlock(); - } } @Private @@ -768,11 +732,6 @@ public QueueResourceQuotas getQueueResourceQuotas() { return queueResourceQuotas; } - @Override - public ReentrantReadWriteLock.ReadLock getReadLock() { - return readLock; - } - /** * The specified queue is cross-queue preemptable if system-wide cross-queue * preemption is turned on unless any queue in the qPath hierarchy @@ -880,11 +839,9 @@ public boolean hasChildQueues() { return childQueues != null && !childQueues.isEmpty(); } - boolean canAssignToThisQueue(Resource clusterResource, + synchronized boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { - try { - readLock.lock(); // Get current limited resource: // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect // queues' max capacity. @@ -969,10 +926,6 @@ boolean canAssignToThisQueue(Resource clusterResource, .getAbsoluteMaximumCapacity(nodePartition)); } return true; - } finally { - readLock.unlock(); - } - } @Override @@ -1148,9 +1101,7 @@ public boolean accept(Resource cluster, Resource netAllocated = Resources.subtract(required, request.getTotalReleasedResource()); - try { - readLock.lock(); - + synchronized (this) { String partition = schedulerContainer.getNodePartition(); Resource maxResourceLimit; if (allocation.getSchedulingMode() @@ -1170,8 +1121,6 @@ public boolean accept(Resource cluster, } return false; } - } finally { - readLock.unlock(); } // Only check parent queue when something new allocated or reserved. @@ -1198,9 +1147,7 @@ public void updateQueueState(QueueState queueState) { } @Override - public void activeQueue() throws YarnException { - try { - this.writeLock.lock(); + public synchronized void activeQueue() throws YarnException { if (getState() == QueueState.RUNNING) { LOG.info("The specified queue:" + queueName + " is already in the RUNNING state."); @@ -1217,22 +1164,14 @@ public void activeQueue() throws YarnException { + " is not running. Please activate the parent queue first"); } } - } finally { - this.writeLock.unlock(); - } } - protected void appFinished() { - try { - this.writeLock.lock(); + protected synchronized void appFinished() { if (getState() == QueueState.DRAINING) { if (getNumApplications() == 0) { updateQueueState(QueueState.STOPPED); } } - } finally { - this.writeLock.unlock(); - } } @Override @@ -1245,9 +1184,7 @@ public Priority getPriority() { return userWeights; } - public void recoverDrainingState() { - try { - this.writeLock.lock(); + public synchronized void recoverDrainingState() { if (getState() == QueueState.STOPPED) { updateQueueState(QueueState.DRAINING); } @@ -1255,8 +1192,5 @@ public void recoverDrainingState() { if (getParent() != null && getParent().getState() == QueueState.STOPPED) { ((AbstractCSQueue) getParent()).recoverDrainingState(); } - } finally { - this.writeLock.unlock(); - } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 9d38f79..0d22bf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -52,17 +52,11 @@ public AbstractManagedParentQueue(CapacitySchedulerContext cs, } @Override - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - try { - writeLock.lock(); - // Set new configs setupQueueConfigs(clusterResource); - } finally { - writeLock.unlock(); - } } /** @@ -70,10 +64,8 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) * @param childQueue reference to the child queue to be added * @throws SchedulerDynamicEditException */ - public void addChildQueue(CSQueue childQueue) + public synchronized void addChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException, IOException { - try { - writeLock.lock(); if (childQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + childQueue + " being added has non zero capacity."); @@ -83,9 +75,6 @@ public void addChildQueue(CSQueue childQueue) LOG.debug("updateChildQueues (action: add queue): " + added + " " + getChildQueuesToPrint()); } - } finally { - writeLock.unlock(); - } } /** @@ -93,10 +82,8 @@ public void addChildQueue(CSQueue childQueue) * @param childQueue reference to the child queue to be removed * @throws SchedulerDynamicEditException */ - public void removeChildQueue(CSQueue childQueue) + public synchronized void removeChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException { - try { - writeLock.lock(); if (childQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + childQueue + " being removed has non zero capacity."); @@ -111,9 +98,6 @@ public void removeChildQueue(CSQueue childQueue) } } } - } finally { - writeLock.unlock(); - } } /** @@ -121,11 +105,9 @@ public void removeChildQueue(CSQueue childQueue) * @param childQueueName name of the child queue to be removed * @throws SchedulerDynamicEditException */ - public CSQueue removeChildQueue(String childQueueName) + public synchronized CSQueue removeChildQueue(String childQueueName) throws SchedulerDynamicEditException { CSQueue childQueue; - try { - writeLock.lock(); childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue( childQueueName); if (childQueue != null) { @@ -134,36 +116,23 @@ public CSQueue removeChildQueue(String childQueueName) throw new SchedulerDynamicEditException("Cannot find queue to delete " + ": " + childQueueName); } - } finally { - writeLock.unlock(); - } return childQueue; } - protected float sumOfChildCapacities() { - try { - writeLock.lock(); + protected synchronized float sumOfChildCapacities() { float ret = 0; for (CSQueue l : childQueues) { ret += l.getCapacity(); } return ret; - } finally { - writeLock.unlock(); - } } - protected float sumOfChildAbsCapacities() { - try { - writeLock.lock(); + protected synchronized float sumOfChildAbsCapacities() { float ret = 0; for (CSQueue l : childQueues) { ret += l.getAbsoluteCapacity(); } return ret; - } finally { - writeLock.unlock(); - } } public AutoCreatedLeafQueueConfig getLeafQueueTemplate() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index e12b55e..a39a0a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -47,10 +47,8 @@ public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, } @Override - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - try { - writeLock.lock(); validate(newlyParsedQueue); @@ -63,18 +61,11 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) // queueCapacities to initialize to configured capacity which might // overcommit resources from parent queue updateCapacitiesToZero(); - - } finally { - writeLock.unlock(); - } } - public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig + public synchronized void reinitializeFromTemplate(AutoCreatedLeafQueueConfig leafQueueTemplate) throws SchedulerDynamicEditException, IOException { - try { - writeLock.lock(); - // TODO: // reinitialize only capacities for now since 0 capacity updates // can cause @@ -96,9 +87,6 @@ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig //activate applications if any are pending activateApplications(); - } finally { - writeLock.unlock(); - } } private void mergeCapacities(QueueCapacities capacities) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3963dc0..1d664b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -345,11 +345,6 @@ boolean accept(Resource cluster, void apply(Resource cluster, ResourceCommitRequest request); - /** - * Get readLock associated with the Queue. - * @return readLock of corresponding queue. - */ - public ReentrantReadWriteLock.ReadLock getReadLock(); /** * Validate submitApplication api so that moveApplication do a pre-check. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index 2494000..ddc9835 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -73,11 +73,10 @@ public ManagedParentQueue(final CapacitySchedulerContext cs, } @Override - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { try { - writeLock.lock(); validate(newlyParsedQueue); shouldFailAutoCreationWhenGuaranteedCapacityExceeded = @@ -127,8 +126,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) LOG.error("Exception while computing policy changes for leaf queue : " + getQueueName(), ye); throw new IOException(ye); - } finally { - writeLock.unlock(); } } @@ -186,11 +183,8 @@ protected void validate(final CSQueue newlyParsedQueue) throws IOException { } @Override - public void addChildQueue(CSQueue childQueue) + public synchronized void addChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException, IOException { - try { - writeLock.lock(); - if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "Expected child queue to be an instance of AutoCreatedLeafQueue"); @@ -229,48 +223,35 @@ public void addChildQueue(CSQueue childQueue) queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue); leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate); - } finally { - writeLock.unlock(); - } } - public List getScheduleableApplications() { - try { - readLock.lock(); + public synchronized List getScheduleableApplications() { + List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getApplications()); } return Collections.unmodifiableList(apps); - } finally { - readLock.unlock(); - } } - public List getPendingApplications() { - try { - readLock.lock(); + public synchronized List getPendingApplications() { + List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getPendingApplications()); } return Collections.unmodifiableList(apps); - } finally { - readLock.unlock(); - } + } - public List getAllApplications() { - try { - readLock.lock(); + public synchronized List getAllApplications() { + List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getAllApplications()); } return Collections.unmodifiableList(apps); - } finally { - readLock.unlock(); - } + } public String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { @@ -287,11 +268,9 @@ public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() { * * @param queueManagementChanges */ - public void validateAndApplyQueueManagementChanges( + public synchronized void validateAndApplyQueueManagementChanges( List queueManagementChanges) throws IOException, SchedulerDynamicEditException { - try { - writeLock.lock(); validateQueueManagementChanges(queueManagementChanges); @@ -303,9 +282,6 @@ public void validateAndApplyQueueManagementChanges( //acquires write lock on policy policy.commitQueueManagementChanges(queueManagementChanges); - } finally { - writeLock.unlock(); - } } public void validateQueueManagementChanges( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7eb1c29..4a2af1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -123,10 +123,8 @@ private String getQueueOrderingPolicyConfigName() { queueOrderingPolicy.getConfigName(); } - protected void setupQueueConfigs(Resource clusterResource) + protected synchronized void setupQueueConfigs(Resource clusterResource) throws IOException { - try { - writeLock.lock(); super.setupQueueConfigs(clusterResource); StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { @@ -157,16 +155,11 @@ protected void setupQueueConfigs(Resource clusterResource) + ", reservationsContinueLooking=" + reservationsContinueLooking + ", orderingPolicy=" + getQueueOrderingPolicyConfigName() + ", priority=" + priority); - } finally { - writeLock.unlock(); - } } private static float PRECISION = 0.0005f; // 0.05% precision - void setChildQueues(Collection childQueues) { - try { - writeLock.lock(); + synchronized void setChildQueues(Collection childQueues) { // Validate float childCapacities = 0; Resource minResDefaultLabel = Resources.createResource(0, 0); @@ -248,16 +241,11 @@ void setChildQueues(Collection childQueues) { if (LOG.isDebugEnabled()) { LOG.debug("setChildQueues: " + getChildQueuesToPrint()); } - } finally { - writeLock.unlock(); - } } @Override - public QueueInfo getQueueInfo( + public synchronized QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { - try { - readLock.lock(); QueueInfo queueInfo = getQueueInfo(); List childQueuesInfo = new ArrayList<>(); @@ -270,16 +258,10 @@ public QueueInfo getQueueInfo( queueInfo.setChildQueues(childQueuesInfo); return queueInfo; - } finally { - readLock.unlock(); - } - } - private QueueUserACLInfo getUserAclInfo( + private synchronized QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { - try { - readLock.lock(); QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( QueueUserACLInfo.class); List operations = new ArrayList(); @@ -292,17 +274,12 @@ private QueueUserACLInfo getUserAclInfo( userAclInfo.setQueueName(getQueueName()); userAclInfo.setUserAcls(operations); return userAclInfo; - } finally { - readLock.unlock(); - } } @Override - public List getQueueUserAclInfo( + public synchronized List getQueueUserAclInfo( UserGroupInformation user) { - try { - readLock.lock(); List userAcls = new ArrayList<>(); // Add parent queue acls @@ -314,10 +291,6 @@ private QueueUserACLInfo getUserAclInfo( } return userAcls; - } finally { - readLock.unlock(); - } - } public String toString() { @@ -332,10 +305,8 @@ public String toString() { } @Override - public void reinitialize(CSQueue newlyParsedQueue, + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -413,9 +384,6 @@ public void reinitialize(CSQueue newlyParsedQueue, // Make sure we notifies QueueOrderingPolicy queueOrderingPolicy.setQueues(childQueues); - } finally { - writeLock.unlock(); - } } private Map getQueuesMap(List queues) { @@ -430,14 +398,11 @@ public void reinitialize(CSQueue newlyParsedQueue, public void submitApplication(ApplicationId applicationId, String user, String queue) throws AccessControlException { - try { - writeLock.lock(); + synchronized (this){ // Sanity check validateSubmitApplication(applicationId, user, queue); addApplication(applicationId, user); - } finally { - writeLock.unlock(); } // Inform the parent queue @@ -453,10 +418,9 @@ public void submitApplication(ApplicationId applicationId, String user, } } - public void validateSubmitApplication(ApplicationId applicationId, + public synchronized void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { - try { - writeLock.lock(); + if (queue.equals(queueName)) { throw new AccessControlException( "Cannot submit application " + "to non-leaf queue: " + queueName); @@ -467,9 +431,6 @@ public void validateSubmitApplication(ApplicationId applicationId, + " is STOPPED. Cannot accept submission of application: " + applicationId); } - } finally { - writeLock.unlock(); - } } @Override @@ -484,20 +445,15 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, // finish attempt logic. } - private void addApplication(ApplicationId applicationId, + private synchronized void addApplication(ApplicationId applicationId, String user) { - try { - writeLock.lock(); ++numApplications; LOG.info( "Application added -" + " appId: " + applicationId + " user: " + user + " leaf-queue of parent: " + getQueueName() + " #applications: " + getNumApplications()); - } finally { - writeLock.unlock(); - } } @Override @@ -513,18 +469,14 @@ public void finishApplication(ApplicationId application, String user) { } } - private void removeApplication(ApplicationId applicationId, + private synchronized void removeApplication(ApplicationId applicationId, String user) { - try { - writeLock.lock(); + --numApplications; LOG.info("Application removed -" + " appId: " + applicationId + " user: " + user + " leaf-queue of parent: " + getQueueName() + " #applications: " + getNumApplications()); - } finally { - writeLock.unlock(); - } } private String getParentName() { @@ -851,10 +803,8 @@ private void printChildQueues() { } } - private void internalReleaseResource(Resource clusterResource, + private synchronized void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource) { - try { - writeLock.lock(); super.releaseResource(clusterResource, releasedResource, node.getPartition()); @@ -862,10 +812,6 @@ private void internalReleaseResource(Resource clusterResource, LOG.debug( "completedContainer " + this + ", cluster=" + clusterResource); } - - } finally { - writeLock.unlock(); - } } @Override @@ -888,10 +834,8 @@ public void completedContainer(Resource clusterResource, } @Override - public void updateClusterResource(Resource clusterResource, + public synchronized void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { - try { - writeLock.lock(); // Update effective capacity in all parent queue. Set configuredNodelabels = csContext.getConfiguration() @@ -911,9 +855,6 @@ public void updateClusterResource(Resource clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, null); - } finally { - writeLock.unlock(); - } } @Override @@ -1127,14 +1068,8 @@ private void deriveCapacityFromAbsoluteConfigurations(String label, } @Override - public List getChildQueues() { - try { - readLock.lock(); + public synchronized List getChildQueues() { return new ArrayList(childQueues); - } finally { - readLock.unlock(); - } - } @Override @@ -1148,14 +1083,11 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! - try { - writeLock.lock(); + synchronized (this){ FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, rmContainer.getContainer().getResource(), node.getPartition()); - } finally { - writeLock.unlock(); } if (parent != null) { @@ -1170,16 +1102,11 @@ public ActiveUsersManager getAbstractUsersManager() { } @Override - public void collectSchedulerApplications( + public synchronized void collectSchedulerApplications( Collection apps) { - try { - readLock.lock(); for (CSQueue queue : childQueues) { queue.collectSchedulerApplications(apps); } - } finally { - readLock.unlock(); - } } @@ -1226,10 +1153,8 @@ public int getNumApplications() { return numApplications; } - void allocateResource(Resource clusterResource, + synchronized void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { - try { - writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); /** @@ -1274,9 +1199,6 @@ void allocateResource(Resource clusterResource, clusterResource); } } - } finally { - writeLock.unlock(); - } } private void killContainersToEnforceMaxQueueCapacity(String partition, @@ -1326,8 +1248,7 @@ public void apply(Resource cluster, // Do not modify queue when allocation from reserved container if (allocation.getAllocateFromReservedContainer() == null) { - try { - writeLock.lock(); + synchronized (this){ // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(cluster, allocation.getAllocatedOrReservedResource(), @@ -1337,8 +1258,6 @@ public void apply(Resource cluster, + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + cluster); - } finally { - writeLock.unlock(); } } } @@ -1349,9 +1268,7 @@ public void apply(Resource cluster, } @Override - public void stopQueue() { - try { - this.writeLock.lock(); + public synchronized void stopQueue() { if (getNumApplications() > 0) { updateQueueState(QueueState.DRAINING); } else { @@ -1362,9 +1279,6 @@ public void stopQueue() { child.stopQueue(); } } - } finally { - this.writeLock.unlock(); - } } public QueueOrderingPolicy getQueueOrderingPolicy() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 757002f..b621699 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -79,10 +79,8 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName, } @Override - public void reinitialize(CSQueue newlyParsedQueue, + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -114,9 +112,6 @@ public void reinitialize(CSQueue newlyParsedQueue, } showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; - } finally { - writeLock.unlock(); - } } private void updateQuotas(int userLimit, float userLimitFactor, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 34f4aa1..7417d36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -51,10 +51,8 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName, } @Override - public void reinitialize(CSQueue newlyParsedQueue, + public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -70,9 +68,6 @@ public void reinitialize(CSQueue newlyParsedQueue, parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), parent.getMaxApplicationsPerUserForReservation()); - } finally { - writeLock.unlock(); - } } private void updateQuotas(int userLimit, float userLimitFactor, -- 2.23.0.windows.1