From 318dbd41b0577bb3fb0d6601023d177d64eb0625 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 28 Jun 2018 17:31:44 -0700 Subject: [PATCH] YARN-7863 --- .../yarn/api/resource/PlacementConstraints.java | 2 + .../server/resourcemanager/ResourceManager.java | 2 + .../nodelabels/NodeAttributesManagerImpl.java | 20 +++++++ .../resourcemanager/scheduler/SchedulerNode.java | 10 ++++ .../scheduler/capacity/CapacityScheduler.java | 34 +++++++++++ .../constraint/PlacementConstraintsUtil.java | 70 ++++++++++++++++++---- .../event/NodeAttributesUpdateSchedulerEvent.java | 39 ++++++++++++ .../scheduler/event/SchedulerEventType.java | 1 + .../SingleConstraintAppPlacementAllocator.java | 34 ++++++++--- 9 files changed, 193 insertions(+), 19 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index d22a6bd90c0..20c78f7278d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -49,6 +49,8 @@ private PlacementConstraints() { public static final String NODE = PlacementConstraint.NODE_SCOPE; public static final String RACK = PlacementConstraint.RACK_SCOPE; public static final String NODE_PARTITION = "yarn_node_partition/"; + public static final String DIST_NODE_ATTRIBUTES = "nm.yarn.io/"; + public static final String CENTRAL_NODE_ATTRIBUTES = "rm.yarn.io/"; /** * Creates a constraint that requires allocations to be placed on nodes that diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e6ea082af52..db4c34f897b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -650,6 +650,8 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setNodeLabelManager(nlm); NodeAttributesManager nam = createNodeAttributesManager(); + NodeAttributesManagerImpl namImpl = (NodeAttributesManagerImpl) nam; + namImpl.setRMContext(rmContext); addService(nam); rmContext.setNodeAttributesManager(nam); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 6eb45898940..c663424a901 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.nodelabels.StringAttributeValue; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import com.google.common.base.Strings; @@ -91,6 +93,7 @@ private final ReadLock readLock; private final WriteLock writeLock; + private RMContext rmContext = null; public NodeAttributesManagerImpl() { super("NodeAttributesManagerImpl"); @@ -205,6 +208,19 @@ private void internalUpdateAttributesOnNodes( .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op)); } + // Map used to notify RM + Map> newNodeToAttributesMap = + new HashMap>(); + nodeCollections.forEach((k, v) -> { + newNodeToAttributesMap.put(k, v.attributes.keySet()); + }); + + // Notify RM + if (rmContext != null && rmContext.getDispatcher() != null) { + rmContext.getDispatcher().getEventHandler().handle( + new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); + } + } finally { writeLock.unlock(); } @@ -702,4 +718,8 @@ protected void serviceStop() throws Exception { store.close(); } } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index d5bfc57e66d..021c47974b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; @@ -76,6 +77,7 @@ private final String nodeName; private volatile Set labels = null; + private volatile Set nodeAttributes = null; // Last updated time private volatile long lastHeartbeatMonotonicTime; @@ -488,6 +490,14 @@ public int hashCode() { return getNodeID().hashCode(); } + public Set getNodeAttributes() { + return nodeAttributes; + } + + public void updateNodeAttributes(Set nodeAttributes) { + this.nodeAttributes = nodeAttributes; + } + private static class ContainerInfo { private final RMContainer container; private boolean launchedOnNode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 776e512d433..10ca8e01bee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -135,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1675,6 +1677,14 @@ public void handle(SchedulerEvent event) { updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; + case NODE_ATTRIBUTES_UPDATE: + { + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent = + (NodeAttributesUpdateSchedulerEvent) event; + + updateNodeAttributes(attributeUpdateEvent); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1808,6 +1818,30 @@ public void handle(SchedulerEvent event) { } } + private void updateNodeAttributes( + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { + try { + writeLock.lock(); + for (Entry> entry : attributeUpdateEvent + .getUpdatedNodeToAttributes().entrySet()) { + String hostname = entry.getKey(); + Set attributes = entry.getValue(); + List nodeIds = nodeTracker.getNodeIdsByResourceName(hostname); + updateAttributesOnNode(nodeIds, attributes); + } + } finally { + writeLock.unlock(); + } + } + + private void updateAttributesOnNode(List nodeIds, + Set attributes) { + nodeIds.forEach((k) -> { + SchedulerNode node = nodeTracker.getNode(k); + node.updateNodeAttributes(attributes); + }); + } + /** * Process node labels update. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index efa7b65a0fa..6ad8b1f6835 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; @@ -114,19 +116,47 @@ private static boolean canSatisfySingleConstraintExpression( || maxScopeCardinality <= desiredMaxCardinality); } - private static boolean canSatisfyNodePartitionConstraintExpresssion( + private static boolean canSatisfyNodeConstraintExpresssion( TargetExpression targetExpression, SchedulerNode schedulerNode) { Set values = targetExpression.getTargetValues(); - if (values == null || values.isEmpty()) { - return schedulerNode.getPartition().equals( - RMNodeLabelsManager.NO_LABEL); - } else{ - String nodePartition = values.iterator().next(); - if (!nodePartition.equals(schedulerNode.getPartition())) { + + if (targetExpression.getTargetKey().equals(NODE_PARTITION)) { + if (values == null || values.isEmpty()) { + return schedulerNode.getPartition() + .equals(RMNodeLabelsManager.NO_LABEL); + } else { + String nodePartition = values.iterator().next(); + if (!nodePartition.equals(schedulerNode.getPartition())) { + return false; + } + } + } else { + // compare attributes. + String inputAttribute = values.iterator().next(); + NodeAttribute requestAttribute = getNodeConstraintFromRequest( + inputAttribute); + if (requestAttribute == null) { + return true; + } + if (!schedulerNode.getNodeAttributes().contains(requestAttribute)) { + return false; + } + boolean found = false; + for (Iterator it = schedulerNode.getNodeAttributes() + .iterator(); it.hasNext(); ) { + NodeAttribute nodeAttribute = it.next(); + if (requestAttribute.equals(nodeAttribute)) { + if (requestAttribute.getAttributeValue() + .equals(nodeAttribute.getAttributeValue())) { + found = true; + return found; + } + } + } + if (!found) { return false; } } - return true; } @@ -146,10 +176,9 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId, singleConstraint, currentExp, schedulerNode, tagsManager)) { return false; } - } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE) - && currentExp.getTargetKey().equals(NODE_PARTITION)) { - // This is a node partition expression, check it. - canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode); + } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) { + // This is a node attribute expression, check it. + canSatisfyNodeConstraintExpresssion(currentExp, schedulerNode); } } // return true if all targetExpressions are satisfied @@ -263,4 +292,21 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId, pcm.getMultilevelConstraint(applicationId, sourceTags, pc), schedulerNode, atm); } + + private static NodeAttribute getNodeConstraintFromRequest(String attrString) { + // Input node attribute could be like nm.yarn.io/javaversion=1.8 + + // Step1: Get the prefix first. + String[] prefix = attrString.split("/"); + if (prefix == null) { + return null; + } + String[] splits = prefix[1].split("="); + if (splits == null) { + return null; + } + NodeAttribute nodeAttribute = NodeAttribute + .newInstance(splits[0], NodeAttributeType.STRING, splits[1]); + return nodeAttribute; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java new file mode 100644 index 00000000000..2a3effbde8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent { + private Map> nodeToAttributes; + + public NodeAttributesUpdateSchedulerEvent( + Map> newNodeToAttributesMap) { + super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE); + this.nodeToAttributes = newNodeToAttributesMap; + } + + public Map> getUpdatedNodeToAttributes() { + return nodeToAttributes; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee61..869bf0ed9e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -26,6 +26,7 @@ NODE_UPDATE, NODE_RESOURCE_UPDATE, NODE_LABELS_UPDATE, + NODE_ATTRIBUTES_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 1fc6badbe6a..bcb1fba58da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -69,6 +70,7 @@ private SchedulingRequest schedulingRequest = null; private String targetNodePartition; + private Set targetNodeAttributes; private Set targetAllocationTags; private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; @@ -283,6 +285,9 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // Target allocation tags Set targetAllocationTags = null; + // Set Node Attributes + Set nodeAttributes = null; + for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) { // Handle node partition if (targetExpression.getTargetType().equals( @@ -290,9 +295,9 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // For node attribute target, we only support Partition now. And once // YARN-3409 is merged, we will support node attribute. if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) { - throwExceptionWithMetaInfo("When TargetType=" - + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE - + " only " + NODE_PARTITION + " is accepted as TargetKey."); + nodeAttributes = handleNodeAttributeFromSchedulingRequest( + targetExpression); + continue; } if (nodePartition != null) { @@ -351,14 +356,29 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // Validation is done. set local results: this.targetNodePartition = nodePartition; this.targetAllocationTags = targetAllocationTags; + this.targetNodeAttributes = nodeAttributes; this.schedulingRequest = new SchedulingRequestPBImpl( ((SchedulingRequestPBImpl) newSchedulingRequest).getProto()); - LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo - .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils - .join(",", targetAllocationTags) + "]. nodePartition=" - + targetNodePartition); + LOG.info("Successfully added SchedulingRequest to app=" + + appSchedulingInfo.getApplicationAttemptId() + + " targetAllocationTags=[" + + StringUtils.join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition + " targetNodeAttributes=[" + + StringUtils.join(",", targetNodeAttributes) + "]"); + } + + private Set handleNodeAttributeFromSchedulingRequest( + TargetExpression targetExpression) { + Set nodeAttributes = new HashSet(); + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + return nodeAttributes; + } + + nodeAttributes.addAll(values); + return nodeAttributes; } @Override -- 2.14.3 (Apple Git-98)