diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index a9591a5..942ca9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -461,6 +461,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, List resourceRequestList = appSchedulingInfo.allocate( type, node, schedulerKey, request, container); this.attemptResourceUsage.incUsed(container.getResource()); + ((FSQueue) queue).incResourceUsage(container.getResource()); // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); @@ -484,6 +485,16 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, return rmContainer; } + @Override + public synchronized void recoverContainer(SchedulerNode node, + RMContainer rmContainer) { + super.recoverContainer(node, rmContainer); + if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) { + ((FSQueue) queue).incResourceUsage(rmContainer.getContainer() + .getResource()); + } + } + /** * Should be called when the scheduler assigns a container at a higher * degree of locality than the current threshold. Reset the allowed locality @@ -1112,6 +1123,32 @@ public String getName() { } @Override + public synchronized void move(Queue newQueue) { + Resource appResource = Resource.newInstance(0, 0); + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + String user = getUser(); + for (RMContainer liveContainer : liveContainers.values()) { + Resource resource = liveContainer.getContainer().getResource(); + Resources.addTo(appResource, liveContainer.getContainer().getResource()); + oldMetrics.releaseResources(user, 1, resource); + newMetrics.allocateResources(user, 1, resource, false); + } + ((FSQueue) queue).decResourceUsage(appResource); + + for (Map map : reservedContainers.values()) { + for (RMContainer reservedContainer : map.values()) { + Resource resource = reservedContainer.getReservedResource(); + oldMetrics.unreserveResource(user, resource); + newMetrics.reserveResource(user, resource); + } + } + + appSchedulingInfo.move(newQueue); + this.queue = newQueue; + + ((FSQueue) queue).incResourceUsage(appResource); + } public Resource getDemand() { return demand; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 16570aa..9a1fab8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -134,20 +134,6 @@ public Resource getDemand() { } @Override - public Resource getResourceUsage() { - Resource usage = Resources.createResource(0); - readLock.lock(); - try { - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); - } - } finally { - readLock.unlock(); - } - return usage; - } - - @Override public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index d87668d..9964ac2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -81,6 +81,7 @@ private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; + private final Resource queueUsage = Resource.newInstance(0, 0); private boolean preemptable = true; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { @@ -183,6 +184,30 @@ public void setMaxAMShare(float maxAMShare){ } @Override + public Resource getResourceUsage() { + return queueUsage; + } + + public void incResourceUsage(Resource res) { + synchronized (queueUsage) { + Resources.addTo(queueUsage, res); + if (parent != null) { + parent.incResourceUsage(res); + } + } + + } + + public void decResourceUsage(Resource res) { + synchronized (queueUsage) { + Resources.subtractFrom(queueUsage, res); + if (parent != null) { + parent.decResourceUsage(res); + } + } + } + + @Override public long getStartTime() { return 0; }