diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fc0fbb4..d57f924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -22,6 +22,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -64,13 +67,13 @@ Set accessibleLabels; RMNodeLabelsManager labelManager; String defaultLabelExpression; - Resource usedResources = Resources.createResource(0, 0); QueueInfo queueInfo; Map absoluteCapacityByNodeLabels; Map capacitiyByNodeLabels; Map usedResourcesByNodeLabels = new HashMap(); Map absoluteMaxCapacityByNodeLabels; Map maxCapacityByNodeLabels; + QueueResourceInfo queueResourceInfo = new QueueResourceInfo(); Map acls = new HashMap(); @@ -79,6 +82,53 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + /** + * This class will manage used resource (by label), maximum capacity (by label), + * etc. in each queue. 
+ */ + public class QueueResourceInfo { + Resource usedResource = Resource.newInstance(0, 0); + ReadLock readLock; + WriteLock writeLock; + + public QueueResourceInfo() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + public Resource getUsedResource() { + readLock.lock(); + try { + return usedResource; + } finally { + readLock.unlock(); + } + } + + public void addToUsedResource(Resource res) { + writeLock.lock(); + try { + // Never mutate usedResource in place; publish a freshly built object + // instead. A reader (thread#1) may still hold the reference it obtained + // from getUsedResource() while a writer (thread#2) updates the usage, and + // an in-place update could expose a half-updated resource to that reader. + usedResource = Resources.add(usedResource, res); + } finally { + writeLock.unlock(); + } + } + + public void subtractFromUsedResource(Resource res) { + writeLock.lock(); + try { + usedResource = Resources.subtract(usedResource, res); + } finally { + writeLock.unlock(); + } + } + } + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.minimumAllocation = cs.getMinimumResourceCapability(); @@ -157,8 +207,8 @@ public synchronized float getUsedCapacity() { } @Override - public synchronized Resource getUsedResources() { - return usedResources; + public Resource getUsedResources() { + return queueResourceInfo.getUsedResource(); } public synchronized int getNumContainers() { @@ -342,7 +392,7 @@ public Resource getMinimumAllocation() { synchronized void allocateResource(Resource clusterResource, Resource resource, Set nodeLabels) { - Resources.addTo(usedResources, resource); + queueResourceInfo.addToUsedResource(resource); // Update usedResources by labels if (nodeLabels == null || nodeLabels.isEmpty()) { @@ -369,7 +419,7 @@ synchronized void allocateResource(Resource clusterResource, protected synchronized void releaseResource(Resource clusterResource, 
Resource resource, Set nodeLabels) { // Update queue metrics - Resources.subtractFrom(usedResources, resource); + queueResourceInfo.subtractFromUsedResource(resource); // Update usedResources by labels if (null == nodeLabels || nodeLabels.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ffeec63..8cd8892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -463,7 +463,7 @@ public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + ", " + + "usedResources=" + getUsedResources() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -911,7 +911,7 @@ private Resource getHeadroom(User user, Resource queueMaxCap, Resource headroom = Resources.min(resourceCalculator, clusterResource, Resources.subtract(userLimit, user.getTotalConsumedResources()), - Resources.subtract(queueMaxCap, usedResources) + Resources.subtract(queueMaxCap, getUsedResources()) ); return headroom; } @@ -959,14 +959,14 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, LOG.debug("try to use reserved: " + getQueueName() + " usedResources: " - + usedResources + + getUsedResources() + " clusterResources: " + 
clusterResource + " reservedResources: " + application.getCurrentReservation() + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required + getUsedResources(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1092,8 +1092,8 @@ private Resource computeUserLimit(FiCaSchedulerApp application, Resource currentCapacity = Resources.lessThan(resourceCalculator, clusterResource, - usedResources, queueCapacity) ? - queueCapacity : Resources.add(usedResources, required); + getUsedResources(), queueCapacity) ? + queueCapacity : Resources.add(getUsedResources(), required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. @@ -1131,7 +1131,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, " consumed: " + user.getTotalConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + - " qconsumed: " + usedResources + + " qconsumed: " + getUsedResources() + " currentCapacity: " + currentCapacity + " activeUsers: " + activeUsers + " clusterCapacity: " + clusterResource @@ -1601,7 +1601,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod " queue=" + this.toString() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); return request.getCapability(); @@ -1703,7 +1703,7 @@ synchronized void allocateResource(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.info(getQueueName() + " user=" + userName + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + getUsedResources() + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + " user-resources=" + 
user.getTotalConsumedResources() ); @@ -1721,7 +1721,7 @@ synchronized void releaseResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + getUsedResources() + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); } @@ -1908,7 +1908,7 @@ public void attachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + getUsedResources() + " cluster=" + clusterResource); // Inform the parent queue getParent().attachContainer(clusterResource, application, rmContainer); } @@ -1925,7 +1925,7 @@ public void detachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + getUsedResources() + " cluster=" + clusterResource); // Inform the parent queue getParent().detachContainer(clusterResource, application, rmContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6ffaf4c..a8ed240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -257,7 +257,7 @@ public String toString() { "numChildQueue= " + childQueues.size() + ", " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + + "usedResources=" + getUsedResources() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); @@ -465,7 +465,7 @@ public synchronized CSAssignment assignContainers( " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); } else { @@ -519,7 +519,7 @@ private synchronized boolean canAssignToThisQueue(Resource clusterResource, if (currentAbsoluteLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + usedResources + LOG.debug(getQueueName() + " used=" + getUsedResources() + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")"); @@ -542,16 +542,16 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) .getReservedMB(), getMetrics().getReservedVirtualCores()); float capacityWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(usedResources, reservedResources), + Resources.subtract(getUsedResources(), reservedResources), clusterResource); if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + usedResources.getMemory() + + " usedResources: " + getUsedResources().getMemory() + " 
clusterResources: " + clusterResource.getMemory() + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) usedResources.getMemory()) + + " currentCapacity " + ((float) getUsedResources().getMemory()) / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " @@ -647,7 +647,7 @@ public void completedContainer(Resource clusterResource, " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); } @@ -735,7 +735,7 @@ public void attachContainer(Resource clusterResource, .getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + getUsedResources() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { @@ -753,7 +753,7 @@ public void detachContainer(Resource clusterResource, labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + getUsedResources() + " cluster=" + clusterResource); // Inform the parent if (parent != null) {