diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
index c4c95741df4..928d491dd33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
@@ -17,7 +17,19 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
 
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
@@ -36,24 +48,17 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
+import com.google.common.collect.Lists;
 
 /**
  * An ApplicationMasterServiceProcessor that performs Constrained placement of
@@ -86,7 +91,7 @@ private Response(boolean isSuccess, ApplicationId applicationId,
   private static final Logger LOG =
       LoggerFactory.getLogger(PlacementConstraintProcessor.class);
 
-  private ExecutorService schedulingThreadPool;
+  private PriorityExecutor schedulingThreadPool;
   private int retryAttempts;
   private Map<ApplicationId, List<SchedulingRequest>> requestsToRetry =
       new ConcurrentHashMap<>();
@@ -96,7 +101,6 @@ private Response(boolean isSuccess, ApplicationId applicationId,
   private BatchedRequests.IteratorType iteratorType;
   private PlacementDispatcher placementDispatcher;
 
-
   @Override
   public void init(ApplicationMasterServiceContext amsContext,
       ApplicationMasterServiceProcessor nextProcessor) {
@@ -140,7 +144,8 @@ public void init(ApplicationMasterServiceContext amsContext,
     int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
         YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
-    this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
+    this.schedulingThreadPool =
+        (PriorityExecutor) PriorityExecutor.newFixedThreadPool(schedPSize);
     LOG.info("Scheduler pool size [{}]", schedPSize);
 
     // Number of times a request that is not satisfied by the scheduler
@@ -224,18 +229,17 @@ private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
         SchedulerApplicationAttempt applicationAttempt =
             this.scheduler.getApplicationAttempt(appAttemptId);
         Runnable task = () -> {
-          boolean success =
-              scheduler.attemptAllocationOnNode(
-                  applicationAttempt, sReqClone, node);
+          boolean success = scheduler
+              .attemptAllocationOnNode(applicationAttempt, sReqClone, node);
           if (!success) {
             LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
                 placedReq.getPlacementAttempt(), sReqClone);
           }
-          handleSchedulingResponse(
-              new Response(success, applicationId, sReqClone,
-                  placedReq.getPlacementAttempt(), node));
+          handleSchedulingResponse(new Response(success, applicationId,
+              sReqClone, placedReq.getPlacementAttempt(), node));
         };
-        this.schedulingThreadPool.submit(task);
+        this.schedulingThreadPool.execute(task,
+            applicationAttempt.getPriority().getPriority());
       }
     }
   }
@@ -346,4 +350,37 @@ private void addToRetryList(SchedulingResponse schedulerResponse,
           ((Response) schedulerResponse).attemptedNode);
     }
   }
+
+  private static class PriorityExecutor extends ThreadPoolExecutor {
+
+    private PriorityExecutor(int corePoolSize, int maximumPoolSize,
+        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    public static ExecutorService newFixedThreadPool(int nThreads) {
+      return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+          new PriorityBlockingQueue<Runnable>());
+    }
+
+    public void execute(Runnable command, int priority) {
+      super.execute(new ComparableFutureTask<Void>(command, null, priority));
+    }
+  }
+
+  private static class ComparableFutureTask<T> extends FutureTask<T>
+      implements Comparable<ComparableFutureTask<T>> {
+
+    volatile int priority = 0;
+
+    private ComparableFutureTask(Runnable runnable, T result, int priority) {
+      super(runnable, result);
+      this.priority = priority;
+    }
+
+    @Override
+    public int compareTo(ComparableFutureTask<T> o) {
+      return Integer.valueOf(priority).compareTo(o.priority);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index e129a750c17..1054f3b409f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -17,8 +17,33 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -39,33 +64,18 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static java.lang.Thread.sleep;
-import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import com.google.common.base.Supplier;
 
 /**
  * This tests end2end workflow of the constraint placement framework.
@@ -90,6 +100,7 @@ public void createAndStartRM() {
         YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     conf.setInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
     startRM(conf);
   }
 
@@ -867,4 +878,123 @@ private static void verifyMetrics(QueueMetrics metrics, long availableMB,
         metrics.getAllocatedVirtualCores());
     Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
   }
+
+  @Test(timeout = 30000)
+  public void testAntiAffinityPlacementWithAppPriority() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemorySize(1 * GB);
+
+    // App1 with lowest priority
+    RMApp app1 = rm.submitApp(resource, "app1", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, false,
+        false, null, 0, null, true, Priority.newInstance(8));
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 11, 1, 512, "foo"),
+            schedulingRequest(1, 12, 1, 512, "foo")));
+
+    // App2 with highest priority
+    RMApp app2 = rm.submitApp(resource, "app2", "user1", null, false,
+        "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, false,
+        false, null, 0, null, true, Priority.newInstance(6));
+
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
+    am2.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 21, 1, 512, "foo"),
+            schedulingRequest(1, 22, 1, 512, "foo")));
+
+    ArrayList<MockAM> ams = new ArrayList<MockAM>();
+    ams.add(am1);
+    ams.add(am2);
+
+    ExecutorService execService = Executors.newFixedThreadPool(2);
+    ExecutorCompletionService<List<Container>> completionService =
+        new ExecutorCompletionService<List<Container>>(execService);
+    for (MockAM am : ams) {
+      completionService.submit(new TestCallable(am, nodes));
+    }
+    execService.shutdown();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        if (rm.getResourceScheduler()
+            .getSchedulerAppInfo(am1.getApplicationAttemptId())
+            .getLiveContainers().size() == 3) {
+          return true;
+        } else {
+          return false;
+        }
+      }
+    }, 1000, 5000);
+
+    List<RMContainer> containerList = new ArrayList<>();
+
+    for (MockAM am : ams) {
+      Collection<RMContainer> liveContainers = rm.getResourceScheduler()
+          .getSchedulerAppInfo(am.getApplicationAttemptId())
+          .getLiveContainers();
+      for (RMContainer liveContainer : liveContainers) {
+        if (!liveContainer.isAMContainer()) {
+          containerList.add(liveContainer);
+        }
+      }
+    }
+
+    // sort based on creation time and ensure app2 container has been created
+    // first as it has higher priority than app1
+    containerList
+        .sort((o1, o2) -> new Long(o1.getCreationTime())
+            .compareTo(new Long(o2.getCreationTime())));
+    assertEquals(am1.getApplicationAttemptId(),
+        containerList.get(3).getContainerId().getApplicationAttemptId());
+  }
+
+  public class TestCallable implements Callable<List<Container>> {
+
+    private MockAM am;
+    private HashMap<NodeId, MockNM> nodes;
+
+    TestCallable(MockAM am, HashMap<NodeId, MockNM> nodes) {
+      this.am = am;
+      this.nodes = nodes;
+    }
+
+    @Override
+    public List<Container> call() throws Exception {
+      AllocateResponse allocResponse;
+      List<Container> allocatedContainers = new ArrayList<>(10);
+      try {
+        allocResponse = am.schedule();
+        allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+        waitForContainerAllocation(nodes.values(), am, allocatedContainers,
+            new ArrayList<>(), 2);
+      } catch (Exception e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      return allocatedContainers;
+    }
+  }
 }
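
For reference, the scheduling change above relies on a standard java.util.concurrent pattern: a ThreadPoolExecutor backed by a PriorityBlockingQueue dequeues waiting tasks in the order given by their compareTo, not in submission order. The standalone sketch below is illustrative only and not part of the patch (class names such as PriorityExecutorDemo and PrioritizedTask are invented here); it shows the mechanism the patch's PriorityExecutor and ComparableFutureTask build on: with the single worker busy, the queued task carrying the numerically smaller priority value runs first.

import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityExecutorDemo {

  // FutureTask carrying an int priority; PriorityBlockingQueue hands out the
  // smallest value first, mirroring the natural ordering used in the patch.
  static class PrioritizedTask extends FutureTask<Void>
      implements Comparable<PrioritizedTask> {
    private final int priority;

    PrioritizedTask(Runnable runnable, int priority) {
      super(runnable, null);
      this.priority = priority;
    }

    @Override
    public int compareTo(PrioritizedTask other) {
      return Integer.compare(priority, other.priority);
    }
  }

  public static void main(String[] args) throws Exception {
    // One worker thread over a priority queue: queued tasks run in priority
    // order rather than submission order.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());

    // Keep the single worker busy so the next two submissions pile up in the queue.
    pool.execute(new PrioritizedTask(() -> {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, 0));

    pool.execute(new PrioritizedTask(
        () -> System.out.println("priority 8 task"), 8));
    pool.execute(new PrioritizedTask(
        () -> System.out.println("priority 6 task"), 6));

    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    // Prints "priority 6 task" before "priority 8 task".
  }
}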