diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index b0c6df4..b55766b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -372,6 +372,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && this.resetAllowedLocalityLevel(priority, type); } } + resetSchedulingOpportunities(priority); // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0e114e1..01f61d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4488,4 +4488,128 @@ public void testPerfMetricsInited() { assertEquals("Incorrect number of perf metrics", 1, collector.getRecords().size()); } + + @Test + public void testFairSchedulerLocality() throws Exception { + // Set locality.threshold as 1.0, nodes num as 1, so if container is not be + // assigned more than threshold*nodes=1 times, the priority of its + // allowLocality will be degraded. + conf.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, "1.0"); + conf.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, "1.0"); + + scheduler.init(conf); + scheduler.start(); + + int priorityValue; + int liveContainers; + Priority priority; + FSAppAttempt fsAppAttempt; + ResourceRequest request1; + ResourceRequest request2; + ResourceRequest request3; + ApplicationAttemptId id11; + Map requests; + + priorityValue = 1; + id11 = createAppAttemptId(1, 1); + createMockRMApp(id11); + priority = Priority.newInstance(priorityValue); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", + false); + scheduler.addApplicationAttempt(id11, false, false); + fsAppAttempt = scheduler.getApplicationAttempt(id11); + assertTrue((fsAppAttempt.getSchedulingOpportunities(priority) == 0 ? true + : false)); + + String hostName = "127.0.0.1"; + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1, + hostName); + List ask1 = new ArrayList<>(); + request1 = + createResourceRequest(1024, 8, node1.getHostName(), priorityValue, 1, + true); + request2 = + createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1, + true); + request3 = + createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1, + true); + ask1.add(request1); + ask1.add(request2); + ask1.add(request3); + scheduler.allocate(id11, ask1, new ArrayList(), null, null); + requests = fsAppAttempt.getResourceRequests(priority); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + FSSchedulerNode node = + (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID()); + // Ensure the rackRequest is not null + assertTrue(requests.get(node1.getHostName()) != null ? true : false); + liveContainers = scheduler.getSchedulerApp(id11).getLiveContainers().size(); + scheduler.attemptScheduling(node); + // The localRequest is dealt and the live containers will be add to 1 + assertEquals(liveContainers + 1, scheduler.getSchedulerApp(id11) + .getLiveContainers().size()); + assertTrue(requests.get(node1.getHostName()) == null ? true : false); + + // The container is assigned successfully and the schedulingOpportunities + // will be reset to 0. If not be set to 0, the container will be degraded + // and assigned by rackRequest in this iteration + assertTrue((fsAppAttempt.getSchedulingOpportunities(priority) == 0 ? true + : false)); + List ask2 = new ArrayList<>(); + request1 = + createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1, + true); + request2 = + createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1, + true); + ask2.add(request1); + ask2.add(request2); + scheduler.allocate(id11, ask2, new ArrayList(), null, null); + requests = fsAppAttempt.getResourceRequests(priority); + // Ensure the rackRequest is not null + assertTrue(requests.get(node1.getRackName()) != null ? true : false); + liveContainers = scheduler.getSchedulerApp(id11).getLiveContainers().size(); + scheduler.attemptScheduling(node); + // The live containers num will be not changed + assertEquals(liveContainers, scheduler.getSchedulerApp(id11) + .getLiveContainers().size()); + // Verify the rackRequest is not be dealt with + assertTrue(requests.get(node1.getRackName()) != null ? true : false); + + // The container is not be assigned and the schedulingOpportunities num will + // be increased + assertTrue((fsAppAttempt.getSchedulingOpportunities(priority) == 1 ? true + : false)); + List ask3 = new ArrayList<>(); + request1 = + createResourceRequest(1024, 8, node1.getHostName(), priorityValue, 1, + true); + request2 = + createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1, + true); + request3 = + createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1, + true); + ask3.add(request1); + ask3.add(request2); + ask3.add(request3); + scheduler.allocate(id11, ask3, new ArrayList(), null, null); + requests = fsAppAttempt.getResourceRequests(priority); + // Ensure the localRequest is not null + assertTrue(requests.get(node1.getHostName()) != null ? true : false); + liveContainers = scheduler.getSchedulerApp(id11).getLiveContainers().size(); + scheduler.attemptScheduling(node); + // The localRequest is dealt and the live containers will be reach 2 + assertEquals(liveContainers + 1, scheduler.getSchedulerApp(id11) + .getLiveContainers().size()); + // The container is assigned successfully and the schedulingOpportunities + // will be reset to 0 + assertTrue((fsAppAttempt.getSchedulingOpportunities(priority) == 0 ? true + : false)); + assertTrue(requests.get(node1.getHostName()) == null ? true : false); + } }