diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 33ab2f1..8a7ecb2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -54,31 +54,27 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); - private Resource unallocatedResource = Resource.newInstance(0, 0); + private final RMNode rmNode; + private final String nodeName; + + /** Following accounting variables are protected via synchronized methods */ + private Resource capacity; private Resource allocatedResource = Resource.newInstance(0, 0); - private Resource totalResource; + private Resource unallocatedResource; private RMContainer reservedContainer; - private volatile int numContainers; - private volatile ResourceUtilization containersUtilization = - ResourceUtilization.newInstance(0, 0, 0f); - private volatile ResourceUtilization nodeUtilization = - ResourceUtilization.newInstance(0, 0, 0f); - + private int numContainers; /** Set of containers that are allocated containers. */ private final Map launchedContainers = new HashMap<>(); - private final RMNode rmNode; - private final String nodeName; - private volatile Set labels = null; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -100,8 +96,8 @@ public RMNode getRMNode() { * @param resource Total resources on the node. */ public synchronized void setTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, + this.capacity = resource; + this.unallocatedResource = Resources.subtract(capacity, this.allocatedResource); } @@ -150,7 +146,7 @@ public String getRackName() { */ public synchronized void allocateContainer(RMContainer rmContainer) { Container container = rmContainer.getContainer(); - deductUnallocatedResource(container.getResource()); + updateResourceAllocation(container.getResource(), true); ++numContainers; launchedContainers.put(container.getId(), rmContainer); @@ -170,11 +166,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) { */ private synchronized void changeContainerResource(ContainerId containerId, Resource deltaResource, boolean increase) { - if (increase) { - deductUnallocatedResource(deltaResource); - } else { - addUnallocatedResource(deltaResource); - } + updateResourceAllocation(deltaResource, increase); LOG.info((increase ? "Increased" : "Decreased") + " container " + containerId + " of capacity " + deltaResource + " on host " @@ -224,7 +216,7 @@ public synchronized Resource getAllocatedResource() { * @return Total resources on the node. */ public synchronized Resource getTotalResource() { - return this.totalResource; + return this.capacity; } /** @@ -232,19 +224,28 @@ public synchronized Resource getTotalResource() { * @return If the container is launched by the node. */ public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + return launchedContainers.containsKey(containerId); } /** - * Update the resources of the node when allocating a new container. - * @param container Container to allocate. + * Update allocation based stats. + * @param resource - Resource allocated/released + * @param increase - whether resources are allocated or released */ - private synchronized void updateResource(Container container) { - addUnallocatedResource(container.getResource()); - --numContainers; + private synchronized void updateResourceAllocation( + Resource resource, boolean increase) { + if (resource == null) { + LOG.error("Invalid update on resource allocation " + + rmNode.getNodeAddress()); + return; + } + if (increase) { + Resources.addTo(allocatedResource, resource); + Resources.subtractFrom(unallocatedResource, resource); + } else { + Resources.subtractFrom(allocatedResource, resource); + Resources.addTo(unallocatedResource, resource); + } } /** @@ -259,7 +260,8 @@ public synchronized void releaseContainer(Container container) { // Remove the containers from the nodemanger if (null != launchedContainers.remove(container.getId())) { - updateResource(container); + updateResourceAllocation(container.getResource(), false); + --numContainers; } LOG.info("Released container " + container.getId() + " of capacity " @@ -270,36 +272,6 @@ public synchronized void releaseContainer(Container container) { } /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. - */ - private synchronized void addUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); - } - - /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. - */ - private synchronized void deductUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); - } - - /** * Reserve container for the attempt on this node. * @param attempt Application attempt asking for the reservation. * @param priority Priority of the reservation. @@ -325,7 +297,7 @@ public String toString() { * Get number of active containers on the node. * @return Number of active containers on the node. */ - public int getNumContainers() { + public synchronized int getNumContainers() { return numContainers; } @@ -334,7 +306,7 @@ public int getNumContainers() { * @return List of running containers in the node. */ public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); + return new ArrayList<>(launchedContainers.values()); } /** @@ -396,28 +368,11 @@ public String getPartition() { } /** - * Set the resource utilization of the containers in the node. - * @param containersUtilization Resource utilization of the containers. - */ - public void setAggregatedContainersUtilization( - ResourceUtilization containersUtilization) { - this.containersUtilization = containersUtilization; - } - - /** * Get the resource utilization of the containers in the node. * @return Resource utilization of the containers. */ public ResourceUtilization getAggregatedContainersUtilization() { - return this.containersUtilization; - } - - /** - * Set the resource utilization of the node. This includes the containers. - * @param nodeUtilization Resource utilization of the node. - */ - public void setNodeUtilization(ResourceUtilization nodeUtilization) { - this.nodeUtilization = nodeUtilization; + return rmNode.getAggregatedContainersUtilization(); } /** @@ -425,6 +380,6 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) { * @return Resource utilization of the node. */ public ResourceUtilization getNodeUtilization() { - return this.nodeUtilization; + return rmNode.getNodeUtilization(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6a1091d..ff705a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1101,11 +1101,6 @@ private synchronized void nodeUpdate(RMNode nm) { releaseResources); schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - // Now node data structures are upto date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug("Node being looked for scheduling " + nm + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 917fc8a..4ffb62f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1080,11 +1080,6 @@ private synchronized void nodeUpdate(RMNode nm) { attemptScheduling(node); } - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 147c3f3..1baf9e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -741,11 +741,6 @@ private synchronized void nodeUpdate(RMNode rmNode) { completedContainer, RMContainerEventType.FINISHED); } - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule.