diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..dbeb22f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -169,6 +169,9 @@ // Containers whose AMs have been warned that they will be preempted soon. private List warnedContainers = new ArrayList(); + + private ConcurrentHashMap> nodeLocalApps = + new ConcurrentHashMap>(); protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -730,6 +733,9 @@ private synchronized void removeApplication(ApplicationId applicationId, LOG.warn("Couldn't find application " + applicationId); return; } + for (Set apps : nodeLocalApps.values()) { + apps.remove(applicationId); + } application.stop(finalState); applications.remove(applicationId); } @@ -834,6 +840,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, private synchronized void addNode(RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodes.put(node.getNodeID(), schedulerNode); + nodeLocalApps.put(node.getHostName(), new HashSet()); Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); updateMaximumAllocation(schedulerNode, true); @@ -874,6 +881,7 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + nodeLocalApps.remove(rmNode.getHostName()); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); @@ -946,6 +954,22 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); + + // Check if any of the asked containers have relaxed-locality = false + // if so, record it.. + for (ResourceRequest req : ask) { + // If locality is a requirement + if (!req.getRelaxLocality() + && !req.getResourceName().equals(ResourceRequest.ANY)) { + Set reqAppContainers = + nodeLocalApps.putIfAbsent( + req.getResourceName(), new HashSet()); + reqAppContainers = + (reqAppContainers == null) ? nodeLocalApps.get(req + .getResourceName()) : reqAppContainers; + reqAppContainers.add(application.getApplicationId()); + } + } return new Allocation(allocation.getContainerList(), headroom, preemptionContainerIds, null, null, allocation.getNMTokenList()); } @@ -1085,7 +1109,11 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { assignedContainer = true; } if (!assignedContainer) { break; } - if (!assignMultiple) { break; } + Set nodeLocalAsks = + nodeLocalApps.get(node.getNodeName()); + boolean hasStrictLocalityReqs = + (nodeLocalAsks != null) && !nodeLocalAsks.isEmpty(); + if (!assignMultiple && !hasStrictLocalityReqs) { break; } if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 23b708a..5e0aeda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -133,8 +133,15 @@ protected ApplicationAttemptId createSchedulingRequest( } protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority) { + return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, + priority, ResourceRequest.ANY, true); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, - int priority) { + int priority, String hostName, boolean relaxLocality) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected @@ -143,9 +150,14 @@ protected ApplicationAttemptId createSchedulingRequest( scheduler.addApplicationAttempt(id, false, false); } List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + ResourceRequest request = + createResourceRequest(memory, vcores, hostName, + priority, numContainers, relaxLocality); ask.add(request); + if (!hostName.equals(ResourceRequest.ANY)) { + ask.add(createResourceRequest(memory, vcores, ResourceRequest.ANY, + priority, numContainers, true)); + } RMApp rmApp = mock(RMApp.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 69e0a8c..0ecb251 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -936,7 +936,44 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { getResourceUsage().getMemory()); } - + + @Test (timeout = 500000) + public void testAssignMultipleForStrictLocality() throws Exception { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4096, 5), 1, + "127.0.0.1"); + // Add another node + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(2048, 5), 2, + "127.0.0.2"); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent1); + scheduler.handle(nodeEvent2); + + scheduler.update(); + + // Create multiple strictly node-local scheduling requests... + createSchedulingRequest(2048, 1, "queue1", "user1", 1, 1, "127.0.0.1", + false); + createSchedulingRequest(2048, 1, "queue1", "user1", 1, 1, "127.0.0.1", + false); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // Ensure multiple assignments have happened in a single + // Scheduler update + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + } @Test