diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0ea7314..10015fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -86,7 +86,10 @@ // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; - + + @VisibleForTesting + Map disablePreemption; + //Configured queues in the alloc xml @VisibleForTesting Map> configuredQueues; @@ -109,7 +112,7 @@ public AllocationConfiguration(Map minQueueResources, QueuePlacementPolicy placementPolicy, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, - Set reservableQueues) { + Set reservableQueues, Map disablePreemption) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -118,6 +121,7 @@ public AllocationConfiguration(Map minQueueResources, this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.disablePreemption = disablePreemption; this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; @@ -142,6 +146,7 @@ public AllocationConfiguration(Configuration conf) { queueMaxAppsDefault = Integer.MAX_VALUE; queueMaxAMShareDefault = 0.5f; queueAcls = new HashMap>(); + disablePreemption = new HashMap(); minSharePreemptionTimeouts = new HashMap(); fairSharePreemptionTimeouts = new HashMap(); fairSharePreemptionThresholds = new HashMap(); @@ -337,4 +342,10 @@ public void setReservationWindow(long window) { public void setAverageCapacity(int avgCapacity) { globalReservationQueueConfig.setAverageCapacity(avgCapacity); } -} \ No newline at end of file + + public boolean isDisablePreemption(String queueName) { + Boolean val = disablePreemption.get(queueName); + return (val == null) ? false : val; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index dab6d9f..509ddb2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -222,6 +222,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap(); Map> queueAcls = new HashMap>(); + Map disablePreemption = new HashMap(); Set reservableQueues = new HashSet(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; @@ -353,7 +354,7 @@ public synchronized void reloadAllocations() throws IOException, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, configuredQueues, - reservableQueues); + reservableQueues, disablePreemption); } // Load placement policy and pass it configured queues @@ -402,7 +403,7 @@ public synchronized void reloadAllocations() throws IOException, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, - reservableQueues); + reservableQueues, disablePreemption); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -424,7 +425,7 @@ private void loadQueue(String parentName, Element element, Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues, - Set reservableQueues) + Set reservableQueues, Map disablePreemption) throws AllocationConfigurationException { String queueName = element.getAttribute("name").trim(); @@ -501,13 +502,18 @@ private void loadQueue(String parentName, Element element, isLeaf = false; reservableQueues.add(queueName); configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else if ("disablePreemption".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + boolean val = Boolean.valueOf(text); + disablePreemption.put(queueName, val); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, configuredQueues, reservableQueues); + queueAcls, configuredQueues, reservableQueues, + disablePreemption); isLeaf = false; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 7d2e5b8..b65378d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -259,6 +259,9 @@ public RMContainer preemptContainer() { readLock.lock(); try { for (FSQueue queue : childQueues) { + if (queue.isDisablePreemption()) { + continue; + } if (candidateQueue == null || comparator.compare(queue, candidateQueue) > 0) { candidateQueue = queue; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index e488c76..119b5b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -63,6 +63,8 @@ private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; + private boolean disablePreemption = false; + public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; this.scheduler = scheduler; @@ -235,6 +237,14 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; } + public boolean isDisablePreemption() { + return disablePreemption; + } + + public void setDisablePreemption(boolean disable) { + this.disablePreemption = disable; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -263,6 +273,12 @@ public void updatePreemptionVariables() { if (fairSharePreemptionThreshold < 0 && parent != null) { fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); } + // For disable per-queue preemption + disablePreemption = scheduler.getAllocationConfiguration() + .isDisablePreemption(getName()); + if (!disablePreemption && parent != null) { + disablePreemption = parent.isDisablePreemption(); + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 202eb09..38ca6b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -186,6 +186,7 @@ public void testAllocationFileParsing() throws Exception { out.println(" 180"); out.println(" 40"); out.println(" 0.7"); + out.println(" true"); out.println(" "); out.println(""); // Set default limit of apps per queue to 15 @@ -332,6 +333,9 @@ public void testAllocationFileParsing() throws Exception { // Verify new queue gets default scheduling policy assertEquals(DominantResourceFairnessPolicy.NAME, queueConf.getSchedulingPolicy("root.newqueue").getName()); + + assertTrue(queueConf.isDisablePreemption("root.queueG.queueH")); + assertFalse(queueConf.isDisablePreemption("root.queueA")); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c352cc9..73f6eef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2269,6 +2269,103 @@ public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { .getFairSharePreemptionTimeout()); } + @Test + public void testDisablePreemption() throws Exception { + + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println(""); + out.println(""); + out.println(".1"); + out.println("true"); + out.println(""); + out.println(""); + out.println(".1"); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + ApplicationAttemptId app13 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app14 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app15 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + ApplicationAttemptId app16 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app17 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + ApplicationAttemptId app18 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + scheduler.update(); + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + } + + assertEquals(1, scheduler.getSchedulerApp(app13).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app14).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app15).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app16).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app17).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app18).getLiveContainers().size()); + + ApplicationAttemptId app19 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 4); + clock.tickSec(5); + + Thread.sleep(1000); + scheduler.preemptResources(Resources.createResource(1 * 1024)); + + assertEquals(0, scheduler.getSchedulerApp(app13).getPreemptionContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app14).getPreemptionContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app15).getPreemptionContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app16).getPreemptionContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app17).getPreemptionContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app18).getPreemptionContainers().size()); + + } + @Test(timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf);