diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 7e12aae5bed..dbc47d777af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -402,6 +402,14 @@ private void updateMaxResources(SchedulerNode node, boolean add) {
     return sortedList;
   }
 
+  public Map<String, N> getNodeNameToNodeMap() {
+    return nodeNameToNodeMap;
+  }
+
+  public Map<String, List<N>> getNodesPerRack() {
+    return nodesPerRack;
+  }
+
   /**
    * Convenience method to return list of nodes corresponding to resourceName
    * passed in the {@link ResourceRequest}.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 282367edbaa..3d0ea64a690 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -104,11 +105,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -488,7 +491,7 @@ protected void addApplication(ApplicationId applicationId,
       // requested is greater than 0 and we have 0
       // of that resource on the queue.
       List<MaxResourceValidationResult> invalidAMResourceRequests =
-          validateResourceRequests(rmApp.getAMResourceRequests(), queue);
+          validateCapabilityInResourceRequests(rmApp.getAMResourceRequests(), queue);
 
       if (!invalidAMResourceRequests.isEmpty()) {
         String msg = String.format(
@@ -906,10 +909,14 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
       return EMPTY_ALLOCATION;
     }
 
+    // Validate resourceNames in the resource requests
+    validateResourceNameInResourceRequests(ask, appAttemptId);
+
+    // Validate capabilities in the resource requests
     ApplicationId applicationId = application.getApplicationId();
     FSLeafQueue queue = application.getQueue();
     List<MaxResourceValidationResult> invalidAsks =
-        validateResourceRequests(ask, queue);
+        validateCapabilityInResourceRequests(ask, queue);
 
     // We need to be fail-fast here if any invalid ask is detected.
     // If we would have thrown exception later, this could be problematic as
@@ -994,7 +1001,104 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
         previousAttemptContainers);
   }
 
-  private List<MaxResourceValidationResult> validateResourceRequests(
+  public Map<Priority, Map<Resource, Map<String, ResourceRequest>>>
+      replayResourceRequests(List<ResourceRequest> ask) {
+    // Key   -> Priority
+    // Value -> Map
+    //   Key   -> Resource capability
+    //   Value -> Map
+    //     Key   -> ResourceName (e.g., hostname, rackname, *)
+    //     Value -> ResourceRequest
+    Map<Priority, Map<Resource, Map<String, ResourceRequest>>> requestsTable =
+        new TreeMap<Priority, Map<Resource, Map<String, ResourceRequest>>>();
+    for (ResourceRequest request : ask) {
+      Priority priority = request.getPriority();
+      Resource capability = request.getCapability();
+      String resourceName = request.getResourceName();
+
+      Map<Resource, Map<String, ResourceRequest>> requests =
+          requestsTable.get(priority);
+      if (requests == null) {
+        requests = new HashMap<Resource, Map<String, ResourceRequest>>();
+        requestsTable.put(priority, requests);
+      }
+      Map<String, ResourceRequest> reqMap = requests.get(capability);
+      if (reqMap == null) {
+        reqMap = new HashMap<String, ResourceRequest>();
+        requests.put(capability, reqMap);
+      }
+      reqMap.put(resourceName, request);
+    }
+
+    return requestsTable;
+  }
+
+  /**
+   * Filter out resource names that do not exist in this cluster.
+   * If a request allows relaxed locality, its containers are folded into the
+   * ANY request; otherwise the request is removed outright.
+   *
+   * @param ask application resource request list
+   * @param appAttemptId the application attempt that issued the ask
+   */
+  public void validateResourceNameInResourceRequests(
+      List<ResourceRequest> ask, ApplicationAttemptId appAttemptId) {
+    Map<Priority, Map<Resource, Map<String, ResourceRequest>>> requestsTable =
+        replayResourceRequests(ask);
+
+    for (Map.Entry<Priority, Map<Resource, Map<String, ResourceRequest>>>
+        entry : requestsTable.entrySet()) {
+      Priority priority = entry.getKey();
+      Map<Resource, Map<String, ResourceRequest>> requests = entry.getValue();
+      // Because YARN assumes there is only one container size per priority,
+      // we only handle that case.
+      if (requests.keySet().size() != 1) {
+        LOG.warn("application: " + appAttemptId
+            + " requested containers of different sizes at one priority,"
+            + " priority: " + priority + ","
+            + " number of different container sizes: "
+            + requests.keySet().size());
+        continue;
+      }
+      Resource capability = requests.keySet().iterator().next();
+      ResourceRequest anyRequest = null;
+      int anyRequestContainerNewAdd = 0;
+      for (Map<String, ResourceRequest> reqMap : requests.values()) {
+        for (ResourceRequest request : reqMap.values()) {
+          String resourceName = request.getResourceName();
+          if (ResourceRequest.ANY.equals(resourceName)) {
+            anyRequest = request;
+          } else if (!nodeTracker.getNodeNameToNodeMap().keySet()
+              .contains(resourceName)
+              && !nodeTracker.getNodesPerRack().keySet()
+                  .contains(resourceName)) {
+            ask.remove(request);
+            if (request.getRelaxLocality()) {
+              anyRequestContainerNewAdd += request.getNumContainers();
+              LOG.debug("application: " + appAttemptId
+                  + " requested resource name: " + resourceName
+                  + " which does not exist in this cluster;"
+                  + " it will be changed to ANY.");
+            } else {
+              LOG.debug("application: " + appAttemptId
+                  + " requested resource name: " + resourceName
+                  + " which does not exist in this cluster;"
+                  + " it will be removed.");
+            }
+          }
+        }
+      }
+
+      if (anyRequestContainerNewAdd == 0) {
+        continue;
+      }
+      if (anyRequest != null) {
+        anyRequest.setNumContainers(anyRequest.getNumContainers()
+            + anyRequestContainerNewAdd);
+      } else {
+        ask.add(ResourceRequest.newInstance(priority, ResourceRequest.ANY,
+            capability, anyRequestContainerNewAdd));
+      }
+    }
+  }
+
+  private List<MaxResourceValidationResult> validateCapabilityInResourceRequests(
       List<ResourceRequest> requests, FSLeafQueue queue) {
     List<MaxResourceValidationResult> validationResults = Lists.newArrayList();
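
For reference, below is a minimal standalone sketch (not part of the patch) of the grouping that replayResourceRequests performs and of the effect validateResourceNameInResourceRequests would have on an ask that names an unknown host. It assumes only hadoop-yarn-api on the classpath; the class name ReplayResourceRequestsSketch and the "decommissioned-host" / "/default-rack" names are made up for illustration, and the scheduler itself is not invoked.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class ReplayResourceRequestsSketch {
  public static void main(String[] args) {
    Priority priority = Priority.newInstance(1);
    Resource capability = Resource.newInstance(1024, 1);

    // A typical ask: node-local, rack-local and ANY requests at the same
    // priority and capability.
    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
    ask.add(ResourceRequest.newInstance(
        priority, "decommissioned-host", capability, 2));
    ask.add(ResourceRequest.newInstance(
        priority, "/default-rack", capability, 2));
    ask.add(ResourceRequest.newInstance(
        priority, ResourceRequest.ANY, capability, 2));

    // The same grouping replayResourceRequests builds:
    // priority -> capability -> resourceName -> request.
    Map<Priority, Map<Resource, Map<String, ResourceRequest>>> table =
        new TreeMap<Priority, Map<Resource, Map<String, ResourceRequest>>>();
    for (ResourceRequest request : ask) {
      table.computeIfAbsent(request.getPriority(),
              k -> new HashMap<Resource, Map<String, ResourceRequest>>())
          .computeIfAbsent(request.getCapability(),
              k -> new HashMap<String, ResourceRequest>())
          .put(request.getResourceName(), request);
    }

    // If "decommissioned-host" is unknown to the ClusterNodeTracker and the
    // request has relaxLocality=true, validateResourceNameInResourceRequests
    // removes it from the ask and adds its 2 containers to the ANY request,
    // which then asks for 4 containers in total.
    System.out.println(table);
  }
}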