diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..055db5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -169,6 +169,9 @@ // Containers whose AMs have been warned that they will be preempted soon. private List warnedContainers = new ArrayList(); + + private ConcurrentHashMap> nodeLocalApps = + new ConcurrentHashMap>(); protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -676,8 +679,8 @@ protected synchronized void addApplicationAttempt( } } else { rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -730,6 +733,9 @@ private synchronized void removeApplication(ApplicationId applicationId, LOG.warn("Couldn't find application " + applicationId); return; } + for (Set apps : nodeLocalApps.values()) { + apps.remove(applicationId); + } application.stop(finalState); applications.remove(applicationId); } @@ -834,6 +840,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, private synchronized void addNode(RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodes.put(node.getNodeID(), schedulerNode); + nodeLocalApps.put(node.getHostName(), new HashSet()); Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); updateMaximumAllocation(schedulerNode, true); @@ -874,6 +881,7 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + nodeLocalApps.remove(rmNode.getHostName()); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); @@ -896,8 +904,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR, - clusterResource, minimumAllocation, getMaximumResourceCapability(), - incrAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability(), + incrAllocation); // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); @@ -946,6 +954,18 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); + + // Check if any of the asked containers have relaxed-locality = false + // if so, record it.. + for (ResourceRequest req : ask) { + // If locality is a requirement + if (!req.getRelaxLocality()) { + Set reqAppContainers = + nodeLocalApps.putIfAbsent( + req.getResourceName(), new HashSet()); + reqAppContainers.add(application.getApplicationId()); + } + } return new Allocation(allocation.getContainerList(), headroom, preemptionContainerIds, null, null, allocation.getNMTokenList()); } @@ -1085,7 +1105,11 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { assignedContainer = true; } if (!assignedContainer) { break; } - if (!assignMultiple) { break; } + Set nodeLocalAsks = + nodeLocalApps.get(node.getNodeName()); + boolean hasStrictLocalityReqs = + (nodeLocalAsks != null) && !nodeLocalAsks.isEmpty(); + if (!assignMultiple && !hasStrictLocalityReqs) { break; } if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; } } }