diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 6c4f300..466cc89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import com.google.common.collect.ImmutableSet; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,9 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.ImmutableSet; - - /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. */ @@ -54,30 +53,27 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); - private Resource unallocatedResource = Resource.newInstance(0, 0); - private Resource allocatedResource = Resource.newInstance(0, 0); - private Resource totalResource; + private final RMNode rmNode; + private final String nodeName; + + /** Following accounting variables are protected via synchronized methods */ + private Resource capacity; + private Resource allocated = Resource.newInstance(0, 0); + private Resource unallocated; private RMContainer reservedContainer; - private volatile int numContainers; - private volatile ResourceUtilization containersUtilization = - ResourceUtilization.newInstance(0, 0, 0f); - private volatile ResourceUtilization nodeUtilization = - ResourceUtilization.newInstance(0, 0, 0f); + private int numContainers; - /* set of containers that are allocated containers */ + /** Set of containers that are allocated containers. */ protected final Map launchedContainers = new HashMap<>(); - private final RMNode rmNode; - private final String nodeName; - private volatile Set labels = null; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; - this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); + this.unallocated = Resources.clone(capacity); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -99,9 +95,8 @@ public RMNode getRMNode() { * @param resource Total resources on the node. */ public synchronized void setTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + this.capacity = resource; + this.unallocated = Resources.subtract(capacity, this.allocated); } /** @@ -149,7 +144,7 @@ public String getRackName() { */ public synchronized void allocateContainer(RMContainer rmContainer) { Container container = rmContainer.getContainer(); - deductUnallocatedResource(container.getResource()); + updateResourceAllocation(container.getResource(), true); ++numContainers; launchedContainers.put(container.getId(), rmContainer); @@ -169,11 +164,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) { */ protected synchronized void changeContainerResource(ContainerId containerId, Resource deltaResource, boolean increase) { - if (increase) { - deductUnallocatedResource(deltaResource); - } else { - addUnallocatedResource(deltaResource); - } + updateResourceAllocation(deltaResource, increase); LOG.info((increase ? "Increased" : "Decreased") + " container " + containerId + " of capacity " + deltaResource + " on host " @@ -207,7 +198,7 @@ public synchronized void decreaseContainer(ContainerId containerId, * @return Unallocated resources on the node */ public synchronized Resource getUnallocatedResource() { - return this.unallocatedResource; + return this.unallocated; } /** @@ -215,7 +206,7 @@ public synchronized Resource getUnallocatedResource() { * @return Allocated resources on the node */ public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + return this.allocated; } /** @@ -223,7 +214,7 @@ public synchronized Resource getAllocatedResource() { * @return Total resources on the node. */ public synchronized Resource getTotalResource() { - return this.totalResource; + return this.capacity; } /** @@ -231,19 +222,28 @@ public synchronized Resource getTotalResource() { * @return If the container is launched by the node. */ public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + return launchedContainers.containsKey(containerId); } /** - * Update the resources of the node when allocating a new container. - * @param container Container to allocate. + * Update allocation based stats. + * @param resource - Resource allocated/released + * @param increase - whether resources are allocated or released */ - protected synchronized void updateResource(Container container) { - addUnallocatedResource(container.getResource()); - --numContainers; + private synchronized void updateResourceAllocation( + Resource resource, boolean increase) { + if (resource == null) { + LOG.error("Invalid update on resource allocation " + + rmNode.getNodeAddress()); + return; + } + if (increase) { + Resources.addTo(allocated, resource); + Resources.subtractFrom(unallocated, resource); + } else { + Resources.subtractFrom(allocated, resource); + Resources.addTo(unallocated, resource); + } } /** @@ -258,7 +258,8 @@ public synchronized void releaseContainer(Container container) { // Remove the containers from the nodemanger if (null != launchedContainers.remove(container.getId())) { - updateResource(container); + updateResourceAllocation(container.getResource(), false); + --numContainers; } LOG.info("Released container " + container.getId() + " of capacity " @@ -269,36 +270,6 @@ public synchronized void releaseContainer(Container container) { } /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. - */ - private synchronized void addUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); - } - - /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. - */ - private synchronized void deductUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); - } - - /** * Reserve container for the attempt on this node. * @param attempt Application attempt asking for the reservation. * @param priority Priority of the reservation. @@ -324,7 +295,7 @@ public String toString() { * Get number of active containers on the node. * @return Number of active containers on the node. */ - public int getNumContainers() { + public synchronized int getNumContainers() { return numContainers; } @@ -333,7 +304,7 @@ public int getNumContainers() { * @return List of running containers in the node. */ public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); + return new ArrayList<>(launchedContainers.values()); } /** @@ -395,28 +366,11 @@ public String getPartition() { } /** - * Set the resource utilization of the containers in the node. - * @param containersUtilization Resource utilization of the containers. - */ - public void setAggregatedContainersUtilization( - ResourceUtilization containersUtilization) { - this.containersUtilization = containersUtilization; - } - - /** * Get the resource utilization of the containers in the node. * @return Resource utilization of the containers. */ public ResourceUtilization getAggregatedContainersUtilization() { - return this.containersUtilization; - } - - /** - * Set the resource utilization of the node. This includes the containers. - * @param nodeUtilization Resource utilization of the node. - */ - public void setNodeUtilization(ResourceUtilization nodeUtilization) { - this.nodeUtilization = nodeUtilization; + return rmNode.getAggregatedContainersUtilization(); } /** @@ -424,6 +378,6 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) { * @return Resource utilization of the node. */ public ResourceUtilization getNodeUtilization() { - return this.nodeUtilization; + return rmNode.getNodeUtilization(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cf5c3b5..1b5be17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1114,11 +1114,6 @@ private synchronized void nodeUpdate(RMNode nm) { releaseResources); schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - // Now node data structures are upto date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug("Node being looked for scheduling " + nm + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 1d0e78a..e9a7729 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -142,9 +142,8 @@ public synchronized void markContainerToNonKillable(ContainerId containerId) { } @Override - protected synchronized void updateResource( - Container container) { - super.updateResource(container); + public synchronized void releaseContainer(Container container) { + super.releaseContainer(container); if (killableContainers.containsKey(container.getId())) { Resources.subtractFrom(totalKillableResources, container.getResource()); killableContainers.remove(container.getId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3df0600..13fbe70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1061,11 +1061,6 @@ private synchronized void nodeUpdate(RMNode nm) { attemptScheduling(node); } - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index cf12501..650ca96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -743,11 +743,6 @@ private synchronized void nodeUpdate(RMNode rmNode) { completedContainer, RMContainerEventType.FINISHED); } - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule.