diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 800fa34..430fe86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -250,7 +250,12 @@ public static boolean fitsIn(Resource smaller, Resource bigger) { return smaller.getMemory() <= bigger.getMemory() && smaller.getVirtualCores() <= bigger.getVirtualCores(); } - + + public static boolean less(Resource smaller, Resource bigger) { + return smaller.getMemory() < bigger.getMemory() && + smaller.getVirtualCores() < bigger.getVirtualCores(); + } + public static Resource componentwiseMin(Resource lhs, Resource rhs) { return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 237cad2..4e7953f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -164,9 +164,8 @@ public AccessControlList getQueueAcl(String queue, QueueACL operation) { * are below their min share. */ public long getMinSharePreemptionTimeout(String queueName) { - Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName); - return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout - : minSharePreemptionTimeout; + return QueueName.findInHierarchy(minSharePreemptionTimeouts, queueName, + defaultMinSharePreemptionTimeout); } /** @@ -219,22 +218,16 @@ public Resource getMaxResources(String queueName) { public boolean hasAccess(String queueName, QueueACL acl, UserGroupInformation user) { - int lastPeriodIndex = queueName.length(); - while (lastPeriodIndex != -1) { - String queue = queueName.substring(0, lastPeriodIndex); - if (getQueueAcl(queue, acl).isUserAllowed(user)) { + for (String subQ : QueueName.pathToRoot(queueName)) { + if (getQueueAcl(subQ, acl).isUserAllowed(user)) { return true; } - - lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1); } - return false; } public SchedulingPolicy getSchedulingPolicy(String queueName) { - SchedulingPolicy policy = schedulingPolicies.get(queueName); - return (policy == null) ? defaultSchedulingPolicy : policy; + return QueueName.findInHierarchy(schedulingPolicies, queueName, defaultSchedulingPolicy); } public SchedulingPolicy getDefaultSchedulingPolicy() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 5ab60af..a7cac0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -48,11 +47,15 @@ private final List childQueues = new ArrayList(); private Resource demand = Resources.createResource(0); + private final boolean useGlobalPreemption; + private Resource aggregateMinShare; private int runnableApps; public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); + useGlobalPreemption = scheduler.getConf().getGlobalPreeption(); + aggregateMinShare = super.getMinShare(); } public void addChildQueue(FSQueue child) { @@ -61,6 +64,14 @@ public void addChildQueue(FSQueue child) { @Override public void recomputeShares() { + if (useGlobalPreemption) { + recomputeMinShare(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Recomputing shares for " + getName() + + "; Using fairShare " + getFairShare() + + " and distributable minShare " + getMinShare()); + } policy.computeShares(childQueues, getFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setFairShare(childQueue.getFairShare()); @@ -68,6 +79,31 @@ public void recomputeShares() { } } + private void recomputeMinShare() { + // find preconfigured minShare, and if none found, + // calculate minShare from child queues. + if (aggregateMinShare.equals(Resources.none())) { + Resource childMinShares = Resources.clone(Resources.none()); + for (FSQueue childQueue : childQueues) { + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeMinShare(); + } + Resources.addTo(childMinShares, childQueue.getMinShare()); + } + if (!Resources.equals(aggregateMinShare, childMinShares)) { + aggregateMinShare = childMinShares; + getMetrics().setMinShare(childMinShares); + } + } + } + + @Override + public Resource getMinShare() { + // null here due of too early usage of getMinShare in super constructor + return aggregateMinShare == null ? super.getMinShare() : aggregateMinShare; + } + + @Override public Resource getDemand() { return demand; @@ -165,7 +201,10 @@ public RMContainer preemptContainer() { RMContainer toBePreempted = null; // If this queue is not over its fair share, reject - if (!preemptContainerPreCheck()) { + // but only if global preemption aint used. + // in that case we should check inner queus and application + // explicitly + if (!(useGlobalPreemption || preemptContainerPreCheck())) { return toBePreempted; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 716e1ee..33d5527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -180,7 +180,7 @@ public abstract void collectSchedulerApplications( * @return true if check passes (can assign) or false otherwise */ protected boolean assignContainerPreCheck(FSSchedulerNode node) { - if (!Resources.fitsIn(getResourceUsage(), + if (!Resources.less(getResourceUsage(), scheduler.getAllocationConfiguration().getMaxResources(getName())) || node.getReservedContainer() != null) { return false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5725f8c..331be2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -152,6 +153,7 @@ // Preemption related variables protected boolean preemptionEnabled; + protected boolean globalPreemtionEnabled; protected float preemptionUtilizationThreshold; // How often tasks are preempted @@ -1071,14 +1073,40 @@ private void updateRootQueueMetrics() { */ private boolean shouldAttemptPreemption() { if (preemptionEnabled) { - return (preemptionUtilizationThreshold < Math.max( - (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(), - (float) rootMetrics.getAvailableVirtualCores() / - clusterResource.getVirtualCores())); + // check global cluster load + if ((preemptionUtilizationThreshold < calculateUsage(rootMetrics, clusterResource))) + return true; + if (globalPreemtionEnabled) { + // check queues utilization + return shouldAttemptPreemption(queueMgr.getRootQueue()); + } } return false; } + private float calculateUsage(FSQueueMetrics metrics, Resource maxResource) { + Resource compareTo = Resources.min(RESOURCE_CALCULATOR, clusterResource, + maxResource, clusterResource); + return Math.max( + (float) metrics.getAllocatedMB() / (float) compareTo.getMemory(), + (float) metrics.getAllocatedVirtualCores() / + (float) compareTo.getVirtualCores()); + } + + private boolean shouldAttemptPreemption(FSQueue queue) { + double usage = calculateUsage(queue.getMetrics(), queue.getMaxShare()); + for (FSQueue childQueue : queue.getChildQueues()) { + if (shouldAttemptPreemption(childQueue)) + return true; + final float queueUsage = calculateUsage( + childQueue.getMetrics(), childQueue.getMaxShare()); + usage = Math.max( usage, queueUsage); + if (usage > preemptionUtilizationThreshold) + return true; + } + return usage > preemptionUtilizationThreshold; + } + @Override public QueueMetrics getRootQueueMetrics() { return rootMetrics; @@ -1188,6 +1216,7 @@ private synchronized void initScheduler(Configuration conf) preemptionEnabled = this.conf.getPreemptionEnabled(); preemptionUtilizationThreshold = this.conf.getPreemptionUtilizationThreshold(); + globalPreemtionEnabled = this.conf.getGlobalPreeption(); assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 0fd242d..22b0b6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -123,6 +123,12 @@ protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; protected static final int DEFAULT_MAX_ASSIGN = -1; + /** Whether global preemption enabled. */ + protected static final String GLOBAL_PREEMPTION = CONF_PREFIX + "globalPreemption"; + protected static final boolean DEFAULT_GLOBAL_PREEMPTION = false; + + + public FairSchedulerConfiguration() { super(); } @@ -227,6 +233,10 @@ public boolean getUsePortForNodeName() { YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); } + public boolean getGlobalPreeption() { + return getBoolean(GLOBAL_PREEMPTION, DEFAULT_GLOBAL_PREEMPTION); + } + /** * Parses a resource config value of a form like "1024", "1024 mb", * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 4f8735b..ca417a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -133,20 +133,12 @@ private FSQueue getQueue(String name, boolean create, FSQueueType queueType) { */ private FSQueue createQueue(String name, FSQueueType queueType) { List newQueueNames = new ArrayList(); - newQueueNames.add(name); - int sepIndex = name.length(); FSParentQueue parent = null; - // Move up the queue tree until we reach one that exists. - while (sepIndex != -1) { - sepIndex = name.lastIndexOf('.', sepIndex-1); - FSQueue queue; - String curName = null; - curName = name.substring(0, sepIndex); - queue = queues.get(curName); - + for (String subQueue : QueueName.pathToRoot(name)) { + final FSQueue queue = queues.get(subQueue); if (queue == null) { - newQueueNames.add(curName); + newQueueNames.add(subQueue); } else { if (queue instanceof FSParentQueue) { parent = (FSParentQueue)queue; @@ -156,7 +148,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } } } - + // At this point, parent refers to the deepest existing parent of the // queue to create. // Now that we know everything worked out, make all the queues @@ -240,15 +232,11 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate, // Queue doesn't exist already. Check if the new queue would be created // under an existing leaf queue. If so, try removing that leaf queue. - int sepIndex = queueToCreate.length(); - sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); - while (sepIndex != -1) { - String prefixString = queueToCreate.substring(0, sepIndex); - FSQueue prefixQueue = queues.get(prefixString); + for (String subQ : QueueName.pathToRoot(queueToCreate)) { + FSQueue prefixQueue = queues.get(subQ); if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { return removeQueueIfEmpty(prefixQueue); } - sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueName.java new file mode 100644 index 0000000..d7673f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueName.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.util.Iterator; +import java.util.Map; + +/** + * Scheduler hierarchical queue abstraction methods. + * In general queue name consists of queues path separated by '.' + * Fully qualified queue name always begins with 'root' queue. + * Also here is a notion of "undefined" which represented by empty queue name. + */ +public class QueueName { + + public static final String ROOT_QUEUE = "root"; + public static final String DEFAULT_QUEUE = makeRootQualified(YarnConfiguration.DEFAULT_QUEUE_NAME); + public static final String ROOT_PREFIX = ROOT_QUEUE + "."; + public static final String UNDEFINED_QUEUE = ""; + + private QueueName() { + } + + /** + * Ensure, that queue name if 'root' qualifeid (i.e. has root prefix) + * Undefined queue remains undefined. + * @param name name to be qualified + * @return fully qualifed queue name, fqqn :) + */ + public static String makeRootQualified(String name) { + if (name.length() == 0 || name.equals(ROOT_QUEUE) || name.startsWith(ROOT_PREFIX)) + return name; + else + return ROOT_PREFIX + name; + } + + /** + * Get parent for given queue name + * @param name + * @return queue name or "" (undefined) + */ + public static String getParent(String queueName) { + final String name = makeRootQualified(queueName); + final int idx = name.lastIndexOf('.'); + if (idx == -1) + return UNDEFINED_QUEUE; + else + return name.substring(0, idx); + } + + /** + * Check that given queue is defined. + * @return true if queue is defined + */ + public boolean isDefined(String name) { + return name.length() > 0; + } + + /** + * Returns iterable on hierarchy from bottom to top. + * Example: root.a.b gives [ root.a.b, root.a, root ] sequence + * @return Iterable + */ + public static Iterable pathToRoot(final String name) { + return new Iterable() { + @Override + public Iterator iterator() { + return pathToRootIterator(name); + } + }; + } + + /** + * @see #hierarchy() + * @return paths iterator + */ + public static Iterator pathToRootIterator(final String queueName) { + return new Iterator() { + private String name = makeRootQualified(queueName); + private int idx = name.length(); + private String path = name; + + @Override + public boolean hasNext() { + return idx != -1; + } + + @Override + public String next() { + String rpath = path; + idx = path.lastIndexOf('.'); + if (idx != -1) + path = path.substring(0, idx); + return rpath; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("immutable iterator"); + } + }; + } + + public static boolean isRoot(String name) { + return makeRootQualified(name).equals(ROOT_QUEUE); + } + + public static boolean isDefault(String name) { + return makeRootQualified(name).equals(DEFAULT_QUEUE); + } + + /** + * Find least common accessor. + * @param queueName1 first + * @param queueName2 second + * @return minimal common parent or any of queues if they are identical + */ + public static String lca(String queueName1, String queueName2) { + // Because queue names include ancestors, separated by periods, we can find + // the lowest common ancestors by going from the start of the names until + // there's a character that doesn't match. + final String name1 = makeRootQualified(queueName1); + final String name2 = makeRootQualified(queueName2); + // We keep track of the last period we encounter to avoid returning root.apple + // when the queues are root.applepie and root.appletart + int lastPeriodIndex = -1; + for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) { + if (name1.length() <= i || name2.length() <= i || + name1.charAt(i) != name2.charAt(i)) { + return name1.substring(lastPeriodIndex); + } else if (name1.charAt(i) == '.') { + lastPeriodIndex = i; + } + } + return queueName1; + } + + /** + * Find in given map value querying in hierarchical order. + * Hierarchy represented as path-keyed map. + * @param map map for values, keys are queue names + * @param queueName queue name lookup for + * @param defaultValue default value if none found + * @param type of return value + * @return value for given queue in hierarchy + */ + public static T findInHierarchy(Map map, String queueName, T defaultValue) { + final Iterable path = QueueName.pathToRoot(queueName); + for (String q : path) { + T found = map.get(q); + if (found != null) + return found; + } + return defaultValue; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueNameTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueNameTest.java new file mode 100644 index 0000000..69ac976 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueNameTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class QueueNameTest { + + @Test + public void testNaming() { + assertEquals("root.a.b", QueueName.makeRootQualified("a.b")); + assertEquals("root.a.b", QueueName.makeRootQualified("root.a.b")); + assertEquals("", QueueName.makeRootQualified("")); + + assertTrue(QueueName.isRoot("root")); + assertTrue(QueueName.isDefault("default")); + } + + @Test + public void testHierarcy() { + final String q = "root.a.b"; + assertEquals( + Lists.newArrayList("root.a.b", "root.a", "root"), + Lists.newArrayList(QueueName.pathToRoot(q))); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a4992c..36992bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -458,7 +458,56 @@ public void testQueueAlongsideRoot() throws Exception { allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } - + + @Test + public void testHierarchicalAllocations() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + // Give queue A a minimum of 1024 M + out.println(""); + out.println("60"); + out.println("fifo"); + out.println(""); + out.println("120"); + out.println("fair"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("300" + + ""); + out.println("drf"); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration queueConf = confHolder.allocConf; + + assertEquals("FIFO", queueConf.getSchedulingPolicy("root.queueA").getName()); + assertEquals("fair", queueConf.getSchedulingPolicy("root.queueA.queueB").getName()); + assertEquals("FIFO", queueConf.getSchedulingPolicy("root.queueA.queueC").getName()); + // not existent queue + assertEquals("DRF", queueConf.getSchedulingPolicy("root.queueD").getName()); + + assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA.queueB")); + assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueA.queueC")); + // not existent queue + assertEquals(300000, queueConf.getMinSharePreemptionTimeout("root.queueD")); + + } + + private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index abe2d5c..f745bff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -43,6 +43,7 @@ import javax.xml.parsers.ParserConfigurationException; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -2956,4 +2957,178 @@ public void testMoveToNonexistentQueue() throws Exception { createSchedulingRequest(1024, 1, "queue1", "user1", 3); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } + + @Test + public void testMinShareInHierarchicalQueues() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.GLOBAL_PREEMPTION, "true"); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" 10240mb, 10vcores"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" 6192mb, 6vcores"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(10240 * 2, 10 * 2)); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + ApplicationAttemptId bigId = createSchedulingRequest(1024, "queue1.big", "user1", 10); + final FSSchedulerApp bigApp = scheduler.getSchedulerApp(bigId); + Thread.sleep(3); // make start time a bit different + + // allocate all containers + for (int i = 0; i < 10; i++) { + final NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + scheduler.update(); + } + assertEquals(10, bigApp.getLiveContainers().size()); + + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1.sub11", "user1", 5); + final FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + Thread.sleep(3); // make start time a bit different + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub2", "user1", 5); + final FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + assertEquals(0, app1.getLiveContainers().size()); + assertEquals(0, app2.getLiveContainers().size()); + + completeContainers(bigId, 6); + scheduler.update(); + + // allocate containers + for (int i = 0; i < 6; i++) { + final NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + scheduler.update(); + } + assertEquals(5, app1.getLiveContainers().size()); + assertEquals(1, app2.getLiveContainers().size()); + + final NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + assertEquals(5, app1.getLiveContainers().size()); + assertEquals(1, app2.getLiveContainers().size()); + + } + + @Test + public void testGlobalPreemption() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.PREEMPTION, "true"); + conf.set(FairSchedulerConfiguration.GLOBAL_PREEMPTION, "true"); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, + FairSchedulerConfiguration.DEFAULT_PREEMPTION_THRESHOLD); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" 10240mb, 10vcores"); + out.println(" 0"); // preempt immediately + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" 6192mb, 6vcores"); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(10240 * 2, 10 * 2)); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + scheduler.getQueueManager().getQueue("root.queue1") + .setPolicy(SchedulingPolicy.parse("fair")); + + // create one big application + ApplicationAttemptId bigId = createSchedulingRequest(1024, "queue1.big", "user1", 10); + final FSSchedulerApp bigApp = scheduler.getSchedulerApp(bigId); + + final NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + // allocate containers + for (int i = 0; i < 10; i++) { + scheduler.handle(updateEvent); + } + scheduler.update(); + assertEquals(10, bigApp.getLiveContainers().size()); + + Thread.sleep(3); // make start time a bit different + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1.sub11", "user1", 5); + final FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals(0, app1.getLiveContainers().size()); + // Pretend 15 seconds have passed + clock.tick(15); + scheduler.preemptTasksIfNecessary(); + + // big app should return containers up to its fair share (6GB). + assertEquals(4, scheduler.getSchedulerApp(bigId).getPreemptionContainers().size()); + completePreemteedContainers(bigId); + clock.tick(15); + for (int i = 0; i < 10; i++) { + scheduler.handle(updateEvent); + } + + // needed containers should start + assertEquals(6, scheduler.getSchedulerApp(bigId).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + } + + private void completeContainers(ApplicationAttemptId attId1, int numOfContainers) { + final ArrayList rmContainers = + Lists.newArrayList(scheduler.getSchedulerApp(attId1) + .getLiveContainers()); + final ArrayList containerIds = + Lists.newArrayList(); + for (int i = 0; i < numOfContainers; i++) { + containerIds.add(rmContainers.get(i).getContainerId()); + } + scheduler.allocate(attId1, Lists.newArrayList(), + containerIds, null, null); + } + + private void completePreemteedContainers(ApplicationAttemptId attId1) { + final Set rmContainers = + scheduler.getSchedulerApp(attId1) + .getPreemptionContainers(); + final ArrayList containerIds = + Lists.newArrayList(); + for (RMContainer rmContainer : rmContainers) { + containerIds.add(rmContainer.getContainerId()); + } + scheduler.allocate(attId1, Lists.newArrayList(), + containerIds, null, null); + } + }