From 2514a7d8848ae631bee6051822a96002be4fd2cf Mon Sep 17 00:00:00 2001 From: Sunil G Date: Fri, 18 May 2018 14:56:54 +0530 Subject: [PATCH] YARN-7494 --- .../resourcemanager/RMActiveServiceContext.java | 16 +++ .../yarn/server/resourcemanager/RMContext.java | 8 +- .../yarn/server/resourcemanager/RMContextImpl.java | 14 +- .../server/resourcemanager/ResourceManager.java | 12 ++ .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/ClusterNodeTracker.java | 62 +++++++++ .../server/resourcemanager/scheduler/Queue.java | 6 + .../scheduler/capacity/AbstractCSQueue.java | 16 ++- .../scheduler/capacity/CapacityScheduler.java | 57 +++++++- .../capacity/CapacitySchedulerConfiguration.java | 105 ++++++++++++++- .../scheduler/capacity/LeafQueue.java | 4 - .../common/ApplicationSchedulingConfig.java | 16 +++ .../scheduler/common/fica/FiCaSchedulerApp.java | 11 ++ .../scheduler/fair/FSLeafQueue.java | 5 + .../scheduler/fair/FSParentQueue.java | 5 + .../scheduler/fifo/FifoScheduler.java | 6 + .../placement/DefaultMultiNodeLookupPolicy.java | 82 ++++++++++++ .../placement/LocalityAppPlacementAllocator.java | 27 +++- .../scheduler/placement/MultiNodeLookupPolicy.java | 67 +++++++++ .../scheduler/placement/MultiNodePolicySpec.java | 48 +++++++ .../scheduler/placement/MultiNodeSorter.java | 149 +++++++++++++++++++++ .../placement/MultiNodeSortingManager.java | 98 ++++++++++++++ .../ResourceUsageBasedMultiNodeLookupPolicy.java | 86 ++++++++++++ .../reservation/ReservationSystemTestUtil.java | 3 + .../resourcetracker/TestNMReconnect.java | 2 + .../scheduler/capacity/TestCapacityScheduler.java | 41 ++++++ .../TestCapacitySchedulerNodeLabelUpdate.java | 72 ++++++++++ .../scheduler/capacity/TestQueueParsing.java | 18 ++- .../scheduler/capacity/TestReservations.java | 3 + .../scheduler/capacity/TestUtils.java | 3 + .../scheduler/fifo/TestFifoScheduler.java | 4 + .../resourcemanager/webapp/TestRMWebApp.java | 3 + 32 files changed, 1043 insertions(+), 17 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageBasedMultiNodeLookupPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 66065e33bae..8fb0de63fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -43,9 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -113,6 +115,7 @@ private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; private ResourceProfilesManager resourceProfilesManager; + private MultiNodeSortingManager multiNodeSortingManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -441,6 +444,19 @@ public void setRMDelegatedNodeLabelsUpdater( rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; } + @Private + @Unstable + public MultiNodeSortingManager getMultiNodeSortingManager() { + return multiNodeSortingManager; + } + + @Private + @Unstable + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + this.multiNodeSortingManager = multiNodeSortingManager; + } + @Private @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index eb91a311a3a..a30ff76a6ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -177,4 +178,9 @@ void setRMDelegatedNodeLabelsUpdater( void setPlacementConstraintManager( PlacementConstraintManager placementConstraintManager); + + MultiNodeSortingManager getMultiNodeSortingManager(); + + void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 84e0f6f6b58..cb1d56f34fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -538,6 +539,17 @@ public void setRMDelegatedNodeLabelsUpdater( delegatedNodeLabelsUpdater); } + @Override + public MultiNodeSortingManager getMultiNodeSortingManager() { + return activeServiceContext.getMultiNodeSortingManager(); + } + + @Override + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 05745ec272e..5cbc308347a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -95,11 +95,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; @@ -538,6 +540,10 @@ private FederationStateStoreService createFederationStateStoreService() { return new FederationStateStoreService(rmContext); } + protected MultiNodeSortingManager createMultiNodeSortingManager() { + return new MultiNodeSortingManager(); + } + protected SystemMetricsPublisher createSystemMetricsPublisher() { List publishers = new ArrayList(); @@ -657,6 +663,12 @@ protected void serviceInit(Configuration configuration) throws Exception { resourceProfilesManager.init(conf); rmContext.setResourceProfilesManager(resourceProfilesManager); + MultiNodeSortingManager multiNodeSortingManager = + createMultiNodeSortingManager(); + multiNodeSortingManager.setRMContext(rmContext); + addService(multiNodeSortingManager); + rmContext.setMultiNodeSortingManager(multiNodeSortingManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1efdd8ba430..6af0b4e8a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -93,7 +93,7 @@ private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; - public final Map applicationSchedulingEnvs = new HashMap<>(); + private final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, @@ -763,4 +763,13 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, this.readLock.unlock(); } } + + /** + * Get scheduling envs configured for this application. + * + * @return a map of applicationSchedulingEnvs + */ + public Map getApplicationSchedulingEnvs() { + return applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 66d88108932..663192481f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -37,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,6 +59,7 @@ private HashMap nodes = new HashMap<>(); private Map nodeNameToNodeMap = new HashMap<>(); private Map> nodesPerRack = new HashMap<>(); + private Map> nodesPerLabel = new HashMap<>(); private Resource clusterCapacity = Resources.createResource(0, 0); private volatile Resource staleClusterCapacity = @@ -80,6 +83,16 @@ public void addNode(N node) { nodes.put(node.getNodeID(), node); nodeNameToNodeMap.put(node.getNodeName(), node); + List nodes = nodesPerLabel.get(node.getPartition()); + + if (nodes == null) { + nodes = new ArrayList(); + } + nodes.add(node); + + // Update new set of nodes for given partition. + nodesPerLabel.put(node.getPartition(), nodes); + // Update nodes per rack as well String rackName = node.getRackName(); List nodesList = nodesPerRack.get(rackName); @@ -174,6 +187,16 @@ public N removeNode(NodeId nodeId) { } } + List nodes = nodesPerLabel.get(node.getPartition()); + nodes.remove(node); + + // Update new set of nodes for given partition. + if (nodes.isEmpty()) { + nodesPerLabel.remove(node.getPartition()); + } else { + nodesPerLabel.put(node.getPartition(), nodes); + } + // Update cluster capacity Resources.subtractFrom(clusterCapacity, node.getTotalResource()); staleClusterCapacity = Resources.clone(clusterCapacity); @@ -420,4 +443,43 @@ private void updateMaxResources(SchedulerNode node, boolean add) { } return retNodes; } + + /** + * update cached nodes per partition on a node label change event. + * @param partition nodeLabel + * @param nodeIds List of Node IDs + */ + public void updateNodesPerPartition(String partition, Set nodeIds) { + writeLock.lock(); + try { + // Clear all entries. + nodesPerLabel.remove(partition); + + List nodes = new ArrayList(); + for (NodeId nodeId : nodeIds) { + N n = getNode(nodeId); + if (n != null) { + nodes.add(n); + } + } + + // Update new set of nodes for given partition. + nodesPerLabel.put(partition, nodes); + } finally { + writeLock.unlock(); + } + } + + public List getNodesPerPartition(String partition) { + List nodesPerPartition = null; + readLock.lock(); + try { + if (nodesPerLabel.containsKey(partition)) { + nodesPerPartition = new ArrayList(nodesPerLabel.get(partition)); + } + } finally { + readLock.unlock(); + } + return nodesPerPartition; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index d166e5fc568..f1dd0d8813b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -138,4 +138,10 @@ public void recoverContainer(Resource clusterResource, * reserved resource asked */ public void decReservedResource(String partition, Resource reservedRes); + + /** + * Get Multi Node scheduling policy name. + * @return policy name + */ + String getMultiNodeSortingPolicyName(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 67b676bea57..734698a1a80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -92,7 +92,8 @@ Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; - + private String multiNodeSortingPolicyName = null; + Map acls = new HashMap(); volatile boolean reservationsContinueLooking; @@ -414,6 +415,10 @@ protected void setupQueueConfigs(Resource clusterResource, this.priority = configuration.getQueuePriority( getQueuePath()); + // Update multi-node sorting algorithm for scheduling as configured. + setMultiNodeSortingPolicyName( + configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath())); + this.userWeights = getUserWeightsFromHierarchy(configuration); } finally { writeLock.unlock(); @@ -1259,4 +1264,13 @@ public void recoverDrainingState() { this.writeLock.unlock(); } } + + @Override + public String getMultiNodeSortingPolicyName() { + return this.multiNodeSortingPolicyName; + } + + public void setMultiNodeSortingPolicyName(String policyName) { + this.multiNodeSortingPolicyName = policyName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 162d3bb99cf..1f1f4c3380d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -251,6 +253,7 @@ public Configuration getConf() { private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private boolean multiNodePlacementEnabled; private static boolean printedVerboseLoggingForAsyncScheduling = false; @@ -391,6 +394,11 @@ void initScheduler(Configuration configuration) throws // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Register CS specific multi-node policies to common MultiNodeManager. + multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); + rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames( + multiNodePlacementEnabled, this.conf.getMultiNodePlacementPolicies()); + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" @@ -454,6 +462,7 @@ public void serviceStop() throws Exception { if (isConfigurationMutable()) { ((MutableConfigurationProvider) csConfProvider).close(); } + super.serviceStop(); } @@ -1398,6 +1407,23 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, || assignedContainers < maxAssignPerHeartbeat); } + private CandidateNodeSet getCandidateNodeSet( + FiCaSchedulerNode node) { + CandidateNodeSet candidates = null; + candidates = new SimpleCandidateNodeSet<>(node); + if (multiNodePlacementEnabled) { + Map nodesByPartition = new HashMap<>(); + List nodes = nodeTracker + .getNodesPerPartition(node.getPartition()); + if (nodes != null && !nodes.isEmpty()) { + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + candidates = new SimpleCandidateNodeSet( + nodesByPartition, node.getPartition()); + } + } + return candidates; + } + /** * We need to make sure when doing allocation, Node should be existed * And we will construct a {@link CandidateNodeSet} before proceeding @@ -1409,8 +1435,8 @@ private void allocateContainersToNode(NodeId nodeId, int offswitchCount = 0; int assignedContainers = 0; - CandidateNodeSet candidates = - new SimpleCandidateNodeSet<>(node); + CandidateNodeSet candidates = getCandidateNodeSet( + node); CSAssignment assignment = allocateContainersToNode(candidates, withNodeHeartbeat); // Only check if we can allocate more container on the same node when @@ -1649,10 +1675,10 @@ CSAssignment allocateContainersToNode( // We have two different logics to handle allocation on single node / multi // nodes. - if (null != node) { - return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); - } else{ + if (multiNodePlacementEnabled) { return allocateContainersOnMultiNodes(candidates); + } else { + return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); } } @@ -1829,12 +1855,21 @@ private void updateNodeLabelsAndQueueResource( NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { try { writeLock.lock(); + Set updateLabels = new HashSet(); for (Entry> entry : labelUpdateEvent .getUpdatedNodeToLabels().entrySet()) { NodeId id = entry.getKey(); Set labels = entry.getValue(); + FiCaSchedulerNode node = nodeTracker.getNode(id); + + if (node != null) { + // Update old partition to list. + updateLabels.add(node.getPartition()); + } updateLabelsOnNode(id, labels); + updateLabels.addAll(labels); } + refreshLabelToNodeCache(updateLabels); Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); @@ -1843,6 +1878,18 @@ private void updateNodeLabelsAndQueueResource( } } + private void refreshLabelToNodeCache(Set updateLabels) { + Map> labelMapping = labelManager + .getLabelsToNodes(updateLabels); + for (String label : updateLabels) { + Set nodes = labelMapping.get(label); + if (nodes == null) { + continue; + } + nodeTracker.updateNodesPerPartition(label, nodes); + } + } + private void addNode(RMNode nodeManager) { try { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 76eaac05718..94bc55b6d14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -591,6 +593,7 @@ public void setAccessibleNodeLabels(String queue, Set labels) { set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); } + public Set getAccessibleNodeLabels(String queue) { String accessibleLabelStr = get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); @@ -630,7 +633,6 @@ public void setAccessibleNodeLabels(String queue, Set labels) { } return Collections.unmodifiableSet(set); } - private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, float defaultValue) { String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; @@ -2102,4 +2104,105 @@ private void updateResourceValuesFromConfig(Set resourceTypes, break; } } + + @Private public static final String MULTI_NODE_SORTING_POLICIES = + PREFIX + "multi-node-sorting.policies"; + + @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = + PREFIX + "multi-node-sorting.policy"; + + /** + * resource usage based node sorting algorithm. + */ + public static final String RESOURCE_USAGE_BASED_NODE_SORTING_POLICY = "resource-usage"; + public static final String DEFAULT_NODE_SORTING_POLICY = "default"; + public static final String DEFAULT_NODE_SORTING_POLICY_NAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.DefaultMultiNodeLookupPolicy"; + public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L; + + @Private + public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX + + "multi-node-placement-enabled"; + + @Private + public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; + + public String getMultiNodesSortingAlgorithmPolicy( + String queue) { + + String policyName = get( + getQueuePrefix(queue) + "multi-node-sorting.policy"); + + if (policyName == null) { + policyName = get(MULTI_NODE_SORTING_POLICY_NAME); + } + + // If node sorting policy is not configured in queue and in cluster level, + // it is been assumed that this queue is not enabled with multi-node lookup. + if (policyName == null || policyName.isEmpty()) { + return null; + } + + String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName.trim() + DOT + "class"); + return normalizePolicyName(policyClassName.trim()); + } + + public boolean getMultiNodePlacementEnabled() { + return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, + DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); + } + + public Set getMultiNodePlacementPolicies() { + String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES); + + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : policies) { + if (!str.trim().isEmpty()) { + String policyClassName = get( + MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class"); + if (str.trim().equals(DEFAULT_NODE_SORTING_POLICY)) { + policyClassName = get( + MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class", + DEFAULT_NODE_SORTING_POLICY_NAME); + } + + if (policyClassName == null) { + throw new YarnRuntimeException( + "Class is an not instance of " + + MultiNodeLookupPolicy.class.getCanonicalName()); + } + policyClassName = normalizePolicyName(policyClassName.trim()); + long timeout = getLong(MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + + DOT + "sorting-interval.ms", DEFAULT_MULTI_NODE_SORTING_INTERVAL); + set.add(new MultiNodePolicySpec(policyClassName, timeout)); + } + } + + return Collections.unmodifiableSet(set); + } + + private String normalizePolicyName(String policyName) { + // If node sorting poicy is not configured in queue and in cluster level, + // it is been assumed that this queue is not enabled with multi-node lookup. + if (policyName == null || policyName.isEmpty()) { + return null; + } + + // Ensure that custom node sorting algorithm class is valid. + try { + Class nodeSortingPolicyClazz = getClassByName(policyName); + if (MultiNodeLookupPolicy.class + .isAssignableFrom(nodeSortingPolicyClazz)) { + return policyName; + } else { + throw new YarnRuntimeException( + "Class: " + policyName + " not instance of " + + MultiNodeLookupPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate " + "NodesSortingPolicy: " + policyName, e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1ae8f919dfb..886574c389b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java index 1bd37431c15..166d22d2bc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -19,7 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.DefaultMultiNodeLookupPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageBasedMultiNodeLookupPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy; + /** * This class will keep all Scheduling env's names which will help in * placement calculations. @@ -32,4 +36,16 @@ @InterfaceAudience.Private public static final Class DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; + + @InterfaceAudience.Private + public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS = + "MULTI_NODE_SORTING_POLICY_CLASS"; + + @InterfaceAudience.Private + public static final Class + DEFAULT_MULTI_NODE_SORTING_POLICY_CLASS = DefaultMultiNodeLookupPolicy.class; + + @InterfaceAudience.Private + public static final Class + RESOURCE_BASED_MULTI_NODE_SORTING_POLICY_CLASS = ResourceUsageBasedMultiNodeLookupPolicy.class; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3ec81915706..6dbffa3341e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; @@ -169,6 +170,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, rc = scheduler.getResourceCalculator(); } + // Update multi-node sorting algorithm to scheduler envs + if (rmApp != null) { + if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS) + && queue.getMultiNodeSortingPolicyName() != null) { + appSchedulingInfo.getApplicationSchedulingEnvs().put( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS, + queue.getMultiNodeSortingPolicyName()); + } + } containerAllocator = new ContainerAllocator(this, rc, rmContext, activitiesManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49d216694db..4d79486b987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -609,4 +609,9 @@ protected void dumpStateInternal(StringBuilder sb) { ", LastTimeAtMinShare: " + lastTimeAtMinShare + "}"); } + + @Override + public String getMultiNodeSortingPolicyName() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index a8e53fc26f2..bf93b1e6825 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -299,4 +299,9 @@ protected void dumpStateInternal(StringBuilder sb) { child.dumpStateInternal(sb); } } + + @Override + public String getMultiNodeSortingPolicyName() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 7ac9027a78a..13b4bc05eea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -231,6 +231,12 @@ public void decReservedResource(String partition, Resource reservedRes) { // TODO add implementation for FIFO scheduler } + + @Override + public String getMultiNodeSortingPolicyName() { + // TODO add implementation for FIFO scheduler + return null; + } }; public FifoScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java new file mode 100644 index 00000000000..74eafd20f90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * ResourceUsageBasedMultiNodeLookupPolicy holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class DefaultMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + + private Map> nodesPerPartition = new HashMap<>(); + protected Comparator comparator; + + public DefaultMultiNodeLookupPolicy() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + return o2.getAllocatedResource().compareTo(o1.getAllocatedResource()); + } + }; + } + + @Override + public Iterator getPreferredNodeIterator(Collection nodes, + String partition) { + addAndRefreshNodesSet((Collection) nodes, partition); + return this.nodesPerPartition.get(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, String partition) { + Set nodeList = null; + if ((nodeList = nodesPerPartition.get(partition)) == null) { + nodeList = new ConcurrentSkipListSet(comparator); + this.nodesPerPartition.put(partition, nodeList); + } + + // Clear existing entries first. + nodeList.clear(); + + // Add fresh set of nodes for re-ordering. + nodeList.addAll(nodes); + } + + @Override + public Set getNodesPerPartition(String partition) { + return Collections.unmodifiableSet(this.nodesPerPartition.get(partition)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index a0358b4ada2..6a5b7d6a5ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -24,11 +24,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -55,6 +59,7 @@ new ConcurrentHashMap<>(); private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; + private MultiNodeLookupPolicy nodeLookupPolicy = null; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -65,6 +70,20 @@ public LocalityAppPlacementAllocator() { writeLock = lock.writeLock(); } + @SuppressWarnings("unchecked") + @Override + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + String multiNodePolicyName = appSchedulingInfo + .getApplicationSchedulingEnvs().get( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS); + if (multiNodePolicyName != null && !multiNodePolicyName.isEmpty()) { + nodeLookupPolicy = (MultiNodeLookupPolicy) rmContext + .getMultiNodeSortingManager().getMultiNodePolicy(multiNodePolicyName); + } + } + @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( @@ -74,11 +93,15 @@ public LocalityAppPlacementAllocator() { // in. N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); - if (null != singleNode) { + if ( singleNode != null || nodeLookupPolicy == null) { return IteratorUtils.singletonIterator(singleNode); } - return IteratorUtils.emptyIterator(); + // singleNode will be null if Multi-node placement lookup is enabled, and + // hence could consider sorting policies. + return nodeLookupPolicy.getPreferredNodeIterator( + candidateNodeSet.getAllNodes().values(), + candidateNodeSet.getPartition()); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java new file mode 100644 index 00000000000..25c792f1ff9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for MultiNodeLookupPolicy so that different placement + * allocator can choose nodes based on need. + *

+ */ +public interface MultiNodeLookupPolicy { + /** + * Get iterator of preferred node depends on requirement and/or availability + * + * @param partition + * node label + * @param nodes + * List of Nodes + * + * @return iterator of preferred node + */ + Iterator getPreferredNodeIterator(Collection nodes, String partition); + + /** + * Refresh working nodes set for re-ordering based on the algorithm selected. + * + * @param nodes + * a collection working nm's. + */ + void addAndRefreshNodesSet(Collection nodes, String partition); + + /** + * Get sorted nodes per partition. + * + * @param partition + * node label + * + * @return collection of sorted nodes + */ + Set getNodesPerPartition(String partition); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java new file mode 100644 index 00000000000..131dc542b5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +/** + * MultiNodePolicySpec contains policyName and timeout. + */ +public class MultiNodePolicySpec { + + private String policyName; + private long timeout; + + public MultiNodePolicySpec(String policyName, long timeout) { + this.setTimeout(timeout); + this.setPolicyName(policyName); + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public String getPolicyName() { + return policyName; + } + + public void setPolicyName(String policyName) { + this.policyName = policyName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java new file mode 100644 index 00000000000..5b90491f9f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import com.google.common.annotations.VisibleForTesting; + +public class MultiNodeSorter extends AbstractService { + + private MultiNodeLookupPolicy multiNodePolicy; + private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class); + + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; + private volatile boolean stopped; + private RMContext rmContext; + private MultiNodePolicySpec policySpec; + + public MultiNodeSorter(RMContext rmContext, + MultiNodePolicySpec policy) { + super("MultiNodeLookupPolicy"); + this.rmContext = rmContext; + this.policySpec = policy; + } + + @VisibleForTesting + public synchronized MultiNodeLookupPolicy getMultiNodeLookupPolicy() { + return multiNodePolicy; + } + + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyName()); + initPolicy(policySpec.getPolicyName()); + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + void initPolicy(String policyName) throws YarnException { + Class policyClass; + try { + policyClass = Class.forName(policyName); + } catch (ClassNotFoundException e) { + throw new YarnException( + "Invalid policy name:" + policyName + e.getMessage()); + } + this.multiNodePolicy = (MultiNodeLookupPolicy) ReflectionUtils + .newInstance(policyClass, null); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting SchedulingMonitor=" + getName()); + assert !stopped : "starting when already stopped"; + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new SortingThread(), + 0, policySpec.getTimeout(), TimeUnit.MILLISECONDS); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + stopped = true; + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); + } + super.serviceStop(); + } + + @SuppressWarnings("unchecked") + public void reSortClusterNodes() { + Set nodeLabels = new HashSet<>(); + nodeLabels + .addAll(rmContext.getNodeLabelManager().getClusterNodeLabelNames()); + nodeLabels.add(RMNodeLabelsManager.NO_LABEL); + for (String label : nodeLabels) { + Map nodesByPartition = new HashMap<>(); + List nodes = ((AbstractYarnScheduler) rmContext + .getScheduler()).getNodeTracker().getNodesPerPartition(label); + if (nodes != null && !nodes.isEmpty()) { + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + multiNodePolicy.addAndRefreshNodesSet( + (Collection) nodesByPartition.values(), label); + } + } + } + + private class SortingThread implements Runnable { + @Override + public void run() { + try { + reSortClusterNodes(); + } catch (Throwable t) { + // The preemption monitor does not alter structures nor do structures + // persist across invocations. Therefore, log, skip, and retry. + LOG.error("Exception raised while executing multinode" + + " sorter, skip this run..., exception=", t); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java new file mode 100644 index 00000000000..e7eae062922 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + + +public class MultiNodeSortingManager extends AbstractService { + + private static final Log LOG = LogFactory.getLog(MultiNodeSortingManager.class); + + private volatile boolean stopped; + private RMContext rmContext; + private Map> runningMultiNodeSorters; + private Set policySpecs = new HashSet(); + private Configuration conf; + private boolean multiNodePlacementEnabled; + + public MultiNodeSortingManager() { + super("MultiNodeSortingManager"); + this.runningMultiNodeSorters = new ConcurrentHashMap<>(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing MultiNodeSortingManager=" + getName()); + super.serviceInit(conf); + this.conf = conf; + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting NodeSortingService=" + getName()); + assert !stopped : "starting when already stopped"; + createAllPolicies(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + for (MultiNodeSorter sorter : runningMultiNodeSorters.values()) { + sorter.stop(); + } + super.serviceStop(); + } + + private void createAllPolicies() { + if (!multiNodePlacementEnabled) { + return; + } + for (MultiNodePolicySpec policy : policySpecs) { + MultiNodeSorter mon = new MultiNodeSorter(rmContext, policy); + mon.init(conf); + mon.start(); + runningMultiNodeSorters.put(policy.getPolicyName(), mon); + } + } + + public MultiNodeSorter getMultiNodePolicy(String name) { + return runningMultiNodeSorters.get(name); + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + public void registerMultiNodePolicyNames( + boolean multiNodePlacementEnabled, + Set multiNodePlacementPolicies) { + this.policySpecs.addAll(multiNodePlacementPolicies); + this.multiNodePlacementEnabled = multiNodePlacementEnabled; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageBasedMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageBasedMultiNodeLookupPolicy.java new file mode 100644 index 00000000000..71500975205 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageBasedMultiNodeLookupPolicy.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * ResourceUsageBasedMultiNodeLookupPolicy holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class ResourceUsageBasedMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + + private Map> nodesPerPartition = new HashMap<>(); + protected Comparator comparator; + + public ResourceUsageBasedMultiNodeLookupPolicy() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + int allocatedDiff = o1.getAllocatedResource() + .compareTo(o2.getAllocatedResource()); + if (allocatedDiff == 0) { + return o1.getNodeID().compareTo(o2.getNodeID()); + } + return allocatedDiff; + } + }; + } + + @Override + public Iterator getPreferredNodeIterator(Collection nodes, + String partition) { + return this.nodesPerPartition.get(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, String partition) { + Set nodeList = null; + if ((nodeList = nodesPerPartition.get(partition)) == null) { + nodeList = new ConcurrentSkipListSet(comparator); + this.nodesPerPartition.put(partition, nodeList); + } + + // Clear existing entries first. + nodeList.clear(); + + // Add fresh set of nodes for re-ordering. + nodeList.addAll(nodes); + } + + @Override + public Set getNodesPerPartition(String partition) { + return Collections.unmodifiableSet(this.nodesPerPartition.get(partition)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index eef86a44990..09d3327263b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -295,6 +296,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { }); mockRmContext.setNodeLabelManager(nlm); + mockRmContext + .setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); return mockRmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 3c4e6b424de..a13afb31c6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -98,6 +99,7 @@ public void setUp() { context = new RMContextImpl(dispatcher, null, null, null, null, null, null, null, null, null); + context.setMultiNodeSortingManager(new MultiNodeSortingManager()); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1d2aadcf2a8..18481a0bc51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -35,8 +35,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -146,6 +148,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -4910,4 +4914,41 @@ public Object answer(InvocationOnMock invocation) throws Exception { spyCs.handle(new NodeUpdateSchedulerEvent( spyCs.getNode(nm.getNodeId()).getRMNode())); } + + @Test + public void testMultiNodeSorterForScheduling() throws Exception { + CapacitySchedulerConfiguration config = new CapacitySchedulerConfiguration(); + config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + config); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + String policyName = CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + String resPolicyClass = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageBasedMultiNodeLookupPolicy"; + conf.set(policyName, resPolicyClass); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); + conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); + MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("127.0.0.1:1234", 10 * GB); + rm.registerNode("127.0.0.1:1235", 10 * GB); + rm.registerNode("127.0.0.1:1236", 10 * GB); + rm.registerNode("127.0.0.1:1237", 10 * GB); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(resPolicyClass); + Set nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + rm.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index b4ebd15ccde..4cbfcf02b24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; @@ -817,4 +819,74 @@ private long waitForResourceUpdate(MockRM rm, String queuename, long memory, } return memorySize; } + + private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition, + long expectedNodeCount, long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + long size = 0; + while (System.currentTimeMillis() - start < timeout) { + CapacityScheduler scheduler = (CapacityScheduler) rm + .getResourceScheduler(); + size = scheduler.getNodeTracker().getNodesPerPartition(partition).size(); + if (size == expectedNodeCount) { + return size; + } + Thread.sleep(100); + } + return size; + } + + @Test + public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker() + throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + rm.registerNode("h2:1234", 8000); + rm.registerNode("h3:1234", 8000); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Ensure that cluster node tracker is updated with correct set of node + // after Node registration. + Assert.assertEquals(2, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size()); + + rm.unRegisterNode(nm1); + rm.registerNode("h4:1234", 8000); + + // Ensure that cluster node tracker is updated with correct set of node + // after new Node registration and old node label change. + Assert.assertEquals(1, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size()); + + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet(""))); + + // Last node with label x is replaced by CLI or REST. + Assert.assertEquals(0, + waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index add14ab2fdc..57c5bb555fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.mockito.Mockito.mock; + import java.io.IOException; import java.util.HashSet; import java.util.List; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -52,12 +55,14 @@ private static final double DELTA = 0.000001; private RMNodeLabelsManager nodeLabelManager; - + MultiNodeSortingManager ms; + @Before public void setup() { nodeLabelManager = new NullRMNodeLabelsManager(); nodeLabelManager.init(new YarnConfiguration()); nodeLabelManager.start(); + ms = mock(MultiNodeSortingManager.class); } @Test @@ -507,6 +512,7 @@ public void testQueueParsingReinitializeWithLabels() throws IOException { new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(conf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(conf); @@ -660,6 +666,7 @@ public void testQueueParsingWithLabels() throws IOException { new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -685,6 +692,7 @@ public void testQueueParsingWithLeafQueueDisableElasticity() new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -709,6 +717,7 @@ public void testQueueParsingWithLabelsInherit() throws IOException { new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -737,6 +746,7 @@ public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager() nodeLabelsManager.start(); rmContext.setNodeLabelManager(nodeLabelsManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -765,6 +775,7 @@ public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager() nodeLabelsManager.start(); rmContext.setNodeLabelManager(nodeLabelsManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -793,6 +804,7 @@ public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager() nodeLabelsManager.start(); rmContext.setNodeLabelManager(nodeLabelsManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -820,6 +832,7 @@ public void testQueueParsingWhenLabelsNotExist() throws IOException { nodeLabelsManager.start(); rmContext.setNodeLabelManager(nodeLabelsManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -850,6 +863,7 @@ public void testQueueParsingWithUnusedLabels() throws IOException { new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(conf); capacityScheduler.start(); @@ -1051,6 +1065,7 @@ public void testQueueParsingWithMoveQueue() new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(csConf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(csConf); @@ -1116,6 +1131,7 @@ public void testQueueOrderingPolicyUpdatedAfterReinitialize() new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null); rmContext.setNodeLabelManager(nodeLabelManager); + rmContext.setMultiNodeSortingManager(ms); capacityScheduler.setConf(conf); capacityScheduler.setRMContext(rmContext); capacityScheduler.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 870588715da..00be8ba588b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -153,6 +154,8 @@ private void setup(CapacitySchedulerConfiguration csConf, when(spyRMContext.getScheduler()).thenReturn(cs); when(spyRMContext.getYarnConfiguration()) .thenReturn(new YarnConfiguration()); + when(spyRMContext.getMultiNodeSortingManager()) + .thenReturn(new MultiNodeSortingManager<>()); cs.setRMContext(spyRMContext); cs.init(csConf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index fae63be5051..efa8ad5f122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -138,6 +139,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { AllocationTagsManager ptm = mock(AllocationTagsManager.class); rmContext.setAllocationTagsManager(ptm); + MultiNodeSortingManager ms = mock(MultiNodeSortingManager.class); + rmContext.setMultiNodeSortingManager(ms); return rmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 8814c0e542d..79040d66171 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -193,6 +194,7 @@ public void testAppAttemptMetrics() throws Exception { Configuration conf = new Configuration(); ((RMContextImpl) rmContext).setScheduler(scheduler); + rmContext.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start(); @@ -241,6 +243,7 @@ public void testNodeLocalAssignment() throws Exception { rmContext.setRMApplicationHistoryWriter( mock(RMApplicationHistoryWriter.class)); ((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration()); + rmContext.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); scheduler.setRMContext(rmContext); scheduler.init(conf); @@ -323,6 +326,7 @@ public void testUpdateResourceOnNode() throws Exception { nlm.init(new Configuration()); rmContext.setNodeLabelManager(nlm); rmContext.setAllocationTagsManager(ptm); + rmContext.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); scheduler.setRMContext(rmContext); ((RMContextImpl) rmContext).setScheduler(scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 93377be1ea9..870c498d593 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -199,6 +200,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes, }; rmContext.setNodeLabelManager(new NullRMNodeLabelsManager()); rmContext.setYarnConfiguration(new YarnConfiguration()); + rmContext.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); return rmContext; } @@ -236,6 +238,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { RMNodeLabelsManager labelManager = new NullRMNodeLabelsManager(); labelManager.init(yarnConf); rmContext.setNodeLabelManager(labelManager); + rmContext.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); cs.setRMContext(rmContext); cs.init(conf); return cs; -- 2.14.3 (Apple Git-98)