diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java new file mode 100644 index 0000000..5a4cced --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Resource Usage by Labels for following fields by label - AM resource (to + * enforce max-am-resource-by-label after YARN-2637) - Used resource (includes + * AM resource usage) - Reserved resource - Pending resource - Headroom + * + * This class can be used to track resource usage in queue/user/app. + * + * Access to the per-label map is guarded by a read/write lock. + */ +public class ResourceUsage { + private ReadLock readLock; + private WriteLock writeLock; + private Map<String, UsageByLabel> usages; + // short for no-label :) + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + + public ResourceUsage() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + usages = new HashMap<String, UsageByLabel>(); + } + + // Enum of the tracked usage kinds; its value indexes UsageByLabel.resArr + private enum ResourceType { + USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4); + + private int value; + + private ResourceType(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + } + + private static class UsageByLabel { + // usage for one label, one slot per ResourceType + private Resource[] resArr; + + public UsageByLabel(String label) { + resArr = new Resource[ResourceType.values().length]; + for (int i = 0; i < resArr.length; i++) { + resArr[i] = Resource.newInstance(0, 0); + } + } + + public Resource get(ResourceType type) { + return resArr[type.getValue()]; + } + + public void set(ResourceType type, Resource res) { + resArr[type.getValue()] = res; + } + 
+ public void inc(ResourceType type, Resource res) { + Resources.addTo(resArr[type.getValue()], res); + } + + public void dec(ResourceType type, Resource res) { + Resources.subtractFrom(resArr[type.getValue()], res); + } + } + + /* + * Used + */ + public Resource getUsed() { + return getUsed(NL); + } + + public Resource getUsed(String label) { + return internalGet(label, ResourceType.USED); + } + + public void incUsed(String label, Resource res) { + internalInc(label, ResourceType.USED, res); + } + + public void incUsed(Resource res) { + incUsed(NL, res); + } + + public void decUsed(Resource res) { + decUsed(NL, res); + } + + public void decUsed(String label, Resource res) { + internalDec(label, ResourceType.USED, res); + } + + public void setUsed(Resource res) { + setUsed(NL, res); + } + + public void setUsed(String label, Resource res) { + internalSet(label, ResourceType.USED, res); + } + + /* + * Pending + */ + public Resource getPending() { + return getPending(NL); + } + + public Resource getPending(String label) { + return internalGet(label, ResourceType.PENDING); + } + + public void incPending(String label, Resource res) { + internalInc(label, ResourceType.PENDING, res); + } + + public void incPending(Resource res) { + incPending(NL, res); + } + + public void decPending(Resource res) { + decPending(NL, res); + } + + public void decPending(String label, Resource res) { + internalDec(label, ResourceType.PENDING, res); + } + + public void setPending(Resource res) { + setPending(NL, res); + } + + public void setPending(String label, Resource res) { + internalSet(label, ResourceType.PENDING, res); + } + + /* + * Reserved + */ + public Resource getReserved() { + return getReserved(NL); + } + + public Resource getReserved(String label) { + return internalGet(label, ResourceType.RESERVED); + } + + public void incReserved(String label, Resource res) { + internalInc(label, ResourceType.RESERVED, res); + } + + public void incReserved(Resource res) { + incReserved(NL, 
res); + } + + public void decReserved(Resource res) { + decReserved(NL, res); + } + + public void decReserved(String label, Resource res) { + internalDec(label, ResourceType.RESERVED, res); + } + + public void setReserved(Resource res) { + setReserved(NL, res); + } + + public void setReserved(String label, Resource res) { + internalSet(label, ResourceType.RESERVED, res); + } + + /* + * Headroom + */ + public Resource getHeadroom() { + return getHeadroom(NL); + } + + public Resource getHeadroom(String label) { + return internalGet(label, ResourceType.HEADROOM); + } + + public void incHeadroom(String label, Resource res) { + internalInc(label, ResourceType.HEADROOM, res); + } + + public void incHeadroom(Resource res) { + incHeadroom(NL, res); + } + + public void decHeadroom(Resource res) { + decHeadroom(NL, res); + } + + public void decHeadroom(String label, Resource res) { + internalDec(label, ResourceType.HEADROOM, res); + } + + public void setHeadroom(Resource res) { + setHeadroom(NL, res); + } + + public void setHeadroom(String label, Resource res) { + internalSet(label, ResourceType.HEADROOM, res); + } + + /* + * AM-Used + */ + public Resource getAMUsed() { + return getAMUsed(NL); + } + + public Resource getAMUsed(String label) { + return internalGet(label, ResourceType.AMUSED); + } + + public void incAMUsed(String label, Resource res) { + internalInc(label, ResourceType.AMUSED, res); + } + + public void incAMUsed(Resource res) { + incAMUsed(NL, res); + } + + public void decAMUsed(Resource res) { + decAMUsed(NL, res); + } + + public void decAMUsed(String label, Resource res) { + internalDec(label, ResourceType.AMUSED, res); + } + + public void setAMUsed(Resource res) { + setAMUsed(NL, res); + } + + public void setAMUsed(String label, Resource res) { + internalSet(label, ResourceType.AMUSED, res); + } + + private static Resource normalize(Resource res) { + if (res == null) { + return Resources.none(); + } + return res; + } + + private Resource internalGet(String 
label, ResourceType type) { + readLock.lock(); // before try: unlock only if held + try { + UsageByLabel usage = usages.get(label); + if (null == usage) { + return Resources.none(); + } + return normalize(usage.get(type)); + } finally { + readLock.unlock(); + } + } + + private UsageByLabel getAndAddIfMissing(String label) { + if (!usages.containsKey(label)) { + UsageByLabel u = new UsageByLabel(label); + usages.put(label, u); + return u; + } + + return usages.get(label); + } + + private void internalSet(String label, ResourceType type, Resource res) { + writeLock.lock(); // before try: unlock only if held + try { + UsageByLabel usage = getAndAddIfMissing(label); + usage.set(type, res); + } finally { + writeLock.unlock(); + } + } + + private void internalInc(String label, ResourceType type, Resource res) { + writeLock.lock(); // before try: unlock only if held + try { + UsageByLabel usage = getAndAddIfMissing(label); + usage.inc(type, res); + } finally { + writeLock.unlock(); + } + } + + private void internalDec(String label, ResourceType type, Resource res) { + writeLock.lock(); // before try: unlock only if held + try { + UsageByLabel usage = getAndAddIfMissing(label); + usage.dec(type, res); + } finally { + writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fec3a56..7eb85bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -35,10 +35,11 @@ import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { @@ -64,10 +65,8 @@ Set accessibleLabels; RMNodeLabelsManager labelManager; String defaultLabelExpression; - Resource usedResources = Resources.createResource(0, 0); Map absoluteCapacityByNodeLabels; Map capacitiyByNodeLabels; - Map usedResourcesByNodeLabels = new HashMap(); Map absoluteMaxCapacityByNodeLabels; Map maxCapacityByNodeLabels; @@ -75,6 +74,9 @@ new HashMap(); boolean reservationsContinueLooking; + // Track resource usage-by-label like used-resource/pending-resource, etc. 
+ ResourceUsage usage; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -120,6 +122,8 @@ public AbstractCSQueue(CapacitySchedulerContext cs, maxCapacityByNodeLabels = cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), accessibleLabels, labelManager); + + usage = new ResourceUsage(); } @Override @@ -153,8 +157,8 @@ public synchronized float getUsedCapacity() { } @Override - public synchronized Resource getUsedResources() { - return usedResources; + public Resource getUsedResources() { + return usage.getUsed(); } public synchronized int getNumContainers() { @@ -344,22 +348,13 @@ public Resource getMinimumAllocation() { synchronized void allocateResource(Resource clusterResource, Resource resource, Set nodeLabels) { - Resources.addTo(usedResources, resource); // Update usedResources by labels if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { - usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), - resource); + usage.incUsed(resource); } else { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resources.addTo(usedResourcesByNodeLabels.get(label), resource); + usage.incUsed(label, resource); } } @@ -370,23 +365,12 @@ synchronized void allocateResource(Resource clusterResource, protected synchronized void releaseResource(Resource clusterResource, Resource resource, Set nodeLabels) { - // Update queue metrics - Resources.subtractFrom(usedResources, resource); - // Update usedResources by labels if (null == nodeLabels || nodeLabels.isEmpty()) { - if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { - 
usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.subtractFrom( - usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource); + usage.decUsed(resource); } else { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource); + usage.decUsed(label, resource); } } @@ -452,6 +436,11 @@ public boolean getReservationContinueLooking() { @Private public Resource getUsedResourceByLabel(String nodeLabel) { - return usedResourcesByNodeLabels.get(nodeLabel); + return usage.getUsed(nodeLabel); + } + + @VisibleForTesting + public ResourceUsage getResourceUsage() { + return usage; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index fd8a7ee..51bbe82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -115,10 +116,6 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); - // sum of resources used by application masters for applications - // running in this queue - private final Resource usedAMResources = Resource.newInstance(0, 0); - public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -434,7 +431,7 @@ public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + ", " + + "usedResources=" + usage.getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -463,7 +460,7 @@ public synchronized User getUser(String userName) { ArrayList usersToReturn = new ArrayList(); for (Map.Entry entry: users.entrySet()) { usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone( - entry.getValue().consumed), entry.getValue().getActiveApplications(), + entry.getValue().getUsed()), entry.getValue().getActiveApplications(), entry.getValue().getPendingApplications())); } return usersToReturn; @@ -632,7 +629,7 @@ private synchronized void activateApplications() { // Check am resource limit Resource amIfStarted = - Resources.add(application.getAMResource(), usedAMResources); + Resources.add(application.getAMResource(), usage.getAMUsed()); if (LOG.isDebugEnabled()) { LOG.debug("application AMResource " + application.getAMResource() + @@ -677,9 +674,8 @@ private synchronized void activateApplications() { } user.activateApplication(); activeApplications.add(application); - 
Resources.addTo(usedAMResources, application.getAMResource()); - Resources.addTo(user.getConsumedAMResources(), - application.getAMResource()); + usage.incAMUsed(application.getAMResource()); + user.getResourceUsage().incAMUsed(application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -730,9 +726,8 @@ public synchronized void removeApplicationAttempt( if (!wasActive) { pendingApplications.remove(application); } else { - Resources.subtractFrom(usedAMResources, application.getAMResource()); - Resources.subtractFrom(user.getConsumedAMResources(), - application.getAMResource()); + usage.decAMUsed(application.getAMResource()); + user.getResourceUsage().decAMUsed(application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -971,8 +966,8 @@ private Resource getHeadroom(User user, Resource queueMaxCap, */ Resource headroom = Resources.min(resourceCalculator, clusterResource, - Resources.subtract(userLimit, user.getTotalConsumedResources()), - Resources.subtract(queueMaxCap, usedResources) + Resources.subtract(userLimit, user.getUsed()), + Resources.subtract(queueMaxCap, usage.getUsed()) ); return headroom; } @@ -992,12 +987,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, boolean canAssign = true; for (String label : labelCanAccess) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resource potentialTotalCapacity = - Resources.add(usedResourcesByNodeLabels.get(label), required); + Resources.add(usage.getUsed(label), required); float potentialNewCapacity = Resources.divide(resourceCalculator, clusterResource, @@ -1020,14 +1011,14 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, LOG.debug("try to use reserved: " + getQueueName() + " usedResources: " - + usedResources + + usage.getUsed() + " clusterResources: " + 
clusterResource + " reservedResources: " + application.getCurrentReservation() + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required + usage.getUsed(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1047,11 +1038,11 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + "Check assign to queue, label=" + label - + " usedResources: " + usedResourcesByNodeLabels.get(label) + + " usedResources: " + usage.getUsed(label) + " clusterResources: " + clusterResource + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResourcesByNodeLabels.get(label), + usage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)) + " potentialNewCapacity: " + potentialNewCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1108,7 +1099,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxCap=" + queueMaxCap + - " consumed=" + queueUser.getTotalConsumedResources() + + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1163,8 +1154,8 @@ private Resource computeUserLimit(FiCaSchedulerApp application, Resource currentCapacity = Resources.lessThan(resourceCalculator, clusterResource, - usedResources, queueCapacity) ? - queueCapacity : Resources.add(usedResources, required); + usage.getUsed(), queueCapacity) ? + queueCapacity : Resources.add(usage.getUsed(), required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. 
@@ -1199,10 +1190,10 @@ private Resource computeUserLimit(FiCaSchedulerApp application, " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + user.getTotalConsumedResources() + + " consumed: " + user.getUsed() + " limit: " + limit + " queueCapacity: " + queueCapacity + - " qconsumed: " + usedResources + + " qconsumed: " + usage.getUsed() + " currentCapacity: " + currentCapacity + " activeUsers: " + activeUsers + " clusterCapacity: " + clusterResource @@ -1227,7 +1218,7 @@ protected synchronized boolean assignToUser(Resource clusterResource, // overhead of the AM, but it's a > check, not a >= check, so... if (Resources .greaterThan(resourceCalculator, clusterResource, - user.getConsumedResourceByLabel(label), + user.getUsed(label), limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers @@ -1235,13 +1226,13 @@ protected synchronized boolean assignToUser(Resource clusterResource, if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getTotalConsumedResources(), + Resources.subtract(user.getUsed(), application.getCurrentReservation()), limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit based on reservations - " + " consumed: " - + user.getTotalConsumedResources() + " reserved: " + + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } return true; @@ -1250,7 +1241,7 @@ protected synchronized boolean assignToUser(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit - " + " consumed: " - + user.getTotalConsumedResources() + " limit: " + limit); + + user.getUsed() + " limit: " + limit); } return false; } @@ -1672,7 +1663,7 @@ private Resource assignContainer(Resource clusterResource, 
FiCaSchedulerNode nod " queue=" + this.toString() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + usage.getUsed() + " cluster=" + clusterResource); return request.getCapability(); @@ -1773,9 +1764,9 @@ synchronized void allocateResource(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.info(getQueueName() + " user=" + userName + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + usage.getUsed() + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + - " user-resources=" + user.getTotalConsumedResources() + " user-resources=" + user.getUsed() ); } } @@ -1791,8 +1782,8 @@ synchronized void releaseResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); + " used=" + usage.getUsed() + " numContainers=" + numContainers + + " user=" + userName + " user-resources=" + user.getUsed()); } private void updateAbsoluteCapacityResource(Resource clusterResource) { @@ -1834,22 +1825,20 @@ public synchronized void updateClusterResource(Resource clusterResource) { @VisibleForTesting public static class User { - Resource consumed = Resources.createResource(0, 0); - Resource consumedAMResources = Resources.createResource(0, 0); - Map consumedByLabel = new HashMap(); + ResourceUsage userResourceUsage = new ResourceUsage(); int pendingApplications = 0; int activeApplications = 0; - public Resource getTotalConsumedResources() { - return consumed; + public ResourceUsage getResourceUsage() { + return userResourceUsage; } - public Resource getConsumedResourceByLabel(String label) { - Resource r = consumedByLabel.get(label); - if (null != r) { - return r; - } - return Resources.none(); + public Resource getUsed() { + return 
userResourceUsage.getUsed(); + } + + public Resource getUsed(String label) { + return userResourceUsage.getUsed(label); } public int getPendingApplications() { @@ -1861,7 +1850,7 @@ public int getActiveApplications() { } public Resource getConsumedAMResources() { - return consumedAMResources; + return userResourceUsage.getAMUsed(); } public int getTotalApplications() { @@ -1886,47 +1875,26 @@ public synchronized void finishApplication(boolean wasActive) { } } - public synchronized void assignContainer(Resource resource, + public void assignContainer(Resource resource, Set nodeLabels) { - Resources.addTo(consumed, resource); - if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { - consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), - resource); + userResourceUsage.incUsed(resource); } else { for (String label : nodeLabels) { - if (!consumedByLabel.containsKey(label)) { - consumedByLabel.put(label, Resources.createResource(0)); - } - Resources.addTo(consumedByLabel.get(label), resource); + userResourceUsage.incUsed(label, resource); } } } - public synchronized void releaseContainer(Resource resource, Set nodeLabels) { - Resources.subtractFrom(consumed, resource); - - // Update usedResources by labels + public void releaseContainer(Resource resource, Set nodeLabels) { if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { - consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.subtractFrom( - consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource); + userResourceUsage.decUsed(resource); } else { for (String label : nodeLabels) { - if (!consumedByLabel.containsKey(label)) { - consumedByLabel.put(label, Resources.createResource(0)); - } - Resources.subtractFrom(consumedByLabel.get(label), 
resource); + userResourceUsage.decUsed(label, resource); } } - } + } } @Override @@ -1985,7 +1953,7 @@ public void attachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + usage.getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().attachContainer(clusterResource, application, rmContainer); } @@ -2003,7 +1971,7 @@ public void detachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + usage.getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().detachContainer(clusterResource, application, rmContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index f820cca..578be22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -256,7 +256,7 @@ public String toString() { "numChildQueue= " + childQueues.size() + ", " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - 
"usedResources=" + usedResources + + "usedResources=" + usage.getUsed() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); @@ -463,7 +463,7 @@ public synchronized CSAssignment assignContainers( " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + usage.getUsed() + " cluster=" + clusterResource); } else { @@ -506,19 +506,16 @@ private synchronized boolean canAssignToThisQueue(Resource clusterResource, boolean canAssign = true; for (String label : labelCanAccess) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } float currentAbsoluteLabelUsedCapacity = Resources.divide(resourceCalculator, clusterResource, - usedResourcesByNodeLabels.get(label), + usage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)); // if any of the label doesn't beyond limit, we can allocate on this node if (currentAbsoluteLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + usedResources - + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + LOG.debug(getQueueName() + " used=" + usage.getUsed() + + " current-capacity (" + usage.getUsed(label) + ") " + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")"); } @@ -540,16 +537,16 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) .getReservedMB(), getMetrics().getReservedVirtualCores()); float capacityWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(usedResources, reservedResources), + Resources.subtract(usage.getUsed(), reservedResources), clusterResource); if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { 
LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + usedResources.getMemory() + + " usedResources: " + usage.getUsed().getMemory() + " clusterResources: " + clusterResource.getMemory() + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) usedResources.getMemory()) + + " currentCapacity " + ((float) usage.getUsed().getMemory()) / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " @@ -645,7 +642,7 @@ public void completedContainer(Resource clusterResource, " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + usage.getUsed() + " cluster=" + clusterResource); } @@ -735,7 +732,7 @@ public void attachContainer(Resource clusterResource, .getResource(), node.getLabels()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + usage.getUsed() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { @@ -755,7 +752,7 @@ public void detachContainer(Resource clusterResource, node.getLabels()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + usage.getUsed() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index e21fcf9..a9caf77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -321,7 +321,7 @@ private void checkCSLeafQueue(MockRM rm, 1e-8); // assert user consumed resources. assertEquals(usedResource, leafQueue.getUser(app.getUser()) - .getTotalConsumedResources()); + .getUsed()); } private void checkFifoQueue(ResourceManager rm, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java new file mode 100644 index 0000000..b6dfacb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestResourceUsage { + private static final Log LOG = LogFactory.getLog(TestResourceUsage.class); + private String suffix; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new String[][] { { "Pending" }, { "Used" }, + { "Headroom" }, { "Reserved" }, { "AMUsed" } }); + } + + public TestResourceUsage(String suffix) { + this.suffix = suffix; + } + + private static void dec(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "dec" + suffix, res, label); + } + + private static void inc(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "inc" + suffix, res, label); + } + + private static void set(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "set" + suffix, res, label); + } + + private static Resource get(ResourceUsage obj, String suffix, String label) + throws Exception { + return executeByName(obj, "get" + suffix, null, label); + } + + // Use reflection to avoid too much duplicated
code + private static Resource executeByName(ResourceUsage obj, String methodName, + Resource arg, String label) throws Exception { + // We have 4 kinds of method + // 1. getXXX() : Resource + // 2. getXXX(label) : Resource + // 3. set/inc/decXXX(res) : void + // 4. set/inc/decXXX(label, res) : void + if (methodName.startsWith("get")) { + Resource result; + if (label == null) { + // 1. + Method method = ResourceUsage.class.getDeclaredMethod(methodName); + result = (Resource) method.invoke(obj); + } else { + // 2. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, String.class); + result = (Resource) method.invoke(obj, label); + } + return result; + } else { + if (label == null) { + // 3. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, Resource.class); + method.invoke(obj, arg); + } else { + // 4. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, String.class, + Resource.class); + method.invoke(obj, label, arg); + } + return null; + } + } + + private void internalTestModifyAndRead(String label) throws Exception { + ResourceUsage usage = new ResourceUsage(); + Resource res; + + // First get returns 0 always + res = get(usage, suffix, label); + check(0, 0, res); + + // Add 1,1 should return 1,1 + inc(usage, suffix, Resource.newInstance(1, 1), label); + check(1, 1, get(usage, suffix, label)); + + // Set 2,2 + set(usage, suffix, Resource.newInstance(2, 2), label); + check(2, 2, get(usage, suffix, label)); + + // dec 2,2 + dec(usage, suffix, Resource.newInstance(2, 2), label); + check(0, 0, get(usage, suffix, label)); + } + + void check(int mem, int cpu, Resource res) { + Assert.assertEquals(mem, res.getMemory()); + Assert.assertEquals(cpu, res.getVirtualCores()); + } + + @Test + public void testModifyAndRead() throws Exception { + LOG.info("Test - " + suffix); + internalTestModifyAndRead(null); + internalTestModifyAndRead("label"); + } +} diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java index a62889b..d643c9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -202,33 +202,33 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { LOG.info("t2 l2q2 " + result); //some usage, but below the base capacity - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); - Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.4f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //usage gt base on parent sibling - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); - Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); + l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //same as last, 
but with usage also on direct parent - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //add to direct sibling, below the threshold of effect at present - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); @@ -236,9 +236,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { //add to direct sibling, now above the threshold of effect //(it's cumulative with prior tests) - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, 
l2q2); assertEquals( 0.1f, result, 0.000001f);