diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 0a6df09de99..63fd40537ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -414,6 +414,27 @@ public float ratio(Resource a, Resource b) { return ratio; } + // Get dominant resource infomation. + // Resource a dominants in resource b. + public static ResourceInformation + getDominantResourceInfo(Resource a, Resource b) { + float ratio = 0.0f; + int resIndex = 0; + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation aResourceInformation = a.getResourceInformation(i); + ResourceInformation bResourceInformation = b.getResourceInformation(i); + final float tmp = divideSafelyAsFloat(aResourceInformation.getValue(), + bResourceInformation.getValue()); + if (tmp > ratio) { + ratio = tmp; + resIndex = i; + } + } + return a.getResourceInformation(resIndex); + } + + @Override public Resource divideAndCeil(Resource numerator, int denominator) { return divideAndCeil(numerator, (long) denominator); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d0ee25df300..56bfb9a1b7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2405,6 +2405,9 @@ private void updateResourceValuesFromConfig(Set resourceTypes, @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = PREFIX + "multi-node-sorting.policy"; + @Private public static final String MULTI_NODE_DOMINANT_RESOURCE_LOOKUP = + PREFIX + "multi-node-dominant.lookup"; + /** * resource usage based node sorting algorithm. */ @@ -2412,6 +2415,7 @@ private void updateResourceValuesFromConfig(Set resourceTypes, public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy"; public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L; + public static final boolean DEFAULT_MULTI_NODE_DOMINANT_RESOURCE_LOOKUP = false; @Private public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX @@ -2453,6 +2457,11 @@ public boolean getMultiNodePlacementEnabled() { DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); } + public boolean getMultiNodeDominantLookUp() { + return getBoolean(MULTI_NODE_DOMINANT_RESOURCE_LOOKUP, + DEFAULT_MULTI_NODE_DOMINANT_RESOURCE_LOOKUP); + } + public Set getMultiNodePlacementPolicies() { String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index cced238b601..e4b33d37289 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -24,14 +24,13 @@ import java.util.Optional; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.omg.CORBA.Any; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -826,6 +825,20 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, return allocationResult; } + // Get dominant schedulerKey resourceinfo in candidates total resource. + ResourceInformation getDominantResourceInfo(CandidateNodeSet candidates, + SchedulerRequestKey schedulerKey) { + Resource candidatesTotal = Resource.newInstance(0,0); + candidates.getAllNodes().values().stream() + .map(n -> Resources.addTo(candidatesTotal, n.getTotalResource())); + Resource resRequest = + application.getAppSchedulingInfo(). + getPendingAsk(schedulerKey).getPerAllocationResource(); + ResourceInformation[] listResInfo = resRequest.getResources(); + return DominantResourceCalculator + .getDominantResourceInfo(resRequest, candidatesTotal); + } + private ContainerAllocation allocate(Resource clusterResource, CandidateNodeSet candidates, SchedulingMode schedulingMode, ResourceLimits resourceLimits, @@ -839,6 +852,9 @@ private ContainerAllocation allocate(Resource clusterResource, application.getAppSchedulingInfo().getAppPlacementAllocator( schedulerKey); + ResourceInformation dominantResInfo = + getDominantResourceInfo(candidates, schedulerKey); + // This could be null when #pending request decreased by another thread. if (schedulingPS == null) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( @@ -852,7 +868,7 @@ private ContainerAllocation allocate(Resource clusterResource, result = ContainerAllocation.PRIORITY_SKIPPED; Iterator iter = schedulingPS.getPreferredNodeIterator( - candidates); + candidates, dominantResInfo); while (iter.hasNext()) { FiCaSchedulerNode node = iter.next(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index b1b340269d8..a3c52bbdbfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector; @@ -75,7 +76,7 @@ * @return iterator of preferred node */ public Iterator getPreferredNodeIterator( - CandidateNodeSet candidateNodeSet) { + CandidateNodeSet candidateNodeSet, ResourceInformation dominantResInfo) { // Now only handle the case that single node in the candidateNodeSet // TODO, Add support to multi-hosts inside candidateNodeSet which is passed // in. @@ -90,7 +91,8 @@ return multiNodeSortingManager.getMultiNodeSortIterator( candidateNodeSet.getAllNodes().values(), candidateNodeSet.getPartition(), - multiNodeSortPolicyName); + multiNodeSortPolicyName, + dominantResInfo); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DominantResourceUsageMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DominantResourceUsageMultiNodeLookupPolicy.java new file mode 100644 index 00000000000..aff9f435fbf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/DominantResourceUsageMultiNodeLookupPolicy.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Comparator; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * ResourceUsageMultiNodeLookupPolicy holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class DominantResourceUsageMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + + protected Map> nodesPerPartition = new ConcurrentHashMap<>(); + protected Comparator comparator; + protected ResourceInformation dominantResInfo = null; + + public DominantResourceUsageMultiNodeLookupPolicy() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + if (dominantResInfo != null) { + int allocatedDiff = o1.getAllocatedResource() + .getResourceInformation(dominantResInfo.getName()) + .compareTo(o2.getAllocatedResource() + .getResourceInformation(dominantResInfo.getName())); + if (allocatedDiff == 0) { + return o1.getNodeID().compareTo(o2.getNodeID()); + } + return allocatedDiff; + } else { + int allocatedDiff = o1.getAllocatedResource() + .compareTo(o2.getAllocatedResource()); + if (allocatedDiff == 0) { + return o1.getNodeID().compareTo(o2.getNodeID()); + } + return allocatedDiff; + } + + } + }; + } + + @Override + public Iterator getPreferredNodeIterator(Collection nodes, + String partition) { + return getNodesPerPartition(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, + String partition) { + Set nodeList = new ConcurrentSkipListSet(comparator); + nodeList.addAll(nodes); + nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList)); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, + String partition, ResourceInformation dominantResInfo) { + Set nodeList = new ConcurrentSkipListSet(comparator); + nodeList.addAll(nodes); + nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList)); + this.dominantResInfo = dominantResInfo; + } + + @Override + public Set getNodesPerPartition(String partition) { + return nodesPerPartition.getOrDefault(partition, Collections.emptySet()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java index 662e34d1dc6..af4b10ec374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import java.util.Collection; @@ -54,6 +55,16 @@ */ void addAndRefreshNodesSet(Collection nodes, String partition); + /** + * Refresh working nodes set for dominant resource + * re-ordering based on the algorithm selected. + * + * @param nodes + * a collection working nm's. + */ + void addAndRefreshNodesSet(Collection nodes, String partition, + ResourceInformation dominantResInfo); + /** * Get sorted nodes per partition. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index f9fcdfdd531..35a50dff9d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java index c8a7e66f5fe..c61fa8c8396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -26,6 +26,8 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -107,7 +109,7 @@ public void registerMultiNodePolicyNames( } public Iterator getMultiNodeSortIterator(Collection nodes, - String partition, String policyName) { + String partition, String policyName, ResourceInformation dominantResInfo) { // nodeLookupPolicy can be null if app is configured with invalid policy. // in such cases, use the the first node. if(policyName == null) { @@ -129,9 +131,16 @@ public void registerMultiNodePolicyNames( MultiNodeLookupPolicy policy = multiNodeSorter .getMultiNodeLookupPolicy(); - // If sorter thread is not running, refresh node set. - if (!multiNodeSorter.isSorterThreadRunning()) { - policy.addAndRefreshNodesSet(nodes, partition); + + if (this.conf.getBoolean(CapacitySchedulerConfiguration.MULTI_NODE_DOMINANT_RESOURCE_LOOKUP, + CapacitySchedulerConfiguration.DEFAULT_MULTI_NODE_DOMINANT_RESOURCE_LOOKUP)) { + // Should always refresh dominant resource info + policy.addAndRefreshNodesSet(nodes, partition, dominantResInfo); + } else { + // If sorter thread is not running, refresh node set. + if (!multiNodeSorter.isSorterThreadRunning()) { + policy.addAndRefreshNodesSet(nodes, partition); + } } return policy.getPreferredNodeIterator(nodes, partition); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java index d765af8d136..88a75fe16a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import java.util.Comparator; import java.util.Collection; @@ -43,6 +45,7 @@ protected Map> nodesPerPartition = new ConcurrentHashMap<>(); protected Comparator comparator; + protected ResourceInfo dominantResInfo; public ResourceUsageMultiNodeLookupPolicy() { this.comparator = new Comparator() { @@ -72,6 +75,12 @@ public void addAndRefreshNodesSet(Collection nodes, nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList)); } + @Override + public void addAndRefreshNodesSet(Collection nodes, + String partition, ResourceInformation dominantResInfo) { + //todo nothing. + } + @Override public Set getNodesPerPartition(String partition) { return nodesPerPartition.getOrDefault(partition, Collections.emptySet());