diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 9662c39..052517a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -424,6 +424,7 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setAMFinishingMonitor(amFinishingMonitor); RMNodeLabelsManager nlm = createNodeLabelManager(); + nlm.setRMDispatcher(rmDispatcher); addService(nlm); rmContext.setNodeLabelManager(nlm); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index ba1727c..08ab068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import 
org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; @@ -57,6 +59,8 @@ protected Queue() { new ConcurrentHashMap(); protected AccessControlList adminAcl; + private Dispatcher rmDispatcher = null; + @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); @@ -331,6 +335,7 @@ public boolean containsNodeLabel(String label) { return map; } + @SuppressWarnings("unchecked") private void updateResourceMappings(Map before, Map after) { // Get NMs in before only @@ -379,6 +384,13 @@ private void updateResourceMappings(Map before, Node newNM; if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) { Set newLabels = getLabelsByNode(nodeId, after); + + // if we have a previously running NM, we need to notify the scheduler about this + if (oldNM != null && rmDispatcher != null) { + rmDispatcher.getEventHandler().handle( + new NodeLabelsUpdateSchedulerEvent(nodeId, newLabels)); + } + + // no label in the past if (newLabels.isEmpty()) { // update labels @@ -452,4 +464,8 @@ public boolean checkAccess(UserGroupInformation user) { } return false; } + + public void setRMDispatcher(Dispatcher rmDispatcher) { + this.rmDispatcher = rmDispatcher; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index b115fc8..61c3df9 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,11 +35,14 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.ImmutableSet; + /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. 
@@ -61,8 +66,12 @@ private final RMNode rmNode; private final String nodeName; - - public SchedulerNode(RMNode node, boolean usePortForNodeName) { + + private AtomicReference> labelsRef = + new AtomicReference>(null); + + public SchedulerNode(RMNode node, boolean usePortForNodeName, + Set labels) { this.rmNode = node; this.availableResource = Resources.clone(node.getTotalCapability()); this.totalResourceCapability = Resources.clone(node.getTotalCapability()); @@ -71,6 +80,11 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName) { } else { nodeName = rmNode.getHostName(); } + this.labelsRef.set(ImmutableSet.copyOf(labels)); + } + + public SchedulerNode(RMNode node, boolean usePortForNodeName) { + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); } public RMNode getRMNode() { @@ -274,4 +288,12 @@ public synchronized void recoverContainer(RMContainer rmContainer) { } allocateContainer(rmContainer); } + + public Set getLabels() { + return labelsRef.get(); + } + + public void updateLabels(Set labels) { + labelsRef.set(ImmutableSet.copyOf(labels)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fc0fbb4..effb4ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -64,7 +64,6 @@ Set accessibleLabels; RMNodeLabelsManager labelManager; String 
defaultLabelExpression; - Resource usedResources = Resources.createResource(0, 0); QueueInfo queueInfo; Map absoluteCapacityByNodeLabels; Map capacitiyByNodeLabels; @@ -158,7 +157,13 @@ public synchronized float getUsedCapacity() { @Override public synchronized Resource getUsedResources() { - return usedResources; + return getUsedResources(RMNodeLabelsManager.NO_LABEL); + } + + @Override + public synchronized Resource getUsedResources(String nodeLabel) { + Resource res = usedResourcesByNodeLabels.get(nodeLabel); + return res == null ? Resources.none() : res; } public synchronized int getNumContainers() { @@ -341,9 +346,7 @@ public Resource getMinimumAllocation() { } synchronized void allocateResource(Resource clusterResource, - Resource resource, Set nodeLabels) { - Resources.addTo(usedResources, resource); - + Resource resource, Set nodeLabels) { // Update usedResources by labels if (nodeLabels == null || nodeLabels.isEmpty()) { if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { @@ -368,9 +371,6 @@ synchronized void allocateResource(Resource clusterResource, protected synchronized void releaseResource(Resource clusterResource, Resource resource, Set nodeLabels) { - // Update queue metrics - Resources.subtractFrom(usedResources, resource); - // Update usedResources by labels if (null == nodeLabels || nodeLabels.isEmpty()) { if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 6438d6c..7572fab 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -144,6 +144,14 @@ public Resource getUsedResources(); /** + * Get the currently utilized resources which are allocated at nodes with the label + * specified + * + * @return used resources by the queue and its children + */ + public Resource getUsedResources(String nodeLabel); + + /** * Get the current run-state of the queue * @return current run-state */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28158c1..1d523db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import
org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -92,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -966,6 +969,42 @@ private synchronized void updateNodeAndQueueResource(RMNode nm, updateNodeResource(nm, resourceOption); root.updateClusterResource(clusterResource); } + + /** + * Process node labels update on a node + */ + private synchronized void updateNodeLabels(NodeId nodeId, + Set newLabels) { + FiCaSchedulerNode node = nodes.get(nodeId); + + // labels are the same, no update is needed + if (node.getLabels().size() == newLabels.size() + && node.getLabels().containsAll(newLabels)) { + return; + } + + // Kill running containers since label is changed + for (RMContainer rmContainer : node.getRunningContainers()) { + ContainerId containerId = rmContainer.getContainerId(); + completedContainer(rmContainer, + ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, + String.format( + "Container=%s killed since labels on the node=%s changed", + containerId.toString(), nodeId.toString()), + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL); + } + + // Unreserve container on this node + RMContainer reservedContainer = node.getReservedContainer(); + if (null != reservedContainer) { + dropContainerReservation(reservedContainer); + } + + // Update
node labels after we've done this + node.updateLabels(newLabels); + } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() @@ -1049,6 +1088,14 @@ public void handle(SchedulerEvent event) { nodeResourceUpdatedEvent.getResourceOption()); } break; + case NODE_LABELS_UPDATE: + { + NodeLabelsUpdateSchedulerEvent labelUpdateEvent = + (NodeLabelsUpdateSchedulerEvent) event; + updateNodeLabels(labelUpdateEvent.getNodeId(), + labelUpdateEvent.getNewNodeLabels()); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1123,7 +1170,7 @@ private synchronized void addNode(RMNode nodeManager) { nodeManager.getTotalCapability()); } FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, - usePortForNodeName); + usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ffeec63..f936574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -463,7 +463,7 @@ public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + 
- "usedResources=" + usedResources + ", " + + "usedResources=" + getUsedResources() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -730,7 +730,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // if our queue cannot access this node, just return if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, - labelManager.getLabelsOnNode(node.getNodeID()))) { + node.getLabels())) { return NULL_ASSIGNMENT; } @@ -799,7 +799,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Check queue max-capacity limit if (!canAssignToThisQueue(clusterResource, required, - labelManager.getLabelsOnNode(node.getNodeID()), application, true)) { + node.getLabels(), application, true)) { return NULL_ASSIGNMENT; } @@ -832,7 +832,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - labelManager.getLabelsOnNode(node.getNodeID())); + node.getLabels()); // Don't reset scheduling opportunities for non-local assignments // otherwise the app will be delayed for each non-local assignment. 
@@ -911,7 +911,7 @@ private Resource getHeadroom(User user, Resource queueMaxCap, Resource headroom = Resources.min(resourceCalculator, clusterResource, Resources.subtract(userLimit, user.getTotalConsumedResources()), - Resources.subtract(queueMaxCap, usedResources) + Resources.subtract(queueMaxCap, getUsedResources()) ); return headroom; } @@ -959,14 +959,14 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, LOG.debug("try to use reserved: " + getQueueName() + " usedResources: " - + usedResources + + getUsedResources() + " clusterResources: " + clusterResource + " reservedResources: " + application.getCurrentReservation() + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required + getUsedResources(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1092,8 +1092,8 @@ private Resource computeUserLimit(FiCaSchedulerApp application, Resource currentCapacity = Resources.lessThan(resourceCalculator, clusterResource, - usedResources, queueCapacity) ? - queueCapacity : Resources.add(usedResources, required); + getUsedResources(), queueCapacity) ? + queueCapacity : Resources.add(getUsedResources(), required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. 
@@ -1131,7 +1131,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, " consumed: " + user.getTotalConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + - " qconsumed: " + usedResources + + " qconsumed: " + getUsedResources() + " currentCapacity: " + currentCapacity + " activeUsers: " + activeUsers + " clusterCapacity: " + clusterResource @@ -1478,7 +1478,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // check if the resource request can access the label if (!SchedulerUtils.checkNodeLabelExpression( - labelManager.getLabelsOnNode(node.getNodeID()), + node.getLabels(), request.getNodeLabelExpression())) { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed @@ -1601,7 +1601,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod " queue=" + this.toString() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); return request.getCapability(); @@ -1669,8 +1669,7 @@ public void completedContainer(Resource clusterResource, // Book-keeping if (removed) { releaseResource(clusterResource, application, - container.getResource(), - labelManager.getLabelsOnNode(node.getNodeID())); + container.getResource(), node.getLabels()); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1703,7 +1702,7 @@ synchronized void allocateResource(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.info(getQueueName() + " user=" + userName + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + getUsedResources() + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + " user-resources=" + user.getTotalConsumedResources() ); @@ -1721,7 +1720,7 @@ synchronized void 
releaseResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + getUsedResources() + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); } @@ -1862,9 +1861,10 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! synchronized (this) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1901,14 +1901,15 @@ public void collectSchedulerApplications( public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + getUsedResources() + " cluster=" + clusterResource); // Inform the parent queue getParent().attachContainer(clusterResource, application, rmContainer); } @@ -1918,14 +1919,15 @@ public void attachContainer(Resource clusterResource, public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer 
rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer() - .getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + getUsedResources() + " cluster=" + clusterResource); // Inform the parent queue getParent().detachContainer(clusterResource, application, rmContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6ffaf4c..1bde7f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -73,6 +73,7 @@ private final boolean rootQueue; final Comparator queueComparator; volatile int numApplications; + private final CapacitySchedulerContext scheduler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -80,7 +81,7 @@ public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, 
parent, old); - + this.scheduler = cs; this.queueComparator = cs.getQueueComparator(); this.rootQueue = (parent == null); @@ -257,7 +258,7 @@ public String toString() { "numChildQueue= " + childQueues.size() + ", " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + + "usedResources=" + getUsedResources() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); @@ -420,10 +421,10 @@ public synchronized CSAssignment assignContainers( Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + Set nodeLabels = node.getLabels(); // if our queue cannot access this node, just return - if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, - labelManager.getLabelsOnNode(node.getNodeID()))) { + if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) { return assignment; } @@ -434,7 +435,6 @@ public synchronized CSAssignment assignContainers( } boolean localNeedToUnreserve = false; - Set nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); // Are we over maximum-capacity for this queue? 
if (!canAssignToThisQueue(clusterResource, nodeLabels)) { @@ -465,7 +465,7 @@ public synchronized CSAssignment assignContainers( " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); } else { @@ -519,7 +519,7 @@ private synchronized boolean canAssignToThisQueue(Resource clusterResource, if (currentAbsoluteLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + usedResources + LOG.debug(getQueueName() + " used=" + getUsedResources() + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")"); @@ -542,16 +542,16 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) .getReservedMB(), getMetrics().getReservedVirtualCores()); float capacityWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(usedResources, reservedResources), + Resources.subtract(getUsedResources(), reservedResources), clusterResource); if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + usedResources.getMemory() + + " usedResources: " + getUsedResources().getMemory() + " clusterResources: " + clusterResource.getMemory() + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) usedResources.getMemory()) + + " currentCapacity " + ((float) getUsedResources().getMemory()) / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " @@ -641,13 +641,13 @@ public void completedContainer(Resource clusterResource, // Book keeping synchronized (this) { 
super.releaseResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(node.getNodeID())); + .getResource(), node.getLabels()); LOG.info("completedContainer" + " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + getUsedResources() + " cluster=" + clusterResource); } @@ -703,9 +703,10 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! synchronized (this) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -730,12 +731,13 @@ public void collectSchedulerApplications( public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + getUsedResources() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { @@ -748,12 +750,14 @@ public void attachContainer(Resource clusterResource, public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + 
scheduler.getNode(rmContainer.getContainer().getNodeId()); super.releaseResource(clusterResource, rmContainer.getContainer().getResource(), - labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId())); + node.getLabels()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + getUsedResources() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 5227aac..fe6db47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -32,9 +34,14 @@ public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); + + public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, + Set nodeLabels) { + super(node, usePortForNodeName, nodeLabels); + } public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - super(node, usePortForNodeName); + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java new file mode 100644 index 0000000..4ec60fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent { + + private Set newNodeLabels; + private NodeId nodeId; + + public NodeLabelsUpdateSchedulerEvent(NodeId nodeId, Set newLabels) { + super(SchedulerEventType.NODE_LABELS_UPDATE); + this.newNodeLabels = newLabels; + this.nodeId = nodeId; + } + + public Set getNewNodeLabels() { + return newNodeLabels; + } + + public NodeId getNodeId() { + return nodeId; + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 062f831..13aecb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -25,6 +25,7 @@ NODE_REMOVED, NODE_UPDATE, NODE_RESOURCE_UPDATE, + NODE_LABELS_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java new file mode 100644 index 0000000..d0550ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -0,0 +1,180 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; 
+import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestCapacitySchedulerNodeLabelUpdate { + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerNodeLabelUpdate.class); + + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new MemoryRMNodeLabelsManager(); + mgr.init(conf); + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + 
conf.setCapacity(A, 100); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z")); + conf.setCapacityByLabel(A, "x", 100); + conf.setCapacityByLabel(A, "y", 100); + conf.setCapacityByLabel(A, "z", 100); + + return conf; + } + + private <E> Set<E> toSet(E... elements) { + Set<E> set = Sets.newHashSet(elements); + return set; + } + + private void checkUsedResource(MockRM rm, String queueName, int memory) { + checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); + } + + private void checkUsedResource(MockRM rm, String queueName, int memory, + String label) { + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queueName); + Assert.assertEquals(memory, queue.getUsedResources(label).getMemory()); + } + + @Test (timeout = 30000) + public void testNodeUpdate() throws Exception { + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + MockNM nm2 = rm.registerNode("h2:1234", 8000); + MockNM nm3 = rm.registerNode("h3:1234", 8000); + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3); + + // request a container. 
+ am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // check used resource: + // queue-a used x=1G, ""=1G + checkUsedResource(rm, "a", 1024, "x"); + checkUsedResource(rm, "a", 1024); + + // change h1's label to z, container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("z"))); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, ""=1G ("" not changed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 1024); + + // request a container with label = y + am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + Assert.assertTrue(rm.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // check used resource: + // queue-a used y=1G, ""=1G + checkUsedResource(rm, "a", 1024, "y"); + checkUsedResource(rm, "a", 1024); + + // change h2's label to no label, container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0), + CommonNodeLabelsManager.EMPTY_STRING_SET)); + Assert.assertTrue(rm.waitForState(nm2, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, y=0G, ""=1G ("" not changed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 0, "y"); + checkUsedResource(rm, "a", 1024); + + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + + // change h3's label to z, AM container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0), + toSet("z"))); + Assert.assertTrue(rm.waitForState(nm3, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, 
+ y=0G, ""=0G (AM container also killed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 0, "y"); + checkUsedResource(rm, "a", 0); + + rm.close(); + } +}