diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
index cf944a6213a..7c8c5def94d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.RejectionReason;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor.Response;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.slf4j.Logger;
@@ -48,11 +50,13 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.stream.Collectors;
 
 /**
@@ -95,6 +99,13 @@ private Response(boolean isSuccess, ApplicationId applicationId,
 
   private BatchedRequests.IteratorType iteratorType;
   private PlacementDispatcher placementDispatcher;
+  /**
+   * Placed requests awaiting an allocation attempt, ordered by the
+   * {@link Priority} of the owning application attempt.
+   */
+  final PriorityBlockingQueue<SchedulingRequestEvent> priorityQueue =
+      new PriorityBlockingQueue<SchedulingRequestEvent>(10,
+          Comparator.comparing(SchedulingRequestEvent::getAppPriority));
 
 
   @Override
@@ -203,9 +214,12 @@ private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
     ApplicationId applicationId = appAttemptId.getApplicationId();
     List<PlacedSchedulingRequest> placedSchedulingRequests =
         this.placementDispatcher.pullPlacedRequests(applicationId);
+
     for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
       SchedulingRequest sReq = placedReq.getSchedulingRequest();
       for (SchedulerNode node : placedReq.getNodes()) {
+        SchedulerApplicationAttempt applicationAttempt =
+            this.scheduler.getApplicationAttempt(appAttemptId);
         final SchedulingRequest sReqClone =
             SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
                 sReq.getPriority(), sReq.getExecutionType(),
@@ -213,21 +227,15 @@ private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
                 ResourceSizing.newInstance(
                     sReq.getResourceSizing().getResources()),
                 sReq.getPlacementConstraint());
-        SchedulerApplicationAttempt applicationAttempt =
-            this.scheduler.getApplicationAttempt(appAttemptId);
-        Runnable task = () -> {
-          boolean success =
-              scheduler.attemptAllocationOnNode(
-                  applicationAttempt, sReqClone, node);
-          if (!success) {
-            LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
-                placedReq.getPlacementAttempt(), sReqClone);
-          }
-          handleSchedulingResponse(
-              new Response(success, applicationId, sReqClone,
-                  placedReq.getPlacementAttempt(), node));
-        };
-        this.schedulingThreadPool.submit(task);
+
+        // Queue the placed request, then submit one worker run for it. Each
+        // run drains exactly one event, so runs never outnumber queued events.
+        SchedulingRequestEvent schedulingRequestEvent =
+            new SchedulingRequestEvent(applicationAttempt, placedReq, node,
+                sReqClone);
+        this.priorityQueue.add(schedulingRequestEvent);
+        this.schedulingThreadPool.execute(
+            new SchedulingRequestEventAsyncThread());
       }
     }
   }
@@ -338,4 +346,85 @@ private void addToRetryList(SchedulingResponse schedulerResponse,
           ((Response) schedulerResponse).attemptedNode);
     }
   }
+
+  /**
+   * A placed scheduling request waiting for its allocation attempt on a
+   * specific node. The priority of the owning application attempt is read
+   * once, at construction, and is the key the pending queue orders by.
+   */
+  public static class SchedulingRequestEvent {
+
+    private final SchedulerApplicationAttempt applicationAttempt;
+    private final PlacedSchedulingRequest placedSchedulingRequest;
+    private final SchedulerNode node;
+    private final SchedulingRequest sReq;
+    private final Priority appPriority;
+
+    SchedulingRequestEvent(SchedulerApplicationAttempt applicationAttempt,
+        PlacedSchedulingRequest placedSchedulingRequest, SchedulerNode node,
+        SchedulingRequest sReq) {
+      this.applicationAttempt = applicationAttempt;
+      this.placedSchedulingRequest = placedSchedulingRequest;
+      this.node = node;
+      this.sReq = sReq;
+      this.appPriority = applicationAttempt.getPriority();
+    }
+
+    /** @return the application attempt the request belongs to. */
+    public SchedulerApplicationAttempt getApplicationAttempt() {
+      return this.applicationAttempt;
+    }
+
+    /** @return the scheduling request to allocate. */
+    public SchedulingRequest getSchedulingRequest() {
+      return this.sReq;
+    }
+
+    /** @return the node the allocation is attempted on. */
+    public SchedulerNode getNode() {
+      return this.node;
+    }
+
+    /** @return the placed request this event was created from. */
+    public PlacedSchedulingRequest getPlacedSchedulingRequest() {
+      return this.placedSchedulingRequest;
+    }
+
+    /** @return the application priority captured at construction. */
+    public Priority getAppPriority() {
+      return this.appPriority;
+    }
+  }
+
+  /**
+   * Performs the allocation attempt for the head of the pending queue. One
+   * instance is submitted to the scheduling pool per queued event, and each
+   * run handles exactly one event before returning its thread to the pool.
+   */
+  public class SchedulingRequestEventAsyncThread implements Runnable {
+
+    @Override
+    public void run() {
+      // poll() rather than a take() loop: a run that never returns would
+      // keep its pool thread occupied for the lifetime of the processor.
+      SchedulingRequestEvent event = priorityQueue.poll();
+      if (event == null) {
+        return;
+      }
+
+      boolean success = scheduler.attemptAllocationOnNode(
+          event.getApplicationAttempt(), event.getSchedulingRequest(),
+          event.getNode());
+      if (!success) {
+        LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
+            event.getPlacedSchedulingRequest().getPlacementAttempt(),
+            event.getSchedulingRequest());
+      }
+      handleSchedulingResponse(new Response(success,
+          event.getApplicationAttempt().getApplicationId(),
+          event.getSchedulingRequest(),
+          event.getPlacedSchedulingRequest().getPlacementAttempt(),
+          event.getNode()));
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index e129a750c17..99cb7f5a9b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -19,6 +19,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -43,6 +44,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -57,12 +60,19 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
 import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.junit.Assert.assertEquals;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
@@ -90,6 +100,7 @@ public void createAndStartRM() {
         YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     conf.setInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
     startRM(conf);
   }
 
@@ -867,4 +878,129 @@ private static void verifyMetrics(QueueMetrics metrics, long availableMB,
         metrics.getAllocatedVirtualCores());
     Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
   }
+
+  /**
+   * Submits two applications with different priorities, both using an
+   * affinity constraint, and checks that the higher-priority application
+   * is the first to finish collecting its containers and that each
+   * application's containers land on a single node.
+   */
+  @Test(timeout = 60000)
+  public void testAffinityPlacementWithAppPriority() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    Resource resource1 = Records.newRecord(Resource.class);
+    resource1.setMemorySize(1 * GB);
+    // App1 with lowest priority
+    RMApp app1 = rm.submitApp(resource1, "app1", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, false,
+        false, null, 0, null, true, Priority.newInstance(5));
+
+    // Containers with allocationTag 'foo' should be placed where
+    // containers with allocationTag 'bar' are already running
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 12, 1, 512, "foo"),
+            schedulingRequest(1, 11, 1, 512, "bar")));
+
+    Resource resource2 = Records.newRecord(Resource.class);
+    resource2.setMemorySize(1 * GB);
+    // App2 with highest priority
+    RMApp app2 = rm.submitApp(resource2, "app2", "user1", null, false,
+        "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, false,
+        false, null, 0, null, true, Priority.newInstance(8));
+
+    // Containers with allocationTag 'foo' should be placed where
+    // containers with allocationTag 'bar' are already running
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+    am2.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 22, 1, 512, "foo"),
+            schedulingRequest(1, 21, 1, 512, "bar")));
+
+    List<MockAM> ams = Arrays.asList(am1, am2);
+
+    ExecutorService execService = Executors.newFixedThreadPool(10);
+    ExecutorCompletionService<List<Container>> completionService =
+        new ExecutorCompletionService<>(execService);
+    List<Future<List<Container>>> futures = new ArrayList<>();
+    try {
+      for (MockAM am : ams) {
+        completionService.submit(new TestCallable(am, nodes));
+      }
+      // take() yields futures in completion order. Take exactly one per
+      // submitted task so the loop can neither hang nor stop early.
+      for (int i = 0; i < ams.size(); i++) {
+        futures.add(completionService.take());
+      }
+    } finally {
+      execService.shutdownNow();
+    }
+
+    // Since app2 priority is higher than app1, placement and
+    // follow up remaining tasks as well happens first for app2.
+    // Retrieve 1st future and ensure it belongs to app2
+    List<Container> app2Containers = futures.get(0).get();
+    assertEquals(app2.getApplicationId(), app2Containers.get(0).getId()
+        .getApplicationAttemptId().getApplicationId());
+
+    Set<NodeId> nodeIds = app2Containers.stream()
+        .map(Container::getNodeId).collect(Collectors.toSet());
+    // Ensure all containers end up on the same node (affinity)
+    assertEquals(1, nodeIds.size());
+
+    List<Container> app1Containers = futures.get(1).get();
+    assertEquals(app1.getApplicationId(), app1Containers.get(0).getId()
+        .getApplicationAttemptId().getApplicationId());
+
+    Set<NodeId> nodeIds1 = app1Containers.stream()
+        .map(Container::getNodeId).collect(Collectors.toSet());
+    // Ensure all containers end up on the same node (affinity)
+    assertEquals(1, nodeIds1.size());
+  }
+
+  /**
+   * Drives one application master: sends its pending requests, then calls
+   * waitForContainerAllocation with a target of 2 containers.
+   */
+  public class TestCallable implements Callable<List<Container>> {
+
+    private final MockAM am;
+    private final HashMap<NodeId, MockNM> nodes;
+
+    TestCallable(MockAM am, HashMap<NodeId, MockNM> nodes) {
+      this.am = am;
+      this.nodes = nodes;
+    }
+
+    @Override
+    public List<Container> call() throws Exception {
+      List<Container> allocatedContainers = new ArrayList<>(10);
+      // Failures propagate through the Future so the test fails with the
+      // real cause instead of continuing with a partial container list.
+      AllocateResponse allocResponse = am.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      waitForContainerAllocation(nodes.values(), am, allocatedContainers,
+          new ArrayList<>(), 2);
+      return allocatedContainers;
+    }
+  }
 }