diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f481de5..8db5019 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -169,7 +170,11 @@
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
+
+  // Per-node map of apps that have outstanding strict (relaxLocality = false)
+  // node-local requests, with a count of such requests per app.
+  private ConcurrentHashMap<String, Map<ApplicationId, AtomicInteger>>
+      nodeLocalApps =
+          new ConcurrentHashMap<String, Map<ApplicationId, AtomicInteger>>();
+
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -730,6 +735,9 @@ private synchronized void removeApplication(ApplicationId applicationId,
       LOG.warn("Couldn't find application " + applicationId);
       return;
     }
+    // Drop this application's strict-locality bookkeeping on every node.
+    for (Map<ApplicationId, AtomicInteger> apps : nodeLocalApps.values()) {
+      apps.remove(applicationId);
+    }
     application.stop(finalState);
     applications.remove(applicationId);
   }
@@ -834,6 +842,8 @@ protected synchronized void completedContainer(RMContainer rmContainer,
   private synchronized void addNode(RMNode node) {
     FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
     nodes.put(node.getNodeID(), schedulerNode);
+    nodeLocalApps.put(node.getHostName(),
+        new ConcurrentHashMap<ApplicationId, AtomicInteger>());
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
     updateMaximumAllocation(schedulerNode, true);
@@ -874,6 +884,7 @@ private synchronized void removeNode(RMNode rmNode) {
     }
 
     nodes.remove(rmNode.getNodeID());
+    nodeLocalApps.remove(rmNode.getHostName());
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
     updateMaximumAllocation(node, false);
@@ -896,15 +907,62 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
-      clusterResource, minimumAllocation, getMaximumResourceCapability(),
-      incrAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        incrAllocation);
 
     // Record container allocation start time
     application.recordContainerRequestTime(getClock().getTime());
 
+    // Check if any of the asked containers have relaxLocality = false;
+    // if so, record it.
+    for (ResourceRequest req : ask) {
+      // If locality is a requirement
+      if (!req.getRelaxLocality()
+          && !req.getResourceName().equals(ResourceRequest.ANY)) {
+        Map<ApplicationId, AtomicInteger> m =
+            new ConcurrentHashMap<ApplicationId, AtomicInteger>();
+        m.put(application.getApplicationId(), new AtomicInteger(0));
+        Map<ApplicationId, AtomicInteger> appMap =
+            nodeLocalApps.putIfAbsent(req.getResourceName(), m);
+        appMap = (appMap == null) ?
+            nodeLocalApps.get(req.getResourceName()) : appMap;
+        if (appMap != null) {
+          AtomicInteger count =
+              appMap.putIfAbsent(
+                  application.getApplicationId(), new AtomicInteger(0));
+          count = (count == null) ?
+              appMap.get(application.getApplicationId()) : count;
+          count.incrementAndGet();
+        }
+      }
+    }
+
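+    // A released container may have been satisfying a strict node-local
+    // request; decrement that app's count so the per-node map does not
+    // leak entries.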
+    for (ContainerId rel : release) {
+      FSAppAttempt schedulerApp =
+          getSchedulerApp(application.getApplicationAttemptId());
+      RMContainer rmContainer = schedulerApp.getRMContainer(rel);
+      List<ResourceRequest> relReqs = rmContainer.getResourceRequests();
+      for (ResourceRequest req : relReqs) {
+        FSSchedulerNode node = nodes.get(rmContainer.getAllocatedNode());
+        if (node != null) {
+          Map<ApplicationId, AtomicInteger> m =
+              nodeLocalApps.get(node.getNodeName());
+          if (m != null && !req.getRelaxLocality()) {
+            AtomicInteger count = m.get(application.getApplicationId());
+            if (count != null) {
+              if (count.decrementAndGet() == 0) {
+                m.remove(application.getApplicationId());
+              }
+            }
+          }
+        }
+      }
+    }
+
     // Release containers
     releaseContainers(release, application);
+
     synchronized (application) {
       if (!ask.isEmpty()) {
         if (LOG.isDebugEnabled()) {
@@ -946,6 +1004,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
 
       Resource headroom = application.getHeadroom();
       application.setApplicationHeadroomForMetrics(headroom);
+
       return new Allocation(allocation.getContainerList(), headroom,
           preemptionContainerIds, null, null, allocation.getNMTokenList());
     }
@@ -1085,7 +1144,11 @@ private synchronized void attemptScheduling(FSSchedulerNode node) {
           assignedContainer = true;
         }
         if (!assignedContainer) { break; }
-        if (!assignMultiple) { break; }
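+        // Keep assigning on this node while applications still have strict
+        // node-local requests registered for it, even when assignMultiple
+        // is disabled.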
+        Map<ApplicationId, AtomicInteger> nodeLocalAsks =
+            nodeLocalApps.get(node.getNodeName());
+        boolean hasStrictLocalityReqs =
+            (nodeLocalAsks != null) && !nodeLocalAsks.isEmpty();
+        if (!assignMultiple && !hasStrictLocalityReqs) { break; }
         if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 23b708a..223496e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -42,6 +42,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -133,8 +135,15 @@ protected ApplicationAttemptId createSchedulingRequest(
   }
 
   protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers,
+        priority, null);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
-      int priority) {
+      int priority, RMNode node) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
@@ -143,9 +152,16 @@ protected ApplicationAttemptId createSchedulingRequest(
       scheduler.addApplicationAttempt(id, false, false);
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
+    ResourceRequest request =
+        createResourceRequest(memory, vcores, ResourceRequest.ANY,
+            priority, numContainers, true);
     ask.add(request);
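+    // If a node is given, pair a strict (relaxLocality = false) node-local
+    // request with a relaxed rack-level request, the way an AM asks for
+    // hard node locality.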
+    if (node != null) {
+      ask.add(createResourceRequest(memory, vcores, node.getHostName(),
+          priority, numContainers, false));
+      ask.add(createResourceRequest(memory, vcores, node.getRackName(),
+          priority, numContainers, true));
+    }
 
     RMApp rmApp = mock(RMApp.class);
     RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 69e0a8c..91a6de9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -936,7 +936,99 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
         getResourceUsage().getMemory());
   }
-
+
+  @Test (timeout = 500000)
+  public void testAssignMultipleForStrictLocality() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4096, 5), 1,
+            "127.0.0.1");
+    // Add another node
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2048, 5), 2,
+            "127.0.0.2");
+
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent1);
+    scheduler.handle(nodeEvent2);
+
+    scheduler.update();
+
+    // Create multiple strictly node-local scheduling requests on node1
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1024, 1, "queue1", "user1", 1, 1, node1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1024, 1, "queue1", "user1", 1, 1, node1);
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1024, 1, "queue1", "user1", 1, 1, node1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // Ensure multiple assignments have happened in a single
+    // scheduler update
+    assertEquals(3072, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Release app1's container
+    ContainerId c1 = scheduler.getSchedulerApp(app1)
+        .getLiveContainers().iterator().next().getContainerId();
+    List<ContainerId> rel = new ArrayList<ContainerId>();
+    rel.add(c1);
+    scheduler.allocate(app1, new ArrayList<ResourceRequest>(), rel,
+        new ArrayList<String>(), new ArrayList<String>());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Ensure queue resource usage has gone down
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Release app2's container
+    ContainerId c2 = scheduler.getSchedulerApp(app2)
+        .getLiveContainers().iterator().next().getContainerId();
+    rel = new ArrayList<ContainerId>();
+    rel.add(c2);
+    scheduler.allocate(app2, new ArrayList<ResourceRequest>(), rel,
+        new ArrayList<String>(), new ArrayList<String>());
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Release app3's container
+    ContainerId c3 = scheduler.getSchedulerApp(app3)
+        .getLiveContainers().iterator().next().getContainerId();
+    rel = new ArrayList<ContainerId>();
+    rel.add(c3);
+    scheduler.allocate(app3, new ArrayList<ResourceRequest>(), rel,
+        new ArrayList<String>(), new ArrayList<String>());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Ensure queue resource usage has gone down to zero
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Create multiple relaxed-locality scheduling requests
+    createSchedulingRequest(1024, 1, "queue1", "user1");
+    createSchedulingRequest(1024, 1, "queue1", "user1");
+    createSchedulingRequest(1024, 1, "queue1", "user1");
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Ensure only a single assignment has happened in this
+    // scheduler update
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+  }
 
   @Test