From 323fabce07950136b72590624ac4208d8070416a Mon Sep 17 00:00:00 2001 From: Sunil G Date: Tue, 13 Mar 2018 23:30:45 +0530 Subject: [PATCH] YARN-7494 --- 1 | 650 +++++++++++++++++++++ .../resourcemanager/RMActiveServiceContext.java | 16 + .../yarn/server/resourcemanager/RMContext.java | 8 +- .../yarn/server/resourcemanager/RMContextImpl.java | 14 +- .../server/resourcemanager/ResourceManager.java | 12 + .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/ClusterNodeTracker.java | 22 + .../scheduler/capacity/CapacityScheduler.java | 48 +- .../capacity/CapacitySchedulerConfiguration.java | 90 ++- .../scheduler/capacity/LeafQueue.java | 15 +- .../common/ApplicationSchedulingConfig.java | 10 + .../scheduler/common/fica/FiCaSchedulerApp.java | 11 + .../placement/DefaultMultiNodeLookupPolicy.java | 68 +++ .../placement/LocalityAppPlacementAllocator.java | 27 +- .../scheduler/placement/MultiNodeLookupPolicy.java | 61 ++ .../scheduler/placement/MultiNodeSorter.java | 129 ++++ .../placement/MultiNodeSortingManager.java | 105 ++++ .../scheduler/placement/NodesSortingAlgorithm.java | 55 ++ .../ResourceUsageNodesSortingAlgorithm.java | 70 +++ 19 files changed, 1404 insertions(+), 18 deletions(-) create mode 100644 1 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java diff --git a/1 b/1 new file mode 100644 index 00000000000..ca794cc2672 --- /dev/null +++ b/1 @@ -0,0 +1,650 @@ +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +index 06a1d00d95b..bac9becdb86 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +@@ -42,9 +42,11 @@ + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; + import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +@@ -111,6 +113,7 @@ + private QueueLimitCalculator queueLimitCalculator; + private AllocationTagsManager allocationTagsManager; + private PlacementConstraintManager placementConstraintManager; ++ private MultiNodeSortingManager multiNodeSortingManager; + + public RMActiveServiceContext() { + queuePlacementManager = new PlacementManager(); +@@ -439,6 +442,19 @@ public void setRMDelegatedNodeLabelsUpdater( + rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; + } + ++ @Private ++ @Unstable ++ public MultiNodeSortingManager getMultiNodeSortingManager() { ++ return multiNodeSortingManager; ++ } ++ ++ @Private ++ @Unstable ++ public void setMultiNodeSortingManager( ++ MultiNodeSortingManager multiNodeSortingManager) { ++ this.multiNodeSortingManager = multiNodeSortingManager; ++ } ++ + @Private + @Unstable + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +index eb91a311a3a..a30ff76a6ea 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +@@ -42,10 +42,11 @@ + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +- ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; + import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +@@ -177,4 +178,9 @@ void setRMDelegatedNodeLabelsUpdater( + + void setPlacementConstraintManager( + PlacementConstraintManager placementConstraintManager); ++ ++ MultiNodeSortingManager getMultiNodeSortingManager(); ++ ++ void setMultiNodeSortingManager( ++ MultiNodeSortingManager multiNodeSortingManager); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +index 0b6be722ac2..2472d341f08 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +@@ -48,10 +48,11 @@ + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +- ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; + import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +@@ -540,6 +541,17 @@ public void setRMDelegatedNodeLabelsUpdater( + delegatedNodeLabelsUpdater); + } + ++ @Override ++ public MultiNodeSortingManager getMultiNodeSortingManager() { ++ return activeServiceContext.getMultiNodeSortingManager(); ++ } ++ ++ @Override ++ public void setMultiNodeSortingManager( ++ MultiNodeSortingManager multiNodeSortingManager) { ++ activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager); ++ } ++ + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { + activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +index 5140c9fa558..57340a45928 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +@@ -96,11 +96,13 @@ + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; + import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; + import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +@@ -528,6 +530,10 @@ private FederationStateStoreService createFederationStateStoreService() { + return new FederationStateStoreService(rmContext); + } + ++ protected MultiNodeSortingManager createMultiNodeSortingManager() { ++ return new MultiNodeSortingManager(); ++ } ++ + protected SystemMetricsPublisher createSystemMetricsPublisher() { + List publishers = + new ArrayList(); +@@ -641,6 +647,11 @@ protected void serviceInit(Configuration configuration) throws Exception { + addService(placementConstraintManager); + rmContext.setPlacementConstraintManager(placementConstraintManager); + ++ MultiNodeSortingManager multiNodeSortingManager = ++ createMultiNodeSortingManager(); ++ addService(multiNodeSortingManager); ++ rmContext.setMultiNodeSortingManager(multiNodeSortingManager); ++ + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = + createRMDelegatedNodeLabelsUpdater(); + if (delegatedNodeLabelsUpdater != null) { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +index 1efdd8ba430..6af0b4e8a4b 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +@@ -93,7 +93,7 @@ + private final ReentrantReadWriteLock.WriteLock writeLock; + + public final ContainerUpdateContext updateContext; +- public final Map applicationSchedulingEnvs = new HashMap<>(); ++ private final Map applicationSchedulingEnvs = new HashMap<>(); + private final RMContext rmContext; + + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, +@@ -763,4 +763,13 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, + this.readLock.unlock(); + } + } ++ ++ /** ++ * Get scheduling envs configured for this application. ++ * ++ * @return a map of applicationSchedulingEnvs ++ */ ++ public Map getApplicationSchedulingEnvs() { ++ return applicationSchedulingEnvs; ++ } + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +index 66d88108932..fd8a55dd3d4 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +@@ -37,6 +37,7 @@ + import java.util.HashMap; + import java.util.List; + import java.util.Map; ++import java.util.Set; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; +@@ -57,6 +58,7 @@ + private HashMap nodes = new HashMap<>(); + private Map nodeNameToNodeMap = new HashMap<>(); + private Map> nodesPerRack = new HashMap<>(); ++ private Map> nodesPerLabel = new HashMap<>(); + + private Resource clusterCapacity = Resources.createResource(0, 0); + private volatile Resource staleClusterCapacity = +@@ -420,4 +422,24 @@ private void updateMaxResources(SchedulerNode node, boolean add) { + } + return retNodes; + } ++ ++ /** ++ * update cached nodes per partition on a node label change event. ++ * @param partition nodeLabel ++ * @param nodeIds List of Node IDs ++ */ ++ public void updateNodesPerPartition(String partition, Set nodeIds) { ++ // Clear all entries. ++ nodesPerLabel.remove(partition); ++ ++ List nodes = new ArrayList(); ++ nodeIds.forEach(n -> nodes.add(getNode(n))); ++ ++ // Update new set of nodes for given partition. ++ nodesPerLabel.put(partition, nodes); ++ } ++ ++ public List getNodesPerPartition(String partition) { ++ return nodesPerLabel.get(partition); ++ } + } +\ No newline at end of file +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +index ddab0c1bd82..8168a8259f7 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +@@ -22,6 +22,7 @@ + import java.util.ArrayList; + import java.util.Collection; + import java.util.EnumSet; ++import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; +@@ -52,6 +53,7 @@ + import org.apache.hadoop.yarn.api.records.ContainerStatus; + import org.apache.hadoop.yarn.api.records.ExecutionType; + import org.apache.hadoop.yarn.api.records.NodeId; ++import org.apache.hadoop.yarn.api.records.NodeLabel; + import org.apache.hadoop.yarn.api.records.Priority; + import org.apache.hadoop.yarn.api.records.QueueACL; + import org.apache.hadoop.yarn.api.records.QueueInfo; +@@ -249,6 +251,7 @@ public Configuration getConf() { + private ResourceCommitterService resourceCommitterService; + private RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; ++ private boolean multiNodePlacementEnabled; + + private static boolean printedVerboseLoggingForAsyncScheduling = false; + +@@ -382,6 +385,8 @@ void initScheduler(Configuration configuration) throws + // Setup how many containers we can allocate for each round + offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + ++ this.multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); ++ + LOG.info("Initialized CapacityScheduler with " + "calculator=" + + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" +@@ -445,6 +450,7 @@ public void serviceStop() throws Exception { + if (isConfigurationMutable()) { + ((MutableConfigurationProvider) csConfProvider).close(); + } ++ + super.serviceStop(); + } + +@@ -1345,6 +1351,22 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, + || assignedContainers < maxAssignPerHeartbeat); + } + ++ private CandidateNodeSet getCandidateNodeSet( ++ FiCaSchedulerNode node) { ++ CandidateNodeSet candidates = null; ++ if (!multiNodePlacementEnabled) { ++ candidates = new SimpleCandidateNodeSet<>(node); ++ } else { ++ Map nodesByPartition = new HashMap<>(); ++ List nodes = nodeTracker ++ .getNodesPerPartition(node.getPartition()); ++ nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); ++ candidates = new SimpleCandidateNodeSet( ++ nodesByPartition, node.getPartition()); ++ } ++ return candidates; ++ } ++ + /** + * We need to make sure when doing allocation, Node should be existed + * And we will construct a {@link CandidateNodeSet} before proceeding +@@ -1356,8 +1378,8 @@ private void allocateContainersToNode(NodeId nodeId, + int offswitchCount = 0; + int assignedContainers = 0; + +- CandidateNodeSet candidates = +- new SimpleCandidateNodeSet<>(node); ++ CandidateNodeSet candidates = getCandidateNodeSet( ++ node); + CSAssignment assignment = allocateContainersToNode(candidates, + withNodeHeartbeat); + // Only check if we can allocate more container on the same node when +@@ -1596,10 +1618,10 @@ CSAssignment allocateContainersToNode( + + // We have two different logics to handle allocation on single node / multi + // nodes. +- if (null != node) { +- return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); +- } else{ ++ if (multiNodePlacementEnabled) { + return allocateContainersOnMultiNodes(candidates); ++ } else { ++ return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + } + } + +@@ -1776,11 +1798,13 @@ private void updateNodeLabelsAndQueueResource( + NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { + try { + writeLock.lock(); ++ Set updateLabels = new HashSet(); + for (Entry> entry : labelUpdateEvent + .getUpdatedNodeToLabels().entrySet()) { + NodeId id = entry.getKey(); + Set labels = entry.getValue(); + updateLabelsOnNode(id, labels); ++ updateLabels.addAll(labels); + } + Resource clusterResource = getClusterResource(); + getRootQueue().updateClusterResource(clusterResource, +@@ -1790,6 +1814,15 @@ private void updateNodeLabelsAndQueueResource( + } + } + ++ private void refreshLabelToNodeCache(Set updateLabels) { ++ Map> labelMapping = labelManager ++ .getLabelsInfoToNodes(updateLabels); ++ for (String label : updateLabels) { ++ Set nodes = labelMapping.get(label); ++ nodeTracker.updateNodesPerPartition(label, nodes); ++ } ++ } ++ + private void addNode(RMNode nodeManager) { + try { + writeLock.lock(); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +index bdd30b915c9..a97657ea485 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +@@ -22,6 +22,7 @@ + import com.google.common.collect.ImmutableSet; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.classification.InterfaceAudience.Private; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.security.authorize.AccessControlList; +@@ -40,10 +41,13 @@ + import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; + import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; + import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +@@ -582,10 +586,11 @@ public void setAccessibleNodeLabels(String queue, Set labels) { + set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); + } + ++ + public Set getAccessibleNodeLabels(String queue) { + String accessibleLabelStr = + get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); +- ++ + // When accessible-label is null, + if (accessibleLabelStr == null) { + // Only return null when queue is not ROOT +@@ -600,12 +605,12 @@ public void setAccessibleNodeLabels(String queue, Set labels) { + + " it will be automatically set to \"*\"."); + } + } +- ++ + // always return ANY for queue root + if (queue.equals(ROOT)) { + return ImmutableSet.of(RMNodeLabelsManager.ANY); + } +- ++ + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : accessibleLabelStr.split(",")) { +@@ -621,7 +626,6 @@ public void setAccessibleNodeLabels(String queue, Set labels) { + } + return Collections.unmodifiableSet(set); + } +- + private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, + float defaultValue) { + String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; +@@ -2017,4 +2021,82 @@ private void updateResourceValuesFromConfig(Set resourceTypes, + break; + } + } ++ ++ @Private public static final String MULTI_NODE_SORTING_ALGORITHM_POLICIES = ++ PREFIX + "multi-node-sorting.policies"; ++ ++ @Private public static final String MULTI_NODE_SORTING_ALGORITHM_CLASS = ++ PREFIX + "multi-node-sorting.algorithm"; ++ ++ /** ++ * resource usage based node sorting algorithm. ++ */ ++ public static final String RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM = "resource-usage"; ++ ++ @Private ++ public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX ++ + "multi-node-placement-enabled"; ++ ++ @Private ++ public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; ++ ++ @SuppressWarnings("unchecked") ++ public String getMultiNodesSortingAlgorithm( ++ String queue) { ++ ++ String policyName = get( ++ getQueuePrefix(queue) + "multi-node-sorting.policy"); ++ ++ if (policyName == null) { ++ policyName = get(MULTI_NODE_SORTING_ALGORITHM_CLASS); ++ } ++ ++ // If node sorting poicy is not configured in queue and in cluster level, ++ // it is been assumed that this queue is not enabled with multi-node lookup ++ if (policyName == null || policyName.isEmpty()) { ++ return null; ++ } ++ ++ // Ensure that custom node sorting algorithm class is valid. ++ if (!policyName.trim() ++ .equals(RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM)) { ++ try { ++ Class nodeSortingPolicyClazz = getClassByName(policyName); ++ if (NodesSortingAlgorithm.class ++ .isAssignableFrom(nodeSortingPolicyClazz)) { ++ return policyName; ++ } else { ++ throw new YarnRuntimeException( ++ "Class: " + policyName + " not instance of " ++ + NodesSortingAlgorithm.class.getCanonicalName()); ++ } ++ } catch (ClassNotFoundException e) { ++ throw new YarnRuntimeException("Could not instantiate " ++ + "NodesSortingAlgorithm: " + policyName + " for queue: " + queue, ++ e); ++ } ++ } ++ ++ return policyName; ++ } ++ ++ public boolean getMultiNodePlacementEnabled() { ++ return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, ++ DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); ++ } ++ ++ public Set getMultiNodePlacementPolicies() { ++ String policies = get(MULTI_NODE_SORTING_ALGORITHM_POLICIES, ++ RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM); ++ ++ // In other cases, split the accessibleLabelStr by "," ++ Set set = new HashSet(); ++ for (String str : policies.split(",")) { ++ if (!str.trim().isEmpty()) { ++ set.add(str.trim()); ++ } ++ } ++ ++ return Collections.unmodifiableSet(set); ++ } + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +index 8d1428d3e49..ba13ffda918 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +@@ -53,10 +53,6 @@ + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +- +-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +@@ -71,6 +67,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; + import org.apache.hadoop.yarn.server.utils.Lock; +@@ -123,6 +120,8 @@ + + private volatile OrderingPolicy orderingPolicy = null; + ++ private String multiNodeSortingAlgorithmName = null; ++ + // record all ignore partition exclusivityRMContainer, this will be used to do + // preemption, key is the partition of the RMContainer allocated on + private Map> ignorePartitionExclusivityRMContainers = +@@ -286,6 +285,10 @@ protected void setupQueueConfigs(Resource clusterResource, + + usersManager.updateUserWeights(); + ++ // Update multi-node sorting algorithm for scheduling as configured. ++ multiNodeSortingAlgorithmName = conf ++ .getMultiNodesSortingAlgorithm(getQueuePath()); ++ + LOG.info( + "Initializing " + queueName + "\n" + "capacity = " + queueCapacities + .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" +@@ -2203,4 +2206,8 @@ public long getMaximumApplicationLifetime() { + public long getDefaultApplicationLifetime() { + return defaultApplicationLifetime; + } ++ ++ public String getMultiNodeSortingAlgorithmName() { ++ return this.multiNodeSortingAlgorithmName; ++ } + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +index 1bd37431c15..227836d3d1e 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +@@ -20,6 +20,8 @@ + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; + /** + * This class will keep all Scheduling env's names which will help in + * placement calculations. +@@ -32,4 +34,12 @@ + @InterfaceAudience.Private + public static final Class + DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; ++ ++ @InterfaceAudience.Private ++ public static final String ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS = ++ "MULTI_NODE_SORTING_ALGORITHM_CLASS"; ++ ++ @InterfaceAudience.Private ++ public static final Class ++ DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS = ResourceUsageNodesSortingAlgorithm.class; + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +index f3da0a36f0b..1df7311df61 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +@@ -74,6 +74,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +@@ -169,6 +170,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + rc = scheduler.getResourceCalculator(); + } + ++ // Update multi-node sorting algorithm to scheduler envs ++ if (rmApp != null) { ++ if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( ++ ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS) ++ && getCSLeafQueue().getMultiNodeSortingAlgorithmName() != null) { ++ appSchedulingInfo.getApplicationSchedulingEnvs().put( ++ ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS, ++ getCSLeafQueue().getMultiNodeSortingAlgorithmName()); ++ } ++ } + containerAllocator = new ContainerAllocator(this, rc, rmContext, + activitiesManager); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +index a0358b4ada2..2030389fd50 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +@@ -24,11 +24,15 @@ + import org.apache.hadoop.yarn.api.records.ResourceRequest; + import org.apache.hadoop.yarn.api.records.SchedulingRequest; + import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; ++import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; + import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +@@ -55,6 +59,7 @@ + new ConcurrentHashMap<>(); + private volatile String primaryRequestedPartition = + RMNodeLabelsManager.NO_LABEL; ++ private MultiNodeLookupPolicy nodeLookupPolicy = null; + + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; +@@ -65,6 +70,13 @@ public LocalityAppPlacementAllocator() { + writeLock = lock.writeLock(); + } + ++ @SuppressWarnings("unchecked") ++ @Override ++ public void initialize(AppSchedulingInfo appSchedulingInfo, ++ SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { ++ super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); ++ } ++ + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( +@@ -78,7 +90,11 @@ public LocalityAppPlacementAllocator() { + return IteratorUtils.singletonIterator(singleNode); + } + +- return IteratorUtils.emptyIterator(); ++ //rmContext.getMultiNodeSortingManager(); ++ // singleNode will be null if Multi-node placement lookup is enabled, and ++ // hence could consider sorting policies. ++ return nodeLookupPolicy ++ .getPreferredNodeIterator(candidateNodeSet.getPartition()); + } + + private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 06a1d00d95b..bac9becdb86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -42,9 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -111,6 +113,7 @@ private QueueLimitCalculator queueLimitCalculator; private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; + private MultiNodeSortingManager multiNodeSortingManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -439,6 +442,19 @@ public void setRMDelegatedNodeLabelsUpdater( rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; } + @Private + @Unstable + public MultiNodeSortingManager getMultiNodeSortingManager() { + return multiNodeSortingManager; + } + + @Private + @Unstable + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + this.multiNodeSortingManager = multiNodeSortingManager; + } + @Private @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index eb91a311a3a..a30ff76a6ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -177,4 +178,9 @@ void setRMDelegatedNodeLabelsUpdater( void setPlacementConstraintManager( PlacementConstraintManager placementConstraintManager); + + MultiNodeSortingManager getMultiNodeSortingManager(); + + void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 0b6be722ac2..2472d341f08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -540,6 +541,17 @@ public void setRMDelegatedNodeLabelsUpdater( delegatedNodeLabelsUpdater); } + @Override + public MultiNodeSortingManager getMultiNodeSortingManager() { + return activeServiceContext.getMultiNodeSortingManager(); + } + + @Override + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 5140c9fa558..1d6218b25a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -96,11 +96,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; @@ -528,6 +530,10 @@ private FederationStateStoreService createFederationStateStoreService() { return new FederationStateStoreService(rmContext); } + protected MultiNodeSortingManager createMultiNodeSortingManager() { + return new MultiNodeSortingManager(); + } + protected SystemMetricsPublisher createSystemMetricsPublisher() { List publishers = new ArrayList(); @@ -641,6 +647,12 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(placementConstraintManager); rmContext.setPlacementConstraintManager(placementConstraintManager); + MultiNodeSortingManager multiNodeSortingManager = + createMultiNodeSortingManager(); + multiNodeSortingManager.setRMContext(rmContext); + addService(multiNodeSortingManager); + rmContext.setMultiNodeSortingManager(multiNodeSortingManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1efdd8ba430..6af0b4e8a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -93,7 +93,7 @@ private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; - public final Map applicationSchedulingEnvs = new HashMap<>(); + private final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, @@ -763,4 +763,13 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, this.readLock.unlock(); } } + + /** + * Get scheduling envs configured for this application. + * + * @return a map of applicationSchedulingEnvs + */ + public Map getApplicationSchedulingEnvs() { + return applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 66d88108932..fd8a55dd3d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,6 +58,7 @@ private HashMap nodes = new HashMap<>(); private Map nodeNameToNodeMap = new HashMap<>(); private Map> nodesPerRack = new HashMap<>(); + private Map> nodesPerLabel = new HashMap<>(); private Resource clusterCapacity = Resources.createResource(0, 0); private volatile Resource staleClusterCapacity = @@ -420,4 +422,24 @@ private void updateMaxResources(SchedulerNode node, boolean add) { } return retNodes; } + + /** + * update cached nodes per partition on a node label change event. + * @param partition nodeLabel + * @param nodeIds List of Node IDs + */ + public void updateNodesPerPartition(String partition, Set nodeIds) { + // Clear all entries. + nodesPerLabel.remove(partition); + + List nodes = new ArrayList(); + nodeIds.forEach(n -> nodes.add(getNode(n))); + + // Update new set of nodes for given partition. + nodesPerLabel.put(partition, nodes); + } + + public List getNodesPerPartition(String partition) { + return nodesPerLabel.get(partition); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ddab0c1bd82..24afb050cd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -249,6 +251,7 @@ public Configuration getConf() { private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private boolean multiNodePlacementEnabled; private static boolean printedVerboseLoggingForAsyncScheduling = false; @@ -382,6 +385,13 @@ void initScheduler(Configuration configuration) throws // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Update multi-node enable config to service during init. + this.multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); + rmContext.getMultiNodeSortingManager() + .setPolicyNames(this.conf.getMultiNodePlacementPolicies()); + rmContext.getMultiNodeSortingManager() + .setMultiNodePlacementEnabled(multiNodePlacementEnabled); + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" @@ -445,6 +455,7 @@ public void serviceStop() throws Exception { if (isConfigurationMutable()) { ((MutableConfigurationProvider) csConfProvider).close(); } + super.serviceStop(); } @@ -1345,6 +1356,22 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, || assignedContainers < maxAssignPerHeartbeat); } + private CandidateNodeSet getCandidateNodeSet( + FiCaSchedulerNode node) { + CandidateNodeSet candidates = null; + if (!multiNodePlacementEnabled) { + candidates = new SimpleCandidateNodeSet<>(node); + } else { + Map nodesByPartition = new HashMap<>(); + List nodes = nodeTracker + .getNodesPerPartition(node.getPartition()); + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + candidates = new SimpleCandidateNodeSet( + nodesByPartition, node.getPartition()); + } + return candidates; + } + /** * We need to make sure when doing allocation, Node should be existed * And we will construct a {@link CandidateNodeSet} before proceeding @@ -1356,8 +1383,8 @@ private void allocateContainersToNode(NodeId nodeId, int offswitchCount = 0; int assignedContainers = 0; - CandidateNodeSet candidates = - new SimpleCandidateNodeSet<>(node); + CandidateNodeSet candidates = getCandidateNodeSet( + node); CSAssignment assignment = allocateContainersToNode(candidates, withNodeHeartbeat); // Only check if we can allocate more container on the same node when @@ -1596,10 +1623,10 @@ CSAssignment allocateContainersToNode( // We have two different logics to handle allocation on single node / multi // nodes. - if (null != node) { - return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); - } else{ + if (multiNodePlacementEnabled) { return allocateContainersOnMultiNodes(candidates); + } else { + return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); } } @@ -1776,11 +1803,13 @@ private void updateNodeLabelsAndQueueResource( NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { try { writeLock.lock(); + Set updateLabels = new HashSet(); for (Entry> entry : labelUpdateEvent .getUpdatedNodeToLabels().entrySet()) { NodeId id = entry.getKey(); Set labels = entry.getValue(); updateLabelsOnNode(id, labels); + updateLabels.addAll(labels); } Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, @@ -1790,6 +1819,15 @@ private void updateNodeLabelsAndQueueResource( } } + private void refreshLabelToNodeCache(Set updateLabels) { + Map> labelMapping = labelManager + .getLabelsInfoToNodes(updateLabels); + for (String label : updateLabels) { + Set nodes = labelMapping.get(label); + nodeTracker.updateNodesPerPartition(label, nodes); + } + } + private void addNode(RMNode nodeManager) { try { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bdd30b915c9..a97657ea485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; @@ -40,10 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -582,10 +586,11 @@ public void setAccessibleNodeLabels(String queue, Set labels) { set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); } + public Set getAccessibleNodeLabels(String queue) { String accessibleLabelStr = get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); - + // When accessible-label is null, if (accessibleLabelStr == null) { // Only return null when queue is not ROOT @@ -600,12 +605,12 @@ public void setAccessibleNodeLabels(String queue, Set labels) { + " it will be automatically set to \"*\"."); } } - + // always return ANY for queue root if (queue.equals(ROOT)) { return ImmutableSet.of(RMNodeLabelsManager.ANY); } - + // In other cases, split the accessibleLabelStr by "," Set set = new HashSet(); for (String str : accessibleLabelStr.split(",")) { @@ -621,7 +626,6 @@ public void setAccessibleNodeLabels(String queue, Set labels) { } return Collections.unmodifiableSet(set); } - private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, float defaultValue) { String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; @@ -2017,4 +2021,82 @@ private void updateResourceValuesFromConfig(Set resourceTypes, break; } } + + @Private public static final String MULTI_NODE_SORTING_ALGORITHM_POLICIES = + PREFIX + "multi-node-sorting.policies"; + + @Private public static final String MULTI_NODE_SORTING_ALGORITHM_CLASS = + PREFIX + "multi-node-sorting.algorithm"; + + /** + * resource usage based node sorting algorithm. + */ + public static final String RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM = "resource-usage"; + + @Private + public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX + + "multi-node-placement-enabled"; + + @Private + public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; + + @SuppressWarnings("unchecked") + public String getMultiNodesSortingAlgorithm( + String queue) { + + String policyName = get( + getQueuePrefix(queue) + "multi-node-sorting.policy"); + + if (policyName == null) { + policyName = get(MULTI_NODE_SORTING_ALGORITHM_CLASS); + } + + // If node sorting poicy is not configured in queue and in cluster level, + // it is been assumed that this queue is not enabled with multi-node lookup + if (policyName == null || policyName.isEmpty()) { + return null; + } + + // Ensure that custom node sorting algorithm class is valid. + if (!policyName.trim() + .equals(RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM)) { + try { + Class nodeSortingPolicyClazz = getClassByName(policyName); + if (NodesSortingAlgorithm.class + .isAssignableFrom(nodeSortingPolicyClazz)) { + return policyName; + } else { + throw new YarnRuntimeException( + "Class: " + policyName + " not instance of " + + NodesSortingAlgorithm.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate " + + "NodesSortingAlgorithm: " + policyName + " for queue: " + queue, + e); + } + } + + return policyName; + } + + public boolean getMultiNodePlacementEnabled() { + return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, + DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); + } + + public Set getMultiNodePlacementPolicies() { + String policies = get(MULTI_NODE_SORTING_ALGORITHM_POLICIES, + RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM); + + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : policies.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim()); + } + } + + return Collections.unmodifiableSet(set); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8d1428d3e49..ba13ffda918 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -71,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; @@ -123,6 +120,8 @@ private volatile OrderingPolicy orderingPolicy = null; + private String multiNodeSortingAlgorithmName = null; + // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = @@ -286,6 +285,10 @@ protected void setupQueueConfigs(Resource clusterResource, usersManager.updateUserWeights(); + // Update multi-node sorting algorithm for scheduling as configured. + multiNodeSortingAlgorithmName = conf + .getMultiNodesSortingAlgorithm(getQueuePath()); + LOG.info( "Initializing " + queueName + "\n" + "capacity = " + queueCapacities .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" @@ -2203,4 +2206,8 @@ public long getMaximumApplicationLifetime() { public long getDefaultApplicationLifetime() { return defaultApplicationLifetime; } + + public String getMultiNodeSortingAlgorithmName() { + return this.multiNodeSortingAlgorithmName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java index 1bd37431c15..227836d3d1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -20,6 +20,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; /** * This class will keep all Scheduling env's names which will help in * placement calculations. @@ -32,4 +34,12 @@ @InterfaceAudience.Private public static final Class DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; + + @InterfaceAudience.Private + public static final String ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS = + "MULTI_NODE_SORTING_ALGORITHM_CLASS"; + + @InterfaceAudience.Private + public static final Class + DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS = ResourceUsageNodesSortingAlgorithm.class; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f3da0a36f0b..1df7311df61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; @@ -169,6 +170,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, rc = scheduler.getResourceCalculator(); } + // Update multi-node sorting algorithm to scheduler envs + if (rmApp != null) { + if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS) + && getCSLeafQueue().getMultiNodeSortingAlgorithmName() != null) { + appSchedulingInfo.getApplicationSchedulingEnvs().put( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS, + getCSLeafQueue().getMultiNodeSortingAlgorithmName()); + } + } containerAllocator = new ContainerAllocator(this, rc, rmContext, activitiesManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java new file mode 100644 index 00000000000..523b206690e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; + +import java.util.Collection; +import java.util.Iterator; + +/** + *

+ * This class has the following functionality: + * + *

+ * MultiNodeLookupPolicyPerResource holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class DefaultMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + + NodesSortingAlgorithm sortingAlgorithm; + + @Override + public Iterator getPreferredNodeIterator(String partition) { + return sortingAlgorithm.getSortedEntities(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, String partition) { + sortingAlgorithm.addAllEntitiesForOrdering(nodes, partition); + } + + @SuppressWarnings("unchecked") + @Override + public void initPolicy(String multiNodeSortingAlgorithm) { + Class policyClass; + try { + if (multiNodeSortingAlgorithm == null) { + policyClass = ApplicationSchedulingConfig.DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS; + } else { + policyClass = Class.forName(multiNodeSortingAlgorithm); + } + } catch (ClassNotFoundException e) { + policyClass = ApplicationSchedulingConfig.DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS; + } + this.sortingAlgorithm = (NodesSortingAlgorithm) ReflectionUtils + .newInstance(policyClass, null); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index a0358b4ada2..05d64bfaa0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -24,11 +24,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -55,6 +59,7 @@ new ConcurrentHashMap<>(); private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; + private MultiNodeLookupPolicy nodeLookupPolicy = null; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -65,6 +70,21 @@ public LocalityAppPlacementAllocator() { writeLock = lock.writeLock(); } + @SuppressWarnings("unchecked") + @Override + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + String multiNodeAlgorithName = appSchedulingInfo + .getApplicationSchedulingEnvs().get( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS); + if (multiNodeAlgorithName != null && !multiNodeAlgorithName.isEmpty()) { + nodeLookupPolicy = (MultiNodeLookupPolicy) rmContext + .getMultiNodeSortingManager() + .getMultiNodePolicy(multiNodeAlgorithName); + } + } + @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( @@ -74,11 +94,14 @@ public LocalityAppPlacementAllocator() { // in. N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); - if (null != singleNode) { + if ( singleNode != null || nodeLookupPolicy == null) { return IteratorUtils.singletonIterator(singleNode); } - return IteratorUtils.emptyIterator(); + // singleNode will be null if Multi-node placement lookup is enabled, and + // hence could consider sorting policies. + return nodeLookupPolicy + .getPreferredNodeIterator(candidateNodeSet.getPartition()); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java new file mode 100644 index 00000000000..da56bb07c25 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Iterator; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for MultiNodeLookupPolicy so that different placement + * allocator can choose nodes based on need. + *

+ */ +public interface MultiNodeLookupPolicy { + /** + * Get iterator of preferred node depends on requirement and/or availability + * + * @param partition + * node label + * + * @return iterator of preferred node + */ + Iterator getPreferredNodeIterator(String partition); + + /** + * Refresh working nodes set for re-ordering based on the algorithm selected. + * + * @param nodes + * a collection working nm's. + */ + void addAndRefreshNodesSet(Collection nodes, String partition); + + /** + * Define all the initialization needed for NodeLookup policy. + * + * @param multiNodeSortingAlgorithm + * node sorting algorithm + */ + void initPolicy(String multiNodeSortingAlgorithm); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java new file mode 100644 index 00000000000..090cf03daef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import com.google.common.annotations.VisibleForTesting; + +public class MultiNodeSorter extends AbstractService { + + private final MultiNodeLookupPolicy multiNodePolicy; + private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class); + + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; + private volatile boolean stopped; + private long monitorInterval; + private RMContext rmContext; + private String policyName; + + public MultiNodeSorter(RMContext rmContext, + MultiNodeLookupPolicy multiNodePolicy, String policyName) { + super("MultiNodeLookupPolicy"); + this.multiNodePolicy = multiNodePolicy; + this.rmContext = rmContext; + this.policyName = policyName; + } + + @VisibleForTesting + public synchronized MultiNodeLookupPolicy getMultiNodeLookupPolicy() { + return multiNodePolicy; + } + + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing MultiNodeSorter=" + getName()); + multiNodePolicy.initPolicy(policyName); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting SchedulingMonitor=" + getName()); + assert !stopped : "starting when already stopped"; + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new SortingThread(), + 0, monitorInterval, TimeUnit.MILLISECONDS); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + stopped = true; + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); + } + super.serviceStop(); + } + + @SuppressWarnings("unchecked") + public void reSortClusterNodes() { + for (NodeLabel label : rmContext.getNodeLabelManager() + .getClusterNodeLabels()) { + Map nodesByPartition = new HashMap<>(); + List nodes = ((AbstractYarnScheduler) rmContext + .getScheduler()).getNodeTracker() + .getNodesPerPartition(label.getName()); + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + multiNodePolicy.addAndRefreshNodesSet( + (Collection) nodesByPartition.values(), label.getName()); + } + } + + private class SortingThread implements Runnable { + @Override + public void run() { + try { + reSortClusterNodes(); + } catch (Throwable t) { + // The preemption monitor does not alter structures nor do structures + // persist across invocations. Therefore, log, skip, and retry. + LOG.error("Exception raised while executing multinode" + + " sorter, skip this run..., exception=", t); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java new file mode 100644 index 00000000000..92e8d2a1dc8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + + +public class MultiNodeSortingManager extends AbstractService { + + private static final Log LOG = LogFactory.getLog(MultiNodeSortingManager.class); + + private volatile boolean stopped; + private RMContext rmContext; + private Map> runningMultiNodeSorters; + private Set policyNames = new HashSet(); + private boolean multiNodePlacementEnabled = false; + private Configuration conf; + + public MultiNodeSortingManager() { + super("MultiNodeSortingManager"); + this.runningMultiNodeSorters = new ConcurrentHashMap<>(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing MultiNodeSortingManager=" + getName()); + super.serviceInit(conf); + this.conf = conf; + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting NodeSortingService=" + getName()); + assert !stopped : "starting when already stopped"; + if (multiNodePlacementEnabled) { + createAllPolicies(); + } + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + for (MultiNodeSorter sorter : runningMultiNodeSorters.values()) { + sorter.stop(); + } + super.serviceStop(); + } + + + private void createAllPolicies() { + for (String policyName : policyNames) { + MultiNodeLookupPolicy nodeLookupPolicy = new DefaultMultiNodeLookupPolicy(); + MultiNodeSorter mon = new MultiNodeSorter(rmContext, + nodeLookupPolicy, policyName); + mon.init(conf); + mon.start(); + runningMultiNodeSorters.put(policyName, mon); + } + } + + public MultiNodeSorter getMultiNodePolicy(String name) { + return runningMultiNodeSorters.get(name); + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void setPolicyNames(Set multiNodePlacementPolicies) { + this.policyNames = multiNodePlacementPolicies; + } + + public boolean isMultiNodePlacementEnabled() { + return multiNodePlacementEnabled; + } + + public void setMultiNodePlacementEnabled(boolean multiNodePlacementEnabled) { + this.multiNodePlacementEnabled = multiNodePlacementEnabled; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java new file mode 100644 index 00000000000..c2024033383 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for NodesSortingAlgorithm so that sub classes + * could define the compare logic to sort nodes. + *

+ */ +public interface NodesSortingAlgorithm { + /** + * Get collection of sorted objects managed by this Sorting Algorithm. + * + * @param partition + * nodelabel name + * + * @return a collection of {@link SchedulerNode} objects + */ + public Collection getSortedEntities(String partition); + + /** + * Add a collection of {@link SchedulerNode} objects to be managed for + * ordering. + * + * @param sc + * the collection of {@link SchedulerNode} objects to add + * @param partition + * nodeLabel + */ + public void addAllEntitiesForOrdering(Collection sc, String partition); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java new file mode 100644 index 00000000000..7d5a67be499 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for NodesSortingAlgorithm so that sub classes + * could define the compare logic to sort nodes. + *

+ */ +public class ResourceUsageNodesSortingAlgorithm + implements NodesSortingAlgorithm { + private Map> nodesPerPartition; + protected Comparator comparator; + + public ResourceUsageNodesSortingAlgorithm() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + return o2.getAllocatedResource().compareTo(o1.getAllocatedResource()); + } + }; + } + + public Collection getSortedEntities(String partition) { + return this.nodesPerPartition.get(partition); + } + + @Override + public void addAllEntitiesForOrdering(Collection nl, String partition) { + Set nodeList = null; + if ((nodeList = nodesPerPartition.get(partition)) == null) { + nodeList = new ConcurrentSkipListSet(comparator); + this.nodesPerPartition.put(partition, nodeList); + } + + // Clear existing entries first. + nodeList.clear(); + + // Add fresh set of nodes for re-ordering. + nodeList.addAll(nl); + } +} \ No newline at end of file -- 2.14.3 (Apple Git-98)