diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java new file mode 100644 index 00000000000..5689a34b4e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception when invalid parameter specified to do placement tags related + * queries. + */ +public class InvalidPlacementTagsQueryException extends YarnException { + private static final long serialVersionUID = 12312831974894L; + + public InvalidPlacementTagsQueryException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java new file mode 100644 index 00000000000..3e84505a898 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java @@ -0,0 +1,166 @@ +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Support storing maps between container-tags/applications and + * nodes. This will be required by affinity/anti-affinity implementation and + * cardinality. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PlacementTagsManager { + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + // Application's tags to node + Map perAppMappings; + + // Global tags to node mapping (used to fast return aggregated tags + // cardinality across apps) + NodeToCountedTags globalMapping; + + /** + * Store node to counted tags. + */ + private static class NodeToCountedTags { + // Map> + private Map> nodeToTagsWithCount; + + // protected by external locks + private void addTagsToNode(NodeId nodeId, Set tags) { + Map innerMap = nodeToTagsWithCount.computeIfAbsent( + nodeId, k -> new HashMap<>()); + + for (String tag : tags) { + Integer count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1); + } else{ + innerMap.put(tag, count + 1); + } + } + } + + private void removeTagsFromNode(NodeId nodeId, Set tags) { + Map innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return; + } + + for (String tag : tags) { + Integer count = innerMap.get(tag); + if (count > 1) { + innerMap.put(tag, count - 1); + } else{ + innerMap.remove(tag); + } + } + } + + private int getCardinality(NodeId nodeId, Set tags) { + Map innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + int minCardinality = Integer.MAX_VALUE; + for (String tag : tags) { + if (!innerMap.containsKey(tag)) { + return 0; + } + minCardinality = Math.min(minCardinality, innerMap.get(tag)); + } + return minCardinality; + } + } + + PlacementTagsManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + /** + * Notify container allocated on a node + * @param nodeId allocated node. + * @param applicationId applicationId + * @param containerId container id. + * @param allocationTags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()} + */ + public void addContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set allocationTags) { + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( + applicationId, k -> new NodeToCountedTags()); + perAppTagsMapping.addTagsToNode(nodeId, allocationTags); + globalMapping.addTagsToNode(nodeId, allocationTags); + } finally { + writeLock.unlock(); + } + } + + /** + * Notify container removed. + * + * @param nodeId nodeId + * @param containerId containerId. + * @param allocationTags allocation tags for given container + */ + public void removeContainer(NodeId nodeId, ContainerId containerId, + Set allocationTags) { + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.get( + containerId.getApplicationAttemptId().getApplicationId()); + if (perAppTagsMapping == null) { + return; + } + + perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); + globalMapping.removeTagsFromNode(nodeId, allocationTags); + } finally { + writeLock.unlock(); + } + } + + /** + * Get cardinality for following conditions (intersect) + * @param nodeId nodeId, required. + * @param applicationId applicationId (could be null). + * @param tags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()}, could be null. + * + * @return cardinality of specified query on the node. + * + * @throw InvalidPlacementTagsQueryException when illegal query + * parameter specified + */ + public int getNodeCardinality(NodeId nodeId, ApplicationId applicationId, + Set tags) throws InvalidPlacementTagsQueryException { + if (applicationId == null && (tags == null || tags.isEmpty())) { + throw new InvalidPlacementTagsQueryException( + "Must specify either applicationId or tags to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else { + mapping = globalMapping; + } + + return mapping.getCardinality(nodeId, tags); + } +}