diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index bf4eae8..22ca0c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -98,6 +98,8 @@ // Reservation system configuration private ReservationQueueConfiguration globalReservationQueueConfig; + private final Set nonPreemptableQueues; + public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, @@ -114,7 +116,8 @@ public AllocationConfiguration(Map minQueueResources, QueuePlacementPolicy placementPolicy, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, - Set reservableQueues) { + Set reservableQueues, + Set nonPreemptableQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -135,6 +138,7 @@ public AllocationConfiguration(Map minQueueResources, this.globalReservationQueueConfig = globalReservationQueueConfig; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; + this.nonPreemptableQueues = nonPreemptableQueues; } public AllocationConfiguration(Configuration conf) { @@ -161,6 +165,7 @@ public AllocationConfiguration(Configuration conf) { } placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); + 
nonPreemptableQueues = new HashSet(); } /** @@ -210,6 +215,10 @@ public float getFairSharePreemptionThreshold(String queueName) { -1f : fairSharePreemptionThreshold; } + public boolean isPreemptionDisabled(String queueName) { + return nonPreemptableQueues.contains(queueName); + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 9049525..9df6c8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -224,6 +224,7 @@ public synchronized void reloadAllocations() throws IOException, Map> queueAcls = new HashMap>(); Set reservableQueues = new HashSet(); + Set nonPreemptableQueues = new HashSet(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -360,7 +361,7 @@ public synchronized void reloadAllocations() throws IOException, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, configuredQueues, - reservableQueues); + reservableQueues, nonPreemptableQueues); } // Load placement policy and 
pass it configured queues @@ -409,7 +410,7 @@ public synchronized void reloadAllocations() throws IOException, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, - reservableQueues); + reservableQueues, nonPreemptableQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -431,7 +432,8 @@ private void loadQueue(String parentName, Element element, Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues, - Set reservableQueues) + Set reservableQueues, + Set nonPreemptableQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name").trim(); @@ -508,13 +510,15 @@ private void loadQueue(String parentName, Element element, isLeaf = false; reservableQueues.add(queueName); configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else if ("disablePreemption".equals(field.getTagName())) { + nonPreemptableQueues.add(queueName); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, configuredQueues, reservableQueues); + queueAcls, configuredQueues, reservableQueues, nonPreemptableQueues); isLeaf = false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index febe050..63b094c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -260,6 +260,14 @@ public RMContainer preemptContainer() { readLock.lock(); try { for (FSQueue queue : childQueues) { + // Never pick a child queue whose preemption is disabled as a candidate + if (queue.isPreemptionDisabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + queue.getName() + + " because it's a non-preemptable queue"); + } + continue; + } if (candidateQueue == null || comparator.compare(queue, candidateQueue) > 0) { candidateQueue = queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 713bdca..6317425 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -62,6 +62,7 @@ private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; + private boolean preemptionDisabled = false; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -235,6 +236,14 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) 
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; } + public boolean isPreemptionDisabled() { + return preemptionDisabled; + } + + public void setPreemptionDisabled(boolean preemptionDisabled) { + this.preemptionDisabled = preemptionDisabled; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -242,7 +251,8 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) public abstract void recomputeShares(); /** - * Update the min/fair share preemption timeouts and threshold for this queue. + * Update the min/fair share preemption timeouts, threshold and preemption + * disabled flag for this queue. */ public void updatePreemptionVariables() { // For min share timeout @@ -263,6 +273,10 @@ public void updatePreemptionVariables() { if (fairSharePreemptionThreshold < 0 && parent != null) { fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); } + // For preemption disabled flag + preemptionDisabled = scheduler.getAllocationConfiguration() + .isPreemptionDisabled(getName()); + } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java index 5ff9422..d386032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java @@ -81,6 +81,7 @@ protected void render(Block html) { } ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString()); 
ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString()); + ri._("Preemptable:", !qinfo.isPreemptionDisabled()); html._(InfoBlock.class); // clear the info contents so this queue's info doesn't accumulate into another queue's info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index ee37f18..ff1e81d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -65,6 +65,8 @@ private String queueName; private String schedulingPolicy; + private boolean preemptionDisabled; + private FairSchedulerQueueInfoList childQueues; public FairSchedulerQueueInfo() { @@ -108,6 +110,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { return; } + preemptionDisabled = queue.isPreemptionDisabled(); childQueues = getChildQueues(queue, scheduler); } @@ -228,4 +231,8 @@ public String getSchedulingPolicy() { return childQueues != null ? 
childQueues.getQueueInfoList() : new ArrayList(); } + + public boolean isPreemptionDisabled() { + return preemptionDisabled; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2f48380..ea2d20f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2456,6 +2456,165 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception { } @Test + /** + * Tests the decision to preempt tasks with respect to non-preemptable queues + * 1, Queues as follows: + * queueA(non-preemptable) + * queueB(preemptable) + * parentQueue(non-preemptable) + * --queueC(preemptable) + * queueD(preemptable) + * 2, Submit requests to queueA, queueB, queueC, and all of them are over MinShare + * 3, Now all resources are occupied + * 4, Submit request to queueD, which must preempt resources from other queues + * 5, Only the preemptable queue (queueB) would be preempted. 
+ */ + public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println("2048mb,0vcores"); + out.println(""); + out.println("5"); + out.println("10"); + out.println(".5"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes(3G each) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, + "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // Submit apps to queueA, queueB, queueC, + // now all resource of the cluster 
is occupied + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // Now a new request arrives from queue D + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1); + scheduler.update(); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + // After minSharePreemptionTime has passed, 2G of resources should be preempted from + // queueB to queueD + clock.tickSec(6); + assertEquals(2048, + scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + + scheduler.preemptResources(Resources.createResource(2 * 1024)); + // now only app2 is selected to be preempted + assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App1 should not have container to be preempted", + Collections.disjoint( + 
scheduler.getSchedulerApp(app1).getLiveContainers(), + scheduler.getSchedulerApp(app1).getPreemptionContainers())); + assertTrue("App3 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + // Pretend 20 seconds have passed + clock.tickSec(20); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + // after preemption + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + } + + @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);