diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0190742..8ffa2f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -51,8 +51,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -156,12 +158,53 @@ synchronized public void containerCompleted(RMContainer rmContainer,
     this.attemptResourceUsage.decUsed(containerResource);
 
     // remove from preemption map if it is completed
-    preemptionMap.remove(rmContainer);
+    if (preemptionMap.remove(rmContainer) == null) {
+      // Do not decrement usage if the container was in the preemptionMap:
+      // its resource was already deducted when it was added to the
+      // preemptionMap in addPreemption.
+      ((FSQueue) queue).decResourceUsage(containerResource);
+    }
 
     // Clear resource utilization metrics cache.
     lastMemoryAggregateAllocationUpdateTime = -1;
   }
 
+  /**
+   * Moves this attempt to {@code newQueue}: transfers the metrics of live
+   * and reserved containers and shifts the summed live-container resources
+   * from the old queue's usage to the new queue's usage.
+   */
+  @Override
+  public synchronized void move(Queue newQueue) {
+    Resource appResource = Resource.newInstance(0, 0);
+    QueueMetrics oldMetrics = queue.getMetrics();
+    QueueMetrics newMetrics = newQueue.getMetrics();
+    String user = getUser();
+    for (RMContainer liveContainer : liveContainers.values()) {
+      Resource resource = liveContainer.getContainer().getResource();
+      Resources.addTo(appResource, resource);
+      oldMetrics.releaseResources(user, 1, resource);
+      newMetrics.allocateResources(user, 1, resource, false);
+    }
+    // NOTE(review): addPreemption already deducts containers awaiting
+    // preemption from the queue usage; confirm they are not counted in
+    // liveContainers here, or they are deducted from the old queue twice.
+    ((FSQueue) queue).decResourceUsage(appResource);
+
+    for (Map<?, RMContainer> map : reservedContainers.values()) {
+      for (RMContainer reservedContainer : map.values()) {
+        Resource resource = reservedContainer.getReservedResource();
+        oldMetrics.unreserveResource(user, resource);
+        newMetrics.reserveResource(user, resource);
+      }
+    }
+
+    appSchedulingInfo.move(newQueue);
+    this.queue = newQueue;
+
+    ((FSQueue) queue).incResourceUsage(appResource);
+  }
+
   private synchronized void unreserveInternal(
       Priority priority, FSSchedulerNode node) {
     Map reservedContainers =
@@ -387,6 +430,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
     List resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
     this.attemptResourceUsage.incUsed(container.getResource());
+    ((FSQueue) queue).incResourceUsage(container.getResource());
 
     // Update resource requests related to "request" and store in RMContainer
     ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@@ -408,6 +452,20 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
     return rmContainer;
   }
 
+  /**
+   * Recovers the container through the superclass and, unless it has
+   * already completed, adds its resource to the queue usage.
+   */
+  @Override
+  public synchronized void recoverContainer(SchedulerNode node,
+      RMContainer rmContainer) {
+    super.recoverContainer(node, rmContainer);
+    if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      ((FSQueue) queue).incResourceUsage(rmContainer.getContainer()
+          .getResource());
+    }
+  }
+
   /**
    * Should be called when the scheduler assigns a container at a higher
    * degree of locality than the current threshold. Reset the allowed locality
@@ -426,6 +484,8 @@ public void addPreemption(RMContainer container, long time) {
     assert preemptionMap.get(container) == null;
     preemptionMap.put(container, time);
     Resources.addTo(preemptedResources, container.getAllocatedResource());
+    // Preempt resources should bring down resource usage.
+    ((FSQueue) queue).decResourceUsage(container.getAllocatedResource());
   }
 
   public Long getContainerPreemptionTime(RMContainer container) {
@@ -446,10 +506,12 @@ public Resource getPreemptedResources() {
   }
 
   public void resetPreemptedResources() {
+    ((FSQueue) queue).incResourceUsage(preemptedResources);
     preemptedResources = Resources.createResource(0);
     for (RMContainer container : getPreemptionContainers()) {
       Resources.addTo(preemptedResources, container.getAllocatedResource());
     }
+    ((FSQueue) queue).decResourceUsage(preemptedResources);
   }
 
   public void clearPreemptedResources() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index ca5a146..5b7827a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -239,23 +239,6 @@ public Resource getDemand() {
     return demand;
   }
 
-  @Override
-  public Resource getResourceUsage() {
-    Resource usage = Resources.createResource(0);
-    readLock.lock();
-    try {
-      for (FSAppAttempt app : runnableApps) {
-        Resources.addTo(usage, app.getResourceUsage());
-      }
-      for (FSAppAttempt app : nonRunnableApps) {
-        Resources.addTo(usage, app.getResourceUsage());
-      }
-    } finally {
-      readLock.unlock();
-    }
-    return usage;
-  }
-
   public Resource getAmResourceUsage() {
     return amResourceUsage;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index d9fac90..6459ae0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -135,20 +135,6 @@ public Resource getDemand() {
   }
 
   @Override
-  public Resource getResourceUsage() {
-    Resource usage = Resources.createResource(0);
-    readLock.lock();
-    try {
-      for (FSQueue child : childQueues) {
-        Resources.addTo(usage, child.getResourceUsage());
-      }
-    } finally {
-      readLock.unlock();
-    }
-    return usage;
-  }
-
-  @Override
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index a33084f..92349c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -62,6 +62,7 @@
   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
   private float fairSharePreemptionThreshold = 0.5f;
+  private final Resource queueUsage = Resource.newInstance(0, 0);
   private boolean preemptable = true;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
@@ -115,6 +116,34 @@ public Resource getMaxShare() {
   }
 
   @Override
+  public Resource getResourceUsage() {
+    // Hands out the queueUsage field itself; no copy is made.
+    return queueUsage;
+  }
+
+  /**
+   * Adds {@code res} to this queue's usage, then forwards the same amount
+   * to the parent queue, if there is one.
+   */
+  public synchronized void incResourceUsage(Resource res) {
+    Resources.addTo(queueUsage, res);
+    if (parent != null) {
+      parent.incResourceUsage(res);
+    }
+  }
+
+  /**
+   * Subtracts {@code res} from this queue's usage, then forwards the same
+   * amount to the parent queue, if there is one.
+   */
+  public synchronized void decResourceUsage(Resource res) {
+    Resources.subtractFrom(queueUsage, res);
+    if (parent != null) {
+      parent.decResourceUsage(res);
+    }
+  }
+
+  @Override
   public long getStartTime() {
     return 0;
   }