diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 7ca8671..f5b621b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -27,11 +27,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -92,22 +88,9 @@ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); - // Nodes in the cluster, indexed by NodeId - protected Map nodes = new ConcurrentHashMap(); - - // Whole capacity of the cluster - protected Resource clusterResource = Resource.newInstance(0, 0); + protected ClusterNodeTracker nodeTracker = new ClusterNodeTracker<>(); protected Resource minimumAllocation; - protected Resource maximumAllocation; - private Resource configuredMaximumAllocation; - private int maxNodeMemory = -1; - private int maxNodeVCores = -1; - private final ReadLock maxAllocReadLock; - private final WriteLock maxAllocWriteLock; - - private boolean useConfiguredMaximumAllocationOnly = true; - private long 
configuredMaximumAllocationWaitTime; protected RMContext rmContext; @@ -132,9 +115,6 @@ */ public AbstractYarnScheduler(String name) { super(name); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.maxAllocReadLock = lock.readLock(); - this.maxAllocWriteLock = lock.writeLock(); } @Override @@ -142,9 +122,11 @@ public void serviceInit(Configuration conf) throws Exception { nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); - configuredMaximumAllocationWaitTime = + long configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); + nodeTracker.setConfiguredMaximumAllocationWaitTime( + configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); createReleaseCache(); super.serviceInit(conf); @@ -188,16 +170,12 @@ public void serviceInit(Configuration conf) throws Exception { */ public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app, List blacklistNodeIdList) { - for (Map.Entry nodeEntry : nodes.entrySet()) { - if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) { - blacklistNodeIdList.add(nodeEntry.getKey()); - } - } + nodeTracker.addBlacklistedNodeIdsToList(app, blacklistNodeIdList); } @Override public Resource getClusterResource() { - return clusterResource; + return nodeTracker.getClusterCapacity(); } @Override @@ -207,22 +185,7 @@ public Resource getMinimumResourceCapability() { @Override public Resource getMaximumResourceCapability() { - Resource maxResource; - maxAllocReadLock.lock(); - try { - if (useConfiguredMaximumAllocationOnly) { - if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() - > configuredMaximumAllocationWaitTime) { - useConfiguredMaximumAllocationOnly = false; - } - maxResource = Resources.clone(configuredMaximumAllocation); - } else { 
- maxResource = Resources.clone(maximumAllocation); - } - } finally { - maxAllocReadLock.unlock(); - } - return maxResource; + return nodeTracker.getMaxAllowedAllocation(); } @Override @@ -231,15 +194,7 @@ public Resource getMaximumResourceCapability(String queueName) { } protected void initMaximumResourceCapability(Resource maximumAllocation) { - maxAllocWriteLock.lock(); - try { - if (this.configuredMaximumAllocation == null) { - this.configuredMaximumAllocation = Resources.clone(maximumAllocation); - this.maximumAllocation = Resources.clone(maximumAllocation); - } - } finally { - maxAllocWriteLock.unlock(); - } + nodeTracker.setConfiguredMaximumAllocation(maximumAllocation); } protected synchronized void containerLaunchedOnNode( @@ -332,8 +287,7 @@ public RMContainer getRMContainer(ContainerId containerId) { @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - N node = nodes.get(nodeId); - return node == null ? null : new SchedulerNodeReport(node); + return nodeTracker.getNodeReport(nodeId); } @Override @@ -431,12 +385,12 @@ public synchronized void recoverContainersOnNode( container)); // recover scheduler node - SchedulerNode schedulerNode = nodes.get(nm.getNodeID()); + SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID()); schedulerNode.recoverContainer(rmContainer); // recover queue: update headroom etc. 
Queue queue = schedulerAttempt.getQueue(); - queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + queue.recoverContainer(getClusterResource(), schedulerAttempt, rmContainer); // recover scheduler attempt schedulerAttempt.recoverContainer(schedulerNode, rmContainer); @@ -621,7 +575,7 @@ protected abstract void decreaseContainer( @Override public SchedulerNode getSchedulerNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } @Override @@ -690,18 +644,12 @@ public synchronized void updateNodeResource(RMNode nm, + " from: " + oldResource + ", to: " + newResource); - nodes.remove(nm.getNodeID()); - updateMaximumAllocation(node, false); + nodeTracker.removeNode(nm.getNodeID()); // update resource to node node.setTotalResource(newResource); - nodes.put(nm.getNodeID(), (N)node); - updateMaximumAllocation(node, true); - - // update resource to clusterResource - Resources.subtractFrom(clusterResource, oldResource); - Resources.addTo(clusterResource, newResource); + nodeTracker.addNode((N) node); } else { // Log resource change LOG.warn("Update resource on node: " + node.getNodeName() @@ -721,80 +669,8 @@ public synchronized void updateNodeResource(RMNode nm, + " does not support reservations"); } - protected void updateMaximumAllocation(SchedulerNode node, boolean add) { - Resource totalResource = node.getTotalResource(); - maxAllocWriteLock.lock(); - try { - if (add) { // added node - int nodeMemory = totalResource.getMemory(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - maximumAllocation.setMemory(Math.min( - configuredMaximumAllocation.getMemory(), maxNodeMemory)); - } - int nodeVCores = totalResource.getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; - maximumAllocation.setVirtualCores(Math.min( - configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); - } - } else { // removed node - if (maxNodeMemory == totalResource.getMemory()) { - maxNodeMemory 
= -1; - } - if (maxNodeVCores == totalResource.getVirtualCores()) { - maxNodeVCores = -1; - } - // We only have to iterate through the nodes if the current max memory - // or vcores was equal to the removed node's - if (maxNodeMemory == -1 || maxNodeVCores == -1) { - for (Map.Entry nodeEntry : nodes.entrySet()) { - int nodeMemory = - nodeEntry.getValue().getTotalResource().getMemory(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - } - int nodeVCores = - nodeEntry.getValue().getTotalResource().getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; - } - } - if (maxNodeMemory == -1) { // no nodes - maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); - } else { - maximumAllocation.setMemory( - Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); - } - if (maxNodeVCores == -1) { // no nodes - maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); - } else { - maximumAllocation.setVirtualCores( - Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); - } - } - } - } finally { - maxAllocWriteLock.unlock(); - } - } - protected void refreshMaximumAllocation(Resource newMaxAlloc) { - maxAllocWriteLock.lock(); - try { - configuredMaximumAllocation = Resources.clone(newMaxAlloc); - int maxMemory = newMaxAlloc.getMemory(); - if (maxNodeMemory != -1) { - maxMemory = Math.min(maxMemory, maxNodeMemory); - } - int maxVcores = newMaxAlloc.getVirtualCores(); - if (maxNodeVCores != -1) { - maxVcores = Math.min(maxVcores, maxNodeVCores); - } - maximumAllocation = Resources.createResource(maxMemory, maxVcores); - } finally { - maxAllocWriteLock.unlock(); - } + nodeTracker.setConfiguredMaximumAllocation(newMaxAlloc); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java new file mode 100644 index 0000000..f84402c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +@InterfaceAudience.Private +public class ClusterNodeTracker { + private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class); + + private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); + private Lock readLock = readWriteLock.readLock(); + private Lock writeLock = readWriteLock.writeLock(); + + private HashMap nodes = new HashMap<>(); + private Map nodesPerRack = new HashMap<>(); + + private Resource clusterCapacity = Resources.clone(Resources.none()); + private Resource staleClusterCapacity = null; + + // Max allocation + private Resource maximumAllocation = Resources.clone(Resources.none()); + private Resource configuredMaximumAllocation; + private boolean useConfiguredMaximumAllocationOnly = true; + private long configuredMaximumAllocationWaitTime; + int maxNodeMemory = -1; + int maxNodeVCores = -1; + + public void addNode(N node) { + writeLock.lock(); + try { + nodes.put(node.getNodeID(), node); + + // Update nodes per rack as well + String rackName = node.getRackName(); + Integer numNodes = nodesPerRack.get(rackName); + if (numNodes == null) { + numNodes = new Integer(0); + } + nodesPerRack.put(rackName, ++numNodes); + + // Update cluster capacity + 
Resources.addTo(clusterCapacity, node.getTotalResource()); + + // Update maximumAllocation + updateMaximumAllocation(node, true); + } finally { + writeLock.unlock(); + } + } + + public boolean exists(NodeId nodeId) { + readLock.lock(); + try { + return nodes.containsKey(nodeId); + } finally { + readLock.unlock(); + } + } + + public N getNode(NodeId nodeId) { + readLock.lock(); + try { + return nodes.get(nodeId); + } finally { + readLock.unlock(); + } + } + + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + readLock.lock(); + try { + N n = nodes.get(nodeId); + return n == null ? null : new SchedulerNodeReport(n); + } finally { + readLock.unlock(); + } + } + + public int nodeCount() { + readLock.lock(); + try { + return nodes.size(); + } finally { + readLock.unlock(); + } + } + + public Resource getClusterCapacity() { + readLock.lock(); + try { + if (staleClusterCapacity == null || + !Resources.equals(staleClusterCapacity, clusterCapacity)) { + staleClusterCapacity = Resources.clone(clusterCapacity); + } + return staleClusterCapacity; + } finally { + readLock.unlock(); + } + } + + public N removeNode(NodeId nodeId) { + writeLock.lock(); + try { + N node = nodes.remove(nodeId); + if (node == null) { + LOG.warn("Attempting to remove a non-existent node " + + nodeId); + return null; + } + + // Update nodes per rack as well + String rackName = node.getRackName(); + Integer numNodes = nodesPerRack.get(rackName); + if (numNodes > 0) { + nodesPerRack.put(rackName, --numNodes); + } else { + LOG.error("Attempting to remove node from an empty rack " + rackName); + } + + // Update cluster capacity + Resources.subtractFrom(clusterCapacity, node.getTotalResource()); + + // Update maximumAllocation + updateMaximumAllocation(node, false); + + return node; + } finally { + writeLock.unlock(); + } + } + + public Map shallowCopy() { + readLock.lock(); + try { + return (Map) nodes.clone(); + } finally { + readLock.unlock(); + } + } + + public void
setConfiguredMaximumAllocation(Resource resource) { + writeLock.lock(); + try { + configuredMaximumAllocation = Resources.clone(resource); + maximumAllocation.setMemory(maxNodeMemory == -1 ? configuredMaximumAllocation.getMemory() + : Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + maximumAllocation.setVirtualCores(maxNodeVCores == -1 ? configuredMaximumAllocation.getVirtualCores() + : Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } finally { + writeLock.unlock(); + } + } + + public void setConfiguredMaximumAllocationWaitTime( + long configuredMaximumAllocationWaitTime) { + writeLock.lock(); + try { + this.configuredMaximumAllocationWaitTime = + configuredMaximumAllocationWaitTime; + } finally { + writeLock.unlock(); + } + } + + public Resource getMaxAllowedAllocation() { + readLock.lock(); + try { + if (useConfiguredMaximumAllocationOnly) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + > configuredMaximumAllocationWaitTime) { + useConfiguredMaximumAllocationOnly = false; + } + return Resources.clone(configuredMaximumAllocation); + } else { + return Resources.clone(maximumAllocation); + } + } finally { + readLock.unlock(); + } + } + + private void updateMaximumAllocation(SchedulerNode node, boolean add) { + Resource totalResource = node.getTotalResource(); + writeLock.lock(); + try { + if (add) { // added node + int nodeMemory = totalResource.getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + maximumAllocation.setMemory(Math.min( + configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + int nodeVCores = totalResource.getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + maximumAllocation.setVirtualCores(Math.min( + configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } + } else { // removed node + if (maxNodeMemory == totalResource.getMemory()) { + maxNodeMemory = -1; + } + if (maxNodeVCores == totalResource.getVirtualCores()) { + maxNodeVCores = -1; + } + // We only have to iterate through the
nodes if the current max memory + // or vcores was equal to the removed node's + if (maxNodeMemory == -1 || maxNodeVCores == -1) { + for (N n : nodes.values()) { + int nodeMemory = n.getTotalResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + int nodeVCores = n.getTotalResource().getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + } + } + if (maxNodeMemory == -1) { // no nodes + maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); + } else { + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + if (maxNodeVCores == -1) { // no nodes + maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); + } else { + maximumAllocation.setVirtualCores( + Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } + } + } + } finally { + writeLock.unlock(); + } + } + + public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app, + List blacklistNodeIdList) { + readLock.lock(); + try { + for (N node : nodes.values()) { + if (SchedulerAppUtils.isBlacklisted(app, node, LOG)) { + blacklistNodeIdList.add(node.getNodeID()); + } + } + } finally { + readLock.unlock(); + } + } + + public List sortedNodeList(Comparator comparator) { + List sortedList = null; + readLock.lock(); + try { + sortedList = new ArrayList(nodes.keySet()); + } finally { + readLock.unlock(); + } + Collections.sort(sortedList, comparator); + return sortedList; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index acc2782..220af42 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -524,10 +524,11 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) addNewQueues(queues, newQueues); // Re-configure queues - root.reinitialize(newRoot, clusterResource); + root.reinitialize(newRoot, getClusterResource()); updatePlacementRules(); // Re-calculate headroom for active applications + Resource clusterResource = nodeTracker.getClusterCapacity(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); @@ -996,7 +997,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, allocation = application.getAllocation(getResourceCalculator(), - clusterResource, getMinimumResourceCapability()); + getClusterResource(), getMinimumResourceCapability()); } if (updateDemandForQueue != null && !application @@ -1037,7 +1038,8 @@ public QueueInfo getQueueInfo(String queueName, private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); + LOG.debug("nodeUpdate: " + nm + + " clusterResources: " + getClusterResource()); } Resource releaseResources = Resource.newInstance(0, 0); @@ -1120,6 +1122,7 @@ private synchronized void nodeUpdate(RMNode nm) { private synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); } @@ -1129,7 +1132,7 @@ private synchronized void updateNodeAndQueueResource(RMNode nm, */ private synchronized void 
updateLabelsOnNode(NodeId nodeId, Set newLabels) { - FiCaSchedulerNode node = nodes.get(nodeId); + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); if (null == node) { return; } @@ -1231,12 +1234,12 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); assignment = queue.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (assignment.isFulfilledReservation()) { CSAssignment tmp = @@ -1262,14 +1265,14 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { } assignment = root.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (Resources.greaterThan(calculator, clusterResource, + if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { updateSchedulerHealth(lastNodeUpdateTime, node, assignment); return; @@ -1295,12 +1298,12 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to use NON_EXCLUSIVE assignment = root.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. 
new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); updateSchedulerHealth(lastNodeUpdateTime, node, assignment); } @@ -1452,20 +1455,19 @@ public void handle(SchedulerEvent event) { private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName, nodeManager.getNodeLabels()); - this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, schedulerNode.getTotalResource()); + nodeTracker.addNode(schedulerNode); // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), schedulerNode.getTotalResource()); } - + + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = numNodeManagers.incrementAndGet(); - updateMaximumAllocation(schedulerNode, true); - + LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1479,15 +1481,15 @@ private synchronized void removeNode(RMNode nodeInfo) { if (labelManager != null) { labelManager.deactivateNode(nodeInfo.getNodeID()); } - - FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); + + FiCaSchedulerNode node = nodeTracker.removeNode(nodeInfo.getNodeID()); if (node == null) { return; } - Resources.subtractFrom(clusterResource, node.getTotalResource()); + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); - int numNodes = numNodeManagers.decrementAndGet(); + int numNodes = numNodeManagers.decrementAndGet(); if (scheduleAsynchronously && numNodes == 0) { asyncSchedulerThread.suspendSchedule(); @@ -1513,11 +1515,8 @@ private synchronized void removeNode(RMNode nodeInfo) { RMContainerEventType.KILL); } -
this.nodes.remove(nodeInfo.getNodeID()); - updateMaximumAllocation(node, false); - LOG.info("Removed node " + nodeInfo.getNodeAddress() + - " clusterResource: " + clusterResource); + " clusterResource: " + getClusterResource()); } private void rollbackContainerResource( @@ -1570,7 +1569,7 @@ protected synchronized void completedContainerInternal( // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); - queue.completedContainer(clusterResource, application, node, + queue.completedContainer(getClusterResource(), application, node, rmContainer, containerStatus, event, null, true); if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { @@ -1596,7 +1595,7 @@ protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; LeafQueue queue = (LeafQueue) attempt.getQueue(); try { - queue.decreaseContainer(clusterResource, decreaseRequest, app); + queue.decreaseContainer(getClusterResource(), decreaseRequest, app); // Notify RMNode that the container can be pulled by NodeManager in the // next heartbeat this.rmContext.getDispatcher().getEventHandler() @@ -1619,12 +1618,12 @@ public FiCaSchedulerApp getApplicationAttempt( @Lock(Lock.NoLock.class) public FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } @Lock(Lock.NoLock.class) Map getAllNodes() { - return nodes; + return nodeTracker.shallowCopy(); } @Override @@ -1871,9 +1870,9 @@ public synchronized String moveApplication(ApplicationId appId, } // Move all live containers for (RMContainer rmContainer : app.getLiveContainers()) { - source.detachContainer(clusterResource, app, rmContainer); + source.detachContainer(getClusterResource(), app, rmContainer); // attach the Container to another queue - dest.attachContainer(clusterResource, app, rmContainer); + dest.attachContainer(getClusterResource(), app, rmContainer); } // Detach the application.. 
source.finishApplicationAttempt(app, sourceQueueName); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2801bee..765925d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -226,7 +226,7 @@ public FairScheduler() { public boolean isAtLeastReservationThreshold( ResourceCalculator resourceCalculator, Resource resource) { return Resources.greaterThanOrEqual( - resourceCalculator, clusterResource, resource, reservationThreshold); + resourceCalculator, getClusterResource(), resource, reservationThreshold); } private void validateConf(Configuration conf) { @@ -352,6 +352,7 @@ protected synchronized void update() { // Recursively update demands for all queues rootQueue.updateDemand(); + Resource clusterResource = getClusterResource(); rootQueue.setFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics @@ -526,6 +527,7 @@ protected Resource resourceDeficit(FSLeafQueue sched, long curTime) { Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); ResourceCalculator calc = sched.getPolicy().getResourceCalculator(); + Resource clusterResource = getClusterResource(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { Resource target = Resources.componentwiseMin( sched.getMinShare(), sched.getDemand()); @@ -577,7 +579,7 @@ public Resource 
getIncrementResourceCapability() { } private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } public double getNodeLocalityThreshold() { @@ -882,18 +884,11 @@ protected synchronized void completedContainerInternal( private synchronized void addNode(List containerReports, RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); - nodes.put(node.getNodeID(), schedulerNode); - String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); - if (nodesPerRack.containsKey(rackName)) { - nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1); - } else { - nodesPerRack.put(rackName, 1); - } - Resources.addTo(clusterResource, node.getTotalCapability()); - updateMaximumAllocation(schedulerNode, true); + nodeTracker.addNode(schedulerNode); triggerUpdate(); + Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + @@ -904,12 +899,11 @@ private synchronized void addNode(List containerReports, } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); - // This can occur when an UNHEALTHY node reconnects + FSSchedulerNode node = nodeTracker.removeNode(rmNode.getNodeID()); if (node == null) { return; } - Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); + updateRootQueueMetrics(); triggerUpdate(); @@ -934,18 +928,10 @@ private synchronized void removeNode(RMNode rmNode) { RMContainerEventType.KILL); } - nodes.remove(rmNode.getNodeID()); - String rackName = node.getRackName() == null ? 
"NULL" : node.getRackName(); - if (nodesPerRack.containsKey(rackName) - && (nodesPerRack.get(rackName) > 0)) { - nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1); - } else { - LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" + - " unknown rack [" + rackName + "] !!"); - } + Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); - updateMaximumAllocation(node, false); + LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -967,7 +953,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR, - clusterResource, minimumAllocation, getMaximumResourceCapability(), + getClusterResource(), minimumAllocation, getMaximumResourceCapability(), incrAllocation); // Record container allocation start time @@ -1034,7 +1020,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, private synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); } eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); @@ -1091,14 +1078,8 @@ private synchronized void nodeUpdate(RMNode nm) { void continuousSchedulingAttempt() throws InterruptedException { long start = getClock().getTime(); - List nodeIdList = new ArrayList(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. 
- synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } + List nodeIdList = + nodeTracker.sortedNodeList(nodeAvailableResourceComparator); // iterate all nodes for (NodeId nodeId : nodeIdList) { @@ -1130,15 +1111,15 @@ void continuousSchedulingAttempt() throws InterruptedException { @Override public int compare(NodeId n1, NodeId n2) { - if (!nodes.containsKey(n1)) { + if (!nodeTracker.exists(n1)) { return 1; } - if (!nodes.containsKey(n2)) { + if (!nodeTracker.exists(n2)) { return -1; } - return RESOURCE_CALCULATOR.compare(clusterResource, - nodes.get(n2).getUnallocatedResource(), - nodes.get(n1).getUnallocatedResource()); + return RESOURCE_CALCULATOR.compare(getClusterResource(), + nodeTracker.getNode(n2).getUnallocatedResource(), + nodeTracker.getNode(n1).getUnallocatedResource()); } } @@ -1150,7 +1131,7 @@ synchronized void attemptScheduling(FSSchedulerNode node) { } final NodeId nodeID = node.getNodeID(); - if (!nodes.containsKey(nodeID)) { + if (!nodeTracker.exists(nodeID)) { // The node might have just been removed while this thread was waiting // on the synchronized lock before it entered this synchronized method LOG.info("Skipping scheduling as the node " + nodeID + @@ -1203,7 +1184,7 @@ public ResourceCalculator getResourceCalculator() { private void updateRootQueueMetrics() { rootMetrics.setAvailableResourcesToQueue( Resources.subtract( - clusterResource, rootMetrics.getAllocatedResources())); + getClusterResource(), rootMetrics.getAllocatedResources())); } /** @@ -1214,6 +1195,7 @@ private void updateRootQueueMetrics() { */ private boolean shouldAttemptPreemption() { if (preemptionEnabled) { + Resource clusterResource = getClusterResource(); return (preemptionUtilizationThreshold < Math.max( (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(), (float) rootMetrics.getAllocatedVirtualCores() / @@ -1547,7 +1529,7 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Override 
public int getNumClusterNodes() { - return nodes.size(); + return nodeTracker.nodeCount(); } @Override @@ -1577,7 +1559,7 @@ public void onReload(AllocationConfiguration queueInfo) { // if it does not already exist, so it can be displayed on the web UI. synchronized (FairScheduler.this) { allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); + allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource()); queueMgr.updateAllocationConfiguration(allocConf); maxRunningEnforcer.updateRunnabilityOnReload(); } @@ -1721,7 +1703,7 @@ public synchronized void updateNodeResource(RMNode nm, ResourceOption resourceOption) { super.updateNodeResource(nm, resourceOption); updateRootQueueMetrics(); - queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().setSteadyFairShare(getClusterResource()); queueMgr.getRootQueue().recomputeSteadyShares(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 147c3f3..3c06ad6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -142,6 +142,7 @@ public QueueInfo getQueueInfo( QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName()); queueInfo.setCapacity(1.0f); + Resource clusterResource = getClusterResource(); if (clusterResource.getMemory() == 0) { 
queueInfo.setCurrentCapacity(0.0f); } else { @@ -297,7 +298,7 @@ public synchronized Configuration getConf() { @Override public int getNumClusterNodes() { - return nodes.size(); + return nodeTracker.nodeCount(); } @Override @@ -327,7 +328,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, getMaximumResourceCapability()); + getClusterResource(), minimumAllocation, getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -377,7 +378,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } @VisibleForTesting @@ -526,7 +527,7 @@ private void assignContainers(FiCaSchedulerNode node) { application.showRequests(); // Done - if (Resources.lessThan(resourceCalculator, clusterResource, + if (Resources.lessThan(resourceCalculator, getClusterResource(), node.getUnallocatedResource(), minimumAllocation)) { break; } @@ -764,7 +765,7 @@ private synchronized void nodeUpdate(RMNode rmNode) { return; } - if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), node.getUnallocatedResource(), minimumAllocation)) { LOG.debug("Node heartbeat " + rmNode.getNodeID() + " available resource = " + node.getUnallocatedResource()); @@ -783,12 +784,12 @@ private void increaseUsedResources(RMContainer rmContainer) { } private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { - schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(), usedResource)); } private void updateAvailableResourcesMetrics() { - metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, + 
metrics.setAvailableResourcesToQueue(Resources.subtract(getClusterResource(), usedResource)); } @@ -925,7 +926,7 @@ protected synchronized void completedContainerInternal( private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); + FiCaSchedulerNode node = nodeTracker.removeNode(nodeInfo.getNodeID()); if (node == null) { return; } @@ -937,13 +938,6 @@ private synchronized void removeNode(RMNode nodeInfo) { SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } - - //Remove the node - this.nodes.remove(nodeInfo.getNodeID()); - updateMaximumAllocation(node, false); - - // Update cluster metrics - Resources.subtractFrom(clusterResource, node.getTotalResource()); } @Override @@ -965,9 +959,7 @@ public ResourceCalculator getResourceCalculator() { private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); - this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, schedulerNode.getTotalResource()); - updateMaximumAllocation(schedulerNode, true); + nodeTracker.addNode(schedulerNode); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 8411a4d..a3f977b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -278,22 +278,18 @@ public void testUpdateMaxAllocationUsesTotal() throws IOException { verifyMaximumResourceCapability(configuredMaximumResource, scheduler); - scheduler.nodes = new HashMap(); + scheduler.nodeTracker = new ClusterNodeTracker(); - scheduler.nodes.put(mockNode1.getNodeID(), mockNode1); - scheduler.updateMaximumAllocation(mockNode1, true); + scheduler.nodeTracker.addNode(mockNode1); verifyMaximumResourceCapability(fullResource1, scheduler); - scheduler.nodes.put(mockNode2.getNodeID(), mockNode2); - scheduler.updateMaximumAllocation(mockNode2, true); + scheduler.nodeTracker.addNode(mockNode2); verifyMaximumResourceCapability(fullResource2, scheduler); - scheduler.nodes.remove(mockNode2.getNodeID()); - scheduler.updateMaximumAllocation(mockNode2, false); + scheduler.nodeTracker.removeNode(mockNode2.getNodeID()); verifyMaximumResourceCapability(fullResource1, scheduler); - scheduler.nodes.remove(mockNode1.getNodeID()); - scheduler.updateMaximumAllocation(mockNode1, false); + scheduler.nodeTracker.removeNode(mockNode1.getNodeID()); verifyMaximumResourceCapability(configuredMaximumResource, scheduler); } finally { rm.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 9bfc283..ddbea4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -309,7 +309,7 @@ public void testUpdateResourceOnNode() throws Exception { FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") public Map getNodes(){ - return nodes; + return nodeTracker.shallowCopy(); } }; RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,