From b90e2d8dda554a26fcd9c94ad2b29518bf80a697 Mon Sep 17 00:00:00 2001 From: Sunil Date: Thu, 10 Nov 2016 19:40:20 +0530 Subject: [PATCH] YARN-5825 --- .../monitor/capacity/FifoCandidatesSelector.java | 5 ++++- .../monitor/capacity/IntraQueueCandidatesSelector.java | 5 ++++- .../capacity/ProportionalCapacityPreemptionPolicy.java | 14 +++++++++++--- .../scheduler/capacity/AbstractCSQueue.java | 10 +++++++++- .../server/resourcemanager/scheduler/capacity/CSQueue.java | 7 +++++++ 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 39336a4..f4d7e92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -89,7 +89,8 @@ .getResToObtainByPartitionForLeafQueue(preemptionContext, queueName, clusterResource); - synchronized (leafQueue) { + try { + leafQueue.getReadLock().lock(); // go through all ignore-partition-exclusivity containers first to make // sure such containers will be preemptionCandidates first Map> ignorePartitionExclusivityContainers = @@ -147,6 +148,8 @@ preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, totalPreemptionAllowed); + } finally { + leafQueue.getReadLock().unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 039b53e..4f2b272 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -118,7 +118,8 @@ public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { // 6. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. - synchronized (leafQueue) { + try { + leafQueue.getReadLock().lock(); Iterator desc = leafQueue.getOrderingPolicy() .getPreemptionIterator(); while (desc.hasNext()) { @@ -127,6 +128,8 @@ public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { totalPreemptedResourceAllowed, resToObtainByPartition, leafQueue, app); } + } finally { + leafQueue.getReadLock().unlock(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 5347074..2a5662d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; @@ -56,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class implement a {@link SchedulingEditPolicy} that is designed to be @@ -430,15 +432,18 @@ public String getPolicyName() { private TempQueuePerPartition cloneQueues(CSQueue curQueue, Resource partitionResource, String partitionToLookAt) { TempQueuePerPartition ret; - synchronized (curQueue) { + try { + // Acquire a read lock from Parent/LeafQueue. + curQueue.getReadLock().lock(); + String queueName = curQueue.getQueueName(); QueueCapacities qc = curQueue.getQueueCapacities(); float absCap = qc.getAbsoluteCapacity(partitionToLookAt); float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); boolean preemptionDisabled = curQueue.getPreemptionDisabled(); - Resource current = Resources.clone( - curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); + Resource current = Resources + .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); Resource killable = Resources.none(); Resource reserved = Resources.clone( @@ -472,7 +477,10 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, ret.addChild(subq); } } + } finally { + curQueue.getReadLock().unlock(); } + addTempQueuePartition(ret); return ret; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 7e18b29..d03103e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -432,7 +432,6 @@ public boolean getReservationContinueLooking() { } finally { readLock.unlock(); } - } @Private @@ -450,6 +449,15 @@ public ResourceUsage getQueueResourceUsage() { return queueUsage; } + public ReentrantReadWriteLock.WriteLock getWriteLock() { + return writeLock; + } + + @Override + public ReentrantReadWriteLock.ReadLock getReadLock() { + return readLock; + } + /** * The specified queue is preemptable if system-wide preemption is turned on * unless any queue in the qPath hierarchy has explicitly turned diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index e5cbd04..baf60e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -355,4 +356,10 @@ boolean accept(Resource cluster, void apply(Resource cluster, ResourceCommitRequest request); + + /** + * Get readLock associated with the Queue. + * @return readLock of corresponding queue. + */ + public ReentrantReadWriteLock.ReadLock getReadLock(); } -- 2.7.4 (Apple Git-66)