From 6ac7c57ee8fb3b46becdb0ba0a2a87035945a5f2 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 1 Mar 2018 23:18:36 +0530 Subject: [PATCH] YARN-7494 --- .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/capacity/CapacityScheduler.java | 61 +++++++- .../capacity/CapacitySchedulerConfiguration.java | 90 +++++++++++- .../scheduler/capacity/LeafQueue.java | 15 +- .../common/ApplicationSchedulingConfig.java | 10 ++ .../scheduler/common/fica/FiCaSchedulerApp.java | 11 ++ .../placement/DefaultMultiNodeLookupPolicy.java | 67 +++++++++ .../placement/LocalityAppPlacementAllocator.java | 23 ++- .../scheduler/placement/MultiNodeLookupPolicy.java | 57 ++++++++ .../placement/MultiNodeSortingManager.java | 158 +++++++++++++++++++++ .../scheduler/placement/NodesSortingAlgorithm.java | 55 +++++++ .../placement/PartitionBasedCandidateNodeSet.java | 54 +++++++ .../ResourceUsageNodesSortingAlgorithm.java | 70 +++++++++ 13 files changed, 667 insertions(+), 15 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1efdd8ba430..6af0b4e8a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -93,7 +93,7 @@ private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; - public final Map applicationSchedulingEnvs = new HashMap<>(); + private final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, @@ -763,4 +763,13 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, this.readLock.unlock(); } } + + /** + * Get scheduling envs configured for this application. + * + * @return a map of applicationSchedulingEnvs + */ + public Map getApplicationSchedulingEnvs() { + return applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ddab0c1bd82..bfb86f69366 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -146,6 +147,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PartitionBasedCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -249,6 +252,9 @@ public Configuration getConf() { private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private boolean multiNodePlacementEnabled; + + private MultiNodeSortingManager multiNodeSortingManager; private static boolean printedVerboseLoggingForAsyncScheduling = false; @@ -382,6 +388,10 @@ void initScheduler(Configuration configuration) throws // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + this.multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); + this.multiNodeSortingManager = new MultiNodeSortingManager(rmContext, + this.conf.getMultiNodePlacementPolicies(), this.multiNodePlacementEnabled); + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" @@ -418,10 +428,18 @@ public void serviceInit(Configuration conf) throws Exception { initScheduler(configuration); // Initialize SchedulingMonitorManager schedulingMonitorManager.initialize(rmContext, conf); + if (multiNodePlacementEnabled) { + // Initialize the service thread to do periodic sorting. + multiNodeSortingManager.init(conf); + } } @Override public void serviceStart() throws Exception { + if (multiNodePlacementEnabled) { + // Start the service thread which does periodic node sorting. + multiNodeSortingManager.start(); + } startSchedulerThreads(); super.serviceStart(); } @@ -445,6 +463,11 @@ public void serviceStop() throws Exception { if (isConfigurationMutable()) { ((MutableConfigurationProvider) csConfProvider).close(); } + + if (multiNodePlacementEnabled) { + // Stop the service thread which does periodic node sorting. + multiNodeSortingManager.stop(); + } super.serviceStop(); } @@ -1345,6 +1368,26 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, || assignedContainers < maxAssignPerHeartbeat); } + private CandidateNodeSet getCandidateNodeSet( + FiCaSchedulerNode node) { + CandidateNodeSet candidates = null; + if (!multiNodePlacementEnabled) { + candidates = new SimpleCandidateNodeSet<>(node); + } else { + Set labels = new HashSet<>(); + labels.add(node.getPartition()); + Set nodes = labelManager.getLabelsInfoToNodes(labels) + .get(node.getPartition()); + Map nodesByPartition = new HashMap<>(); + for (NodeId nodeId : nodes) { + nodesByPartition.put(nodeId, getNode(nodeId)); + } + candidates = new PartitionBasedCandidateNodeSet( + nodesByPartition, node.getPartition()); + } + return candidates; + } + /** * We need to make sure when doing allocation, Node should be existed * And we will construct a {@link CandidateNodeSet} before proceeding @@ -1356,8 +1399,8 @@ private void allocateContainersToNode(NodeId nodeId, int offswitchCount = 0; int assignedContainers = 0; - CandidateNodeSet candidates = - new SimpleCandidateNodeSet<>(node); + CandidateNodeSet candidates = getCandidateNodeSet( + node); CSAssignment assignment = allocateContainersToNode(candidates, withNodeHeartbeat); // Only check if we can allocate more container on the same node when @@ -1596,10 +1639,10 @@ CSAssignment allocateContainersToNode( // We have two different logics to handle allocation on single node / multi // nodes. - if (null != node) { - return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); - } else{ + if (multiNodePlacementEnabled) { return allocateContainersOnMultiNodes(candidates); + } else { + return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); } } @@ -2976,4 +3019,12 @@ private LeafQueue autoCreateLeafQueue( } return autoCreatedLeafQueue; } + + /** + * Get Multi-Node sorting manager instance + * @return multiNodeSortingManager + */ + public MultiNodeSortingManager getMultiNodeSortingManager() { + return multiNodeSortingManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bdd30b915c9..a97657ea485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; @@ -40,10 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -582,10 +586,11 @@ public void setAccessibleNodeLabels(String queue, Set labels) { set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); } + public Set getAccessibleNodeLabels(String queue) { String accessibleLabelStr = get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); - + // When accessible-label is null, if (accessibleLabelStr == null) { // Only return null when queue is not ROOT @@ -600,12 +605,12 @@ public void setAccessibleNodeLabels(String queue, Set labels) { + " it will be automatically set to \"*\"."); } } - + // always return ANY for queue root if (queue.equals(ROOT)) { return ImmutableSet.of(RMNodeLabelsManager.ANY); } - + // In other cases, split the accessibleLabelStr by "," Set set = new HashSet(); for (String str : accessibleLabelStr.split(",")) { @@ -621,7 +626,6 @@ public void setAccessibleNodeLabels(String queue, Set labels) { } return Collections.unmodifiableSet(set); } - private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, float defaultValue) { String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; @@ -2017,4 +2021,82 @@ private void updateResourceValuesFromConfig(Set resourceTypes, break; } } + + @Private public static final String MULTI_NODE_SORTING_ALGORITHM_POLICIES = + PREFIX + "multi-node-sorting.policies"; + + @Private public static final String MULTI_NODE_SORTING_ALGORITHM_CLASS = + PREFIX + "multi-node-sorting.algorithm"; + + /** + * resource usage based node sorting algorithm. + */ + public static final String RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM = "resource-usage"; + + @Private + public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX + + "multi-node-placement-enabled"; + + @Private + public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; + + @SuppressWarnings("unchecked") + public String getMultiNodesSortingAlgorithm( + String queue) { + + String policyName = get( + getQueuePrefix(queue) + "multi-node-sorting.policy"); + + if (policyName == null) { + policyName = get(MULTI_NODE_SORTING_ALGORITHM_CLASS); + } + + // If node sorting poicy is not configured in queue and in cluster level, + // it is been assumed that this queue is not enabled with multi-node lookup + if (policyName == null || policyName.isEmpty()) { + return null; + } + + // Ensure that custom node sorting algorithm class is valid. + if (!policyName.trim() + .equals(RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM)) { + try { + Class nodeSortingPolicyClazz = getClassByName(policyName); + if (NodesSortingAlgorithm.class + .isAssignableFrom(nodeSortingPolicyClazz)) { + return policyName; + } else { + throw new YarnRuntimeException( + "Class: " + policyName + " not instance of " + + NodesSortingAlgorithm.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate " + + "NodesSortingAlgorithm: " + policyName + " for queue: " + queue, + e); + } + } + + return policyName; + } + + public boolean getMultiNodePlacementEnabled() { + return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, + DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); + } + + public Set getMultiNodePlacementPolicies() { + String policies = get(MULTI_NODE_SORTING_ALGORITHM_POLICIES, + RESOURCE_USAGE_BASED_NODE_SORTING_ALGORITHM); + + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : policies.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim()); + } + } + + return Collections.unmodifiableSet(set); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8d1428d3e49..ba13ffda918 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -71,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; @@ -123,6 +120,8 @@ private volatile OrderingPolicy orderingPolicy = null; + private String multiNodeSortingAlgorithmName = null; + // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = @@ -286,6 +285,10 @@ protected void setupQueueConfigs(Resource clusterResource, usersManager.updateUserWeights(); + // Update multi-node sorting algorithm for scheduling as configured. + multiNodeSortingAlgorithmName = conf + .getMultiNodesSortingAlgorithm(getQueuePath()); + LOG.info( "Initializing " + queueName + "\n" + "capacity = " + queueCapacities .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" @@ -2203,4 +2206,8 @@ public long getMaximumApplicationLifetime() { public long getDefaultApplicationLifetime() { return defaultApplicationLifetime; } + + public String getMultiNodeSortingAlgorithmName() { + return this.multiNodeSortingAlgorithmName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java index 1bd37431c15..227836d3d1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -20,6 +20,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.NodesSortingAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageNodesSortingAlgorithm; /** * This class will keep all Scheduling env's names which will help in * placement calculations. @@ -32,4 +34,12 @@ @InterfaceAudience.Private public static final Class DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; + + @InterfaceAudience.Private + public static final String ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS = + "MULTI_NODE_SORTING_ALGORITHM_CLASS"; + + @InterfaceAudience.Private + public static final Class + DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS = ResourceUsageNodesSortingAlgorithm.class; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f3da0a36f0b..1df7311df61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; @@ -169,6 +170,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, rc = scheduler.getResourceCalculator(); } + // Update multi-node sorting algorithm to scheduler envs + if (rmApp != null) { + if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS) + && getCSLeafQueue().getMultiNodeSortingAlgorithmName() != null) { + appSchedulingInfo.getApplicationSchedulingEnvs().put( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_ALGORITHM_CLASS, + getCSLeafQueue().getMultiNodeSortingAlgorithmName()); + } + } containerAllocator = new ContainerAllocator(this, rc, rmContext, activitiesManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java new file mode 100644 index 00000000000..63d7b84f66f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DefaultMultiNodeLookupPolicy.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; + +import java.util.Collection; +import java.util.Iterator; + +/** + *

+ * This class has the following functionality: + * + *

+ * MultiNodeLookupPolicyPerResource holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class DefaultMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + NodesSortingAlgorithm sortingAlgorithm; + + @Override + public Iterator getPreferredNodeIterator(String partition) { + return sortingAlgorithm.getSortedEntities(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, String partition) { + sortingAlgorithm.addAllEntitiesForOrdering(nodes, partition); + } + + @SuppressWarnings("unchecked") + @Override + public void initPolicy(String multiNodeSortingAlgorithm) { + Class policyClass; + try { + if (multiNodeSortingAlgorithm == null) { + policyClass = ApplicationSchedulingConfig.DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS; + } else { + policyClass = Class.forName(multiNodeSortingAlgorithm); + } + } catch (ClassNotFoundException e) { + policyClass = ApplicationSchedulingConfig.DEFAULT_MULTI_NODE_SORTING_ALGORITHM_CLASS; + } + this.sortingAlgorithm = (NodesSortingAlgorithm) ReflectionUtils + .newInstance(policyClass, null); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index a0358b4ada2..17ad34a4b15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -24,11 +24,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -55,6 +59,7 @@ new ConcurrentHashMap<>(); private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; + private MultiNodeLookupPolicy nodeLookupPolicy = null; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -65,6 +70,19 @@ public LocalityAppPlacementAllocator() { writeLock = lock.writeLock(); } + @SuppressWarnings("unchecked") + @Override + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + if (rmContext.getResourceManager() + .getResourceScheduler() instanceof CapacityScheduler) { + CapacityScheduler cs = ((CapacityScheduler) rmContext.getResourceManager() + .getResourceScheduler()); + nodeLookupPolicy = (MultiNodeLookupPolicy) cs.getMultiNodeSortingManager(); + } + } + @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( @@ -78,7 +96,10 @@ public LocalityAppPlacementAllocator() { return IteratorUtils.singletonIterator(singleNode); } - return IteratorUtils.emptyIterator(); + // singleNode will be null if Multi-node placement lookup is enabled, and + // hence could consider sorting policies. + return nodeLookupPolicy + .getPreferredNodeIterator(candidateNodeSet.getPartition()); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java new file mode 100644 index 00000000000..2723ead7ed7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Iterator; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for MultiNodeLookupPolicy so that different placement + * allocator can choose nodes based on need. + *

+ */ +public interface MultiNodeLookupPolicy { + /** + * Get iterator of preferred node depends on requirement and/or availability + * @param partition node label + * + * @return iterator of preferred node + */ + Iterator getPreferredNodeIterator(String partition); + + /** + * Refresh working nodes set for re-ordering based on the algorithm selected. + * + * @param nodes + * a collection working nm's. + */ + void addAndRefreshNodesSet(Collection nodes, String partition); + + /** + * Define all the initialization needed for NodeLookup policy. + * @param multiNodeSortingAlgorithm node sorting algorithm + */ + void initPolicy(String multiNodeSortingAlgorithm); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java new file mode 100644 index 00000000000..76ee6374985 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + + +public class MultiNodeSortingManager extends AbstractService { + + private static final Log LOG = LogFactory.getLog(MultiNodeSortingManager.class); + + private ScheduledExecutorService ses; + private ScheduledFuture handler; + private volatile boolean stopped; + private long monitorInterval; + private RMContext rmContext; + private Set> policyList; + private Set policyNames; + private boolean multiNodePlacementEnabled; + + public MultiNodeSortingManager(RMContext rmContext, Set policies, + boolean multiNodePlacementEnabled) { + super("MultiNodeSortingManager"); + this.rmContext = rmContext; + this.policyList = new ConcurrentSkipListSet<>(); + this.policyNames = policies; + this.multiNodePlacementEnabled = multiNodePlacementEnabled; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing SchedulingMonitor=" + getName()); + this.monitorInterval = 1000; // TODO: config this later + super.serviceInit(conf); + if (multiNodePlacementEnabled) { + createAllPolicies(); + } + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting NodeSortingService=" + getName()); + assert !stopped : "starting when already stopped"; + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new SortingThread(), 0, monitorInterval, + TimeUnit.MILLISECONDS); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + stopped = true; + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); + } + super.serviceStop(); + } + + private class SortingThread implements Runnable { + @Override + public void run() { + try { + reSortClusterNodes(); + } catch (Throwable t) { + // The preemption monitor does not alter structures nor do structures + // persist across invocations. Therefore, log, skip, and retry. + LOG.error("Exception raised while executing preemption" + + " checker, skip this run..., exception=", t); + } + } + } + + private void createAllPolicies() { + for (String policyName : policyNames) { + MultiNodeLookupPolicy nodeLookupPolicy = new DefaultMultiNodeLookupPolicy(); + nodeLookupPolicy.initPolicy(policyName); + policyList.add(nodeLookupPolicy); + } + } + + public void addToNodeSortingServiceList(MultiNodeLookupPolicy e) { + policyList.add(e); + } + + public void removeFromNodeSortingServiceList(MultiNodeLookupPolicy e) { + policyList.remove(e); + } + + @SuppressWarnings("unchecked") + public void reSortClusterNodes() { + if (rmContext.getResourceManager() + .getResourceScheduler() instanceof CapacityScheduler) { + CapacityScheduler cs = ((CapacityScheduler) rmContext.getResourceManager() + .getResourceScheduler()); + // Sort to re sort nodes + for (MultiNodeLookupPolicy policy : policyList) { + for (NodeLabel label : rmContext.getNodeLabelManager() + .getClusterNodeLabels()) { + Set labels = new HashSet<>(); + labels.add(label.getName()); + Set nodes = rmContext.getNodeLabelManager() + .getLabelsInfoToNodes(labels).get(label.getName()); + Map nodesByPartition = new HashMap<>(); + for (NodeId nodeId : nodes) { + nodesByPartition.put(nodeId, cs.getNode(nodeId)); + } + policy.addAndRefreshNodesSet( + (Collection) nodesByPartition.values(), label.getName()); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java new file mode 100644 index 00000000000..c2024033383 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodesSortingAlgorithm.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for NodesSortingAlgorithm so that sub classes + * could define the compare logic to sort nodes. + *

+ */ +public interface NodesSortingAlgorithm { + /** + * Get collection of sorted objects managed by this Sorting Algorithm. + * + * @param partition + * nodelabel name + * + * @return a collection of {@link SchedulerNode} objects + */ + public Collection getSortedEntities(String partition); + + /** + * Add a collection of {@link SchedulerNode} objects to be managed for + * ordering. + * + * @param sc + * the collection of {@link SchedulerNode} objects to add + * @param partition + * nodeLabel + */ + public void addAllEntitiesForOrdering(Collection sc, String partition); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java new file mode 100644 index 00000000000..4e57bde56c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Map; + +/** + * A CandidateNodeSet which keeps an unordered map based on partition. + */ +public class PartitionBasedCandidateNodeSet + implements CandidateNodeSet { + + private Map map; + private String partition; + + public PartitionBasedCandidateNodeSet(Map map, String partition) { + this.map = map; + this.partition = partition; + } + + @Override + public Map getAllNodes() { + return map; + } + + @Override + public long getVersion() { + return 0L; + } + + @Override + public String getPartition() { + return partition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java new file mode 100644 index 00000000000..7d5a67be499 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageNodesSortingAlgorithm.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for NodesSortingAlgorithm so that sub classes + * could define the compare logic to sort nodes. + *

+ */ +public class ResourceUsageNodesSortingAlgorithm + implements NodesSortingAlgorithm { + private Map> nodesPerPartition; + protected Comparator comparator; + + public ResourceUsageNodesSortingAlgorithm() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + return o2.getAllocatedResource().compareTo(o1.getAllocatedResource()); + } + }; + } + + public Collection getSortedEntities(String partition) { + return this.nodesPerPartition.get(partition); + } + + @Override + public void addAllEntitiesForOrdering(Collection nl, String partition) { + Set nodeList = null; + if ((nodeList = nodesPerPartition.get(partition)) == null) { + nodeList = new ConcurrentSkipListSet(comparator); + this.nodesPerPartition.put(partition, nodeList); + } + + // Clear existing entries first. + nodeList.clear(); + + // Add fresh set of nodes for re-ordering. + nodeList.addAll(nl); + } +} \ No newline at end of file -- 2.14.3 (Apple Git-98)