diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 21c385a8bcc..980782c90be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -107,6 +107,9 @@ // hierarchy of this queue. private boolean intraQueuePreemptionDisabledInHierarchy; + private volatile LocalityDelayMode localityDelayMode; + private volatile float localityDelayThreshold; + // Track resource usage-by-label like used-resource/pending-resource, etc. volatile ResourceUsage queueUsage; @@ -425,6 +428,10 @@ protected void setupQueueConfigs(Resource clusterResource, configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath())); this.userWeights = getUserWeightsFromHierarchy(configuration); + + this.localityDelayMode = getLocalityDelayModeFromConf(configuration); + this.localityDelayThreshold = + getLocalityDelayThresholdFromConf(configuration); } finally { writeLock.unlock(); } @@ -802,6 +809,51 @@ public boolean getIntraQueuePreemptionDisabled() { public boolean getIntraQueuePreemptionDisabledInHierarchy() { return intraQueuePreemptionDisabledInHierarchy; } + + @Private + public LocalityDelayMode getLocalityDelayMode() { + return localityDelayMode; + } + + @Private + public float getLocalityDelayThreshold() { + return localityDelayThreshold; + } + + /** + * Get the {@link LocalityDelayMode} from the Capacity Scheduler Conf + * for this queue. If a value is not specified for this queue in the conf, + * then inherit from the parent. + * @return {@link LocalityDelayMode} from the conf + */ + private LocalityDelayMode getLocalityDelayModeFromConf( + CapacitySchedulerConfiguration configuration) { + LocalityDelayMode defaultMode = + CapacitySchedulerConfiguration.DEFAULT_LOCALITY_DELAY_MODE; + if (this.parent != null) { + defaultMode = this.parent.getLocalityDelayMode(); + } + + return configuration.getLocalityDelayMode(this.getQueuePath(), defaultMode); + } + + /** + * Get the locality delay threshold from the Capacity Scheduler Configuration + * for this queue. If a value is not specified for this queue in the conf, + * then inherit from the parent. + * @return locality delay threshold from the conf + */ + private float getLocalityDelayThresholdFromConf( + CapacitySchedulerConfiguration configuration) { + float defaultVal = + CapacitySchedulerConfiguration.DEFAULT_LOCALITY_DELAY_THRESHOLD; + if (this.parent != null) { + defaultVal = this.parent.getLocalityDelayThreshold(); + } + + return configuration.getLocalityDelayThreshold(this.getQueuePath(), + defaultVal); + } @Private public QueueCapacities getQueueCapacities() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index d507e53543c..0d162dde60c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -304,6 +304,18 @@ public void attachContainer(Resource clusterResource, */ public boolean getIntraQueuePreemptionDisabledInHierarchy(); + /** + * Get the {@link LocalityDelayMode} for this queue. + * @return {@link LocalityDelayMode} associated with this queue + */ + LocalityDelayMode getLocalityDelayMode(); + + /** + * Get the locality delay threshold for this queue. + * @return the locality delay threshold associated with this queue + */ + float getLocalityDelayThreshold(); + /** * Get QueueCapacities of this queue * @return queueCapacities diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index a88beef8c97..d1e8f4cffef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -210,6 +210,20 @@ @Private public static final String ROOT = "root"; + @Private + public static final String LOCALITY_DELAY_MODE = "locality-delay-mode"; + + @Private + public static final LocalityDelayMode DEFAULT_LOCALITY_DELAY_MODE = + LocalityDelayMode.ALWAYS_ON; + + @Private + public static final String LOCALITY_DELAY_THRESHOLD = + "locality-delay-threshold"; + + @Private + public static final float DEFAULT_LOCALITY_DELAY_THRESHOLD = 0.2f; + @Private public static final String NODE_LOCALITY_DELAY = PREFIX + "node-locality-delay"; @@ -543,6 +557,36 @@ public int getUserLimit(String queue) { return userLimit; } + public void setLocalityDelayMode(String queue, LocalityDelayMode delayMode) { + setStrings(getQueuePrefix(queue) + LOCALITY_DELAY_MODE, + delayMode.toString()); + } + + public LocalityDelayMode getLocalityDelayMode(String queue, + LocalityDelayMode defaultMode) { + String mode = get(getQueuePrefix(queue) + LOCALITY_DELAY_MODE, + defaultMode.toString()); + + LocalityDelayMode[] legalValues = LocalityDelayMode.values(); + for (LocalityDelayMode legalValue : legalValues) { + if (legalValue.toString().equalsIgnoreCase(mode)) { + return legalValue; + } + } + + return defaultMode; + } + + public void setLocalityDelayThreshold(String queue, float threshold) { + setFloat(getQueuePrefix(queue) + LOCALITY_DELAY_THRESHOLD, threshold); + } + + public float getLocalityDelayThreshold(String queue, float defaultVal) { + float threshold = getFloat( + getQueuePrefix(queue) + LOCALITY_DELAY_THRESHOLD, defaultVal); + return threshold; + } + // TODO (wangda): We need to better distinguish app ordering policy and queue // ordering policy's classname / configuration options, etc. And dedup code // if possible. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a178f9e9a0b..236267fbceb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -95,6 +95,8 @@ private volatile int rackLocalityAdditionalDelay; private volatile boolean rackLocalityFullReset; + private boolean localityDelayEnabled; + Map applicationAttemptMap = new ConcurrentHashMap<>(); @@ -234,6 +236,14 @@ protected void setupQueueConfigs(Resource clusterResource, rackLocalityFullReset = schedConf .getRackLocalityFullReset(); + if (this.getLocalityDelayMode() == LocalityDelayMode.OFF) { + localityDelayEnabled = false; + } else { + // For both ALWAYS_ON and THRESHOLD, start with true + // For THRESHOLD, this should get updated before the first check + localityDelayEnabled = true; + } + // re-init this since max allocation could have changed this.minimumAllocationFactor = Resources.ratio(resourceCalculator, Resources.subtract(maximumAllocation, minimumAllocation), @@ -322,6 +332,10 @@ protected void setupQueueConfigs(Resource clusterResource, + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "rackLocalityAdditionalDelay = " + rackLocalityAdditionalDelay + "\n" + + "localityDelayMode = " + + getLocalityDelayMode() + "\n" + + "localityDelayThreshold = " + + getLocalityDelayThreshold() + "\n" + "labels=" + labelStrBuilder.toString() + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + "preemptionDisabled = " @@ -1095,6 +1109,8 @@ public CSAssignment assignContainers(Resource clusterResource, return CSAssignment.NULL_ASSIGNMENT; } + updateLocalityDelayEnabled(); + Map userLimits = new HashMap<>(); boolean needAssignToQueueCheck = true; for (Iterator assignmentIterator = @@ -1486,6 +1502,50 @@ public int getRackLocalityAdditionalDelay() { return rackLocalityAdditionalDelay; } + /** + * Update {@link #localityDelayEnabled} as follows. + * localityDelayEnabled = (total cluster resources - all used resources) + * >= (total cluster resources * configured locality delay threshold) + */ + private void updateLocalityDelayEnabled() { + if (LOG.isDebugEnabled()) { + LOG.debug("Locality delay mode for queue " + getQueueName() + + " is: " + this.getLocalityDelayMode() + + (this.getLocalityDelayMode() == LocalityDelayMode.THRESHOLD ? + " with threshold: " + this.getLocalityDelayThreshold() : "") + + "."); + } + + // no need to update if locality skip mode doesn't rely on current usage + if (this.getLocalityDelayMode() == LocalityDelayMode.THRESHOLD) { + Resource clusterResource = csContext.getClusterResource(); + + Resource threshold = Resources.multiplyAndNormalizeDown( + resourceCalculator, clusterResource, this.getLocalityDelayThreshold(), + minimumAllocation); + + Resource remaining = Resources.subtract(clusterResource, + csContext.getClusterResourceUsage().getAllUsed()); + + this.localityDelayEnabled = Resources.greaterThanOrEqual( + resourceCalculator, this.csContext.getClusterResource(), + remaining, threshold); + + if (LOG.isDebugEnabled()) { + float clusterResUsage = 100f*Resources.divide(resourceCalculator, + clusterResource, + csContext.getClusterResourceUsage().getAllUsed(), clusterResource); + LOG.debug("Locality delay for queue " + getQueueName() + " is " + + (this.localityDelayEnabled ? "enabled" : "disabled") + + " with cluster usage at " + clusterResUsage + " percent."); + } + } + } + + public boolean isLocalityDelayEnabled() { + return this.localityDelayEnabled; + } + @Lock(NoLock.class) public boolean getRackLocalityFullReset() { return rackLocalityFullReset; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LocalityDelayMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LocalityDelayMode.java new file mode 100644 index 00000000000..88bd998432d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LocalityDelayMode.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +/** + * Locality Delay Mode for container allocations. When a container request is + * provided with an opportunity on a node different than the one requested, + * what should the allocator do? + */ +public enum LocalityDelayMode { + /** + * Do not skip because of locality so the allocator should allocate the + * container on this node even though it is not the node preferred. + */ + OFF, + + /** + * Always skip because of locality (there is a cap on the number of times + * such a provided opportunity is skipped by an application) in hopes of + * allocating on the requested node or rack. + */ + ALWAYS_ON, + + /** + * Use a threshold so that we skip up to a certain number of times (capped) + * if there is enough capacity available for the queue but do not skip if + * the capacity available for the queue is below the threshold where it is + * likely that this container won't get many more opportunities to be + * allocated in the near future. + */ + THRESHOLD +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 2643fd0b7a1..8e5e95d85e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -300,6 +300,15 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, return false; } + // Check if locality delay is allowed for this queue + if (!application.getCSLeafQueue().isLocalityDelayEnabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Locality delay is disabled for queue " + + application.getCSLeafQueue().getQueueName()); + } + return true; + } + int uniqLocationAsks = 0; AppPlacementAllocator appPlacementAllocator = application.getAppPlacementAllocator(schedulerKey); @@ -338,6 +347,15 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, // If we are here, we do need containers on this rack for RACK_LOCAL req if (type == NodeType.RACK_LOCAL) { + // Check if locality delay is allowed for this queue + if (!application.getCSLeafQueue().isLocalityDelayEnabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Locality delay is disabled for queue " + + application.getCSLeafQueue().getQueueName()); + } + return true; + } + // 'Delay' rack-local just a little bit... long missedOpportunities = application.getSchedulingOpportunities(schedulerKey); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 6a4391ab211..ba6878806f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; @@ -4141,4 +4142,459 @@ public void tearDown() throws Exception { cs.stop(); } } + + private static final Random RANDOM = new Random(); + + private LocalityDelayMode getRandomLocalityDelayMode() { + int randomInt = RANDOM.nextInt(LocalityDelayMode.values().length + 1); + if (randomInt < LocalityDelayMode.values().length) { + return LocalityDelayMode.values()[randomInt]; + } + return null; + } + + @Test + @SuppressWarnings("checkstyle:methodlength") + public void testLocalityDelayConfiguration() throws Exception { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(csConf); + when(csContext.getConfiguration()).thenReturn(conf); + + String rootQueueName = + conf.getQueues(CapacitySchedulerConfiguration.ROOT)[0]; + String rootQueuePath = + CapacitySchedulerConfiguration.ROOT + "." + rootQueueName; + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(rootQueuePath) + + CapacitySchedulerConfiguration.LOCALITY_DELAY_MODE, "THREShold"); + + // initialize queue confs + queues = new HashMap<>(); + root = CapacitySchedulerQueueManager.parseQueue(csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + + assertEquals(LocalityDelayMode.THRESHOLD, + queues.get(rootQueueName).getLocalityDelayMode()); + + conf = new CapacitySchedulerConfiguration(csConf); + when(csContext.getConfiguration()).thenReturn(conf); + + conf.set(CapacitySchedulerConfiguration.getQueuePrefix(rootQueuePath) + + CapacitySchedulerConfiguration.LOCALITY_DELAY_MODE, "a random text"); + + // initialize queue confs + queues = new HashMap<>(); + root = CapacitySchedulerQueueManager.parseQueue(csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + + assertEquals(CapacitySchedulerConfiguration.DEFAULT_LOCALITY_DELAY_MODE, + queues.get(rootQueueName).getLocalityDelayMode()); + + for (int i = 0; i < 10; i++) { + conf = new CapacitySchedulerConfiguration(csConf); + when(csContext.getConfiguration()).thenReturn(conf); + + LocalityDelayMode modeRoot = getRandomLocalityDelayMode(); + LocalityDelayMode modeA = getRandomLocalityDelayMode(); + LocalityDelayMode modeB = getRandomLocalityDelayMode(); + LocalityDelayMode modeC = getRandomLocalityDelayMode(); + LocalityDelayMode modeD = getRandomLocalityDelayMode(); + LocalityDelayMode modeE = getRandomLocalityDelayMode(); + LocalityDelayMode modeC1 = getRandomLocalityDelayMode(); + + Float thresholdRoot = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdA = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdB = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdC = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdD = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdE = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + Float thresholdC1 = RANDOM.nextBoolean() ? RANDOM.nextFloat() : null; + + if (modeRoot != null) { + conf.setLocalityDelayMode( + rootQueuePath, modeRoot); + LOG.info("setLocalityDelayMode " + + rootQueuePath + ": " + modeRoot); + } else { + modeRoot = CapacitySchedulerConfiguration.DEFAULT_LOCALITY_DELAY_MODE; + } + + if (modeA != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + A, modeA); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + A + ": " + modeRoot); + } else { + modeA = modeRoot; + } + + if (modeB != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + B, modeB); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + B + ": " + modeB); + } else { + modeB = modeRoot; + } + + if (modeC != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + C, modeC); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + C + ": " + modeC); + } else { + modeC = modeRoot; + } + + if (modeD != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + D, modeD); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + D + ": " + modeD); + } else { + modeD = modeRoot; + } + + if (modeE != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + E, modeE); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + E + ": " + modeE); + } else { + modeE = modeRoot; + } + + if (modeC1 != null) { + conf.setLocalityDelayMode( + rootQueuePath + "." + C + "." + C1, modeC1); + LOG.info("setLocalityDelayMode " + + rootQueuePath + "." + C + "." + C1 + ": " + modeC1); + } else { + modeC1 = modeC; + } + + if (thresholdRoot != null) { + conf.setLocalityDelayThreshold( + rootQueuePath, thresholdRoot); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + ": " + thresholdRoot); + } else { + thresholdRoot = + CapacitySchedulerConfiguration.DEFAULT_LOCALITY_DELAY_THRESHOLD; + } + + if (thresholdA != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + A, thresholdA); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + A + ": " + thresholdA); + } else { + thresholdA = thresholdRoot; + } + + if (thresholdB != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + B, thresholdB); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + B + ": " + thresholdB); + } else { + thresholdB = thresholdRoot; + } + + if (thresholdC != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + C, thresholdC); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + C + ": " + thresholdC); + } else { + thresholdC = thresholdRoot; + } + + if (thresholdD != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + D, thresholdD); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + D + ": " + thresholdD); + } else { + thresholdD = thresholdRoot; + } + + if (thresholdE != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + E, thresholdE); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + E + ": " + thresholdE); + } else { + thresholdE = thresholdRoot; + } + + if (thresholdC1 != null) { + conf.setLocalityDelayThreshold( + rootQueuePath + "." + C + "." + C1, thresholdC1); + LOG.info("setLocalityDelayThreshold " + + rootQueuePath + "." + C + "." + C1 + ": " + thresholdC1); + } else { + thresholdC1 = thresholdC; + } + + // initialize queue confs + queues = new HashMap(); + root = + CapacitySchedulerQueueManager.parseQueue(csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, + queues, queues, + TestUtils.spyHook); + + float delta = 0.0f; + + CSQueue queueRoot = queues.get(rootQueueName); + CSQueue queueA = queues.get(A); + CSQueue queueB = queues.get(B); + CSQueue queueC = queues.get(C); + CSQueue queueD = queues.get(D); + CSQueue queueE = queues.get(E); + CSQueue queueC1 = queues.get(C1); + + assertEquals(modeRoot, queueRoot.getLocalityDelayMode()); + assertEquals(modeA, queueA.getLocalityDelayMode()); + assertEquals(modeB, queueB.getLocalityDelayMode()); + assertEquals(modeC, queueC.getLocalityDelayMode()); + assertEquals(modeD, queueD.getLocalityDelayMode()); + assertEquals(modeE, queueE.getLocalityDelayMode()); + assertEquals(modeC1, queueC1.getLocalityDelayMode()); + + assertEquals(thresholdRoot.floatValue(), + queueRoot.getLocalityDelayThreshold(), delta); + assertEquals(thresholdA.floatValue(), + queueA.getLocalityDelayThreshold(), delta); + assertEquals(thresholdB.floatValue(), + queueB.getLocalityDelayThreshold(), delta); + assertEquals(thresholdC.floatValue(), + queueC.getLocalityDelayThreshold(), delta); + assertEquals(thresholdD.floatValue(), + queueD.getLocalityDelayThreshold(), delta); + assertEquals(thresholdE.floatValue(), + queueE.getLocalityDelayThreshold(), delta); + assertEquals(thresholdC1.floatValue(), + queueC1.getLocalityDelayThreshold(), delta); + } + } + + @Test + @SuppressWarnings("checkstyle:methodlength") + public void testLocalityDelayModeThreshold() throws Exception { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(csConf); + when(csContext.getConfiguration()).thenReturn(conf); + + // Set configs for queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B}); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, + 50); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + B, + 50); + conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + B, + 100); + + // Set locality delay threshold + conf.setLocalityDelayThreshold(CapacitySchedulerConfiguration.ROOT, 0.4f); + + // We run once with ALWAYS_ON and once with THRESHOLD + LocalityDelayMode[] delayModes = + {LocalityDelayMode.ALWAYS_ON, LocalityDelayMode.THRESHOLD}; + + for (LocalityDelayMode delayMode : delayModes) { + // Setup some nodes and racks + String host00 = "127.0.0.1"; + String rack0 = "rack_0"; + FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, + 0, 8 * GB); + + String host10 = "127.0.0.3"; + String rack1 = "rack_1"; + FiCaSchedulerNode node10 = TestUtils.getMockNode(host10, rack1, + 0, 8 * GB); + + Map nodes = ImmutableMap.of( + node00.getNodeID(), node00, node10.getNodeID(), node10); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource( + numNodes * (8 * GB), numNodes * 1); + when(csContext.getClusterResource()). + thenReturn(clusterResource); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Set config for delay mode + conf.setLocalityDelayMode(CapacitySchedulerConfiguration.ROOT, + delayMode); + + // init queues + queues = new HashMap<>(); + root = CapacitySchedulerQueueManager.parseQueue(csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, + queues, queues, + TestUtils.spyHook); + root.updateClusterResource( + clusterResource, + new ResourceLimits(clusterResource)); + + ResourceUsage queueResUsage = root.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + + LeafQueue queueA = (LeafQueue) queues.get(A); + LeafQueue queueB = (LeafQueue) queues.get(B); + + // User + String user0 = "user_0"; + + // Submit applications + // appAThreshold is on queue A and is what we are testing on + final ApplicationAttemptId appAttemptIdAThreshold = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp appAThreshold = + new FiCaSchedulerApp(appAttemptIdAThreshold, user0, queueA, + mock(ActiveUsersManager.class), spyRMContext); + queueA.submitApplicationAttempt(appAThreshold, user0); + + // appBConsumeResources is on queue B and consumes resources to hit + // the configured threshold for app_A + final ApplicationAttemptId appAttemptIdBConsumeResources = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp appBConsumeResources = + new FiCaSchedulerApp(appAttemptIdBConsumeResources, user0, queueB, + mock(ActiveUsersManager.class), spyRMContext); + queueB.submitApplicationAttempt(appBConsumeResources, user0); + + Map apps = ImmutableMap.of( + appAThreshold.getApplicationAttemptId(), + appAThreshold, + appBConsumeResources.getApplicationAttemptId(), + appBConsumeResources); + + Priority priority = TestUtils.createMockPriority(1); + SchedulerRequestKey schedulerKey = toSchedulerKey(priority); + + List appBConsumeResourcesRequests = + new ArrayList(); + List appAThresholdRequests = + new ArrayList(); + + CSAssignment assignment; + + // First app_B takes 7 GB from host_0_0 + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(host00, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(rack0, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResources.updateResourceRequests( + appBConsumeResourcesRequests); + appBConsumeResourcesRequests.clear(); + + // assign NODE_LOCAL 7GB container to app_B + // Free = 9GB / 16GB = 56.25% free + assignment = root.assignContainers(clusterResource, node00, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, queueA, nodes, apps); + applyCSAssignment(clusterResource, assignment, queueB, nodes, apps); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); + Assert.assertTrue(Resources.equals(assignment.getResource(), + Resource.newInstance(7 * GB, 1))); + assertEquals(0, + appAThreshold.getSchedulingOpportunities(schedulerKey)); + assertEquals(0, + appBConsumeResources.getSchedulingOpportunities(schedulerKey)); + + // Now app_A attempts to take 1 GB container OFF_SWITCH + appAThresholdRequests.add( + TestUtils.createResourceRequest(host00, 1 * GB, 1, + true, priority, recordFactory)); + appAThresholdRequests.add( + TestUtils.createResourceRequest(rack0, 1 * GB, 1, + true, priority, recordFactory)); + appAThresholdRequests.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 10, + true, priority, recordFactory)); + appAThreshold.updateResourceRequests(appAThresholdRequests); + appAThresholdRequests.clear(); + + // app_A gets one scheduling opportunity but passes + assignment = root.assignContainers(clusterResource, node10, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, queueA, nodes, apps); + applyCSAssignment(clusterResource, assignment, queueB, nodes, apps); + verifyNoContainerAllocated(assignment); + assertEquals(1, + appAThreshold.getSchedulingOpportunities(schedulerKey)); + assertEquals(0, + appBConsumeResources.getSchedulingOpportunities(schedulerKey)); + + // app_B to get more resources to hit the threshold + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(host10, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(rack1, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResourcesRequests.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 7 * GB, 1, + true, priority, recordFactory)); + appBConsumeResources.updateResourceRequests( + appBConsumeResourcesRequests); + appBConsumeResourcesRequests.clear(); + + // Here, + // -app_A gets a scheduling chance but passes + // -app_B gets a scheduling chance and allocates 7GB NODE_LOCAL + // -- Free = 2GB / 16GB = 12.5% free + assignment = root.assignContainers(clusterResource, node10, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, queueA, nodes, apps); + applyCSAssignment(clusterResource, assignment, queueB, nodes, apps); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); + Assert.assertTrue(Resources.equals(assignment.getResource(), + Resource.newInstance(7 * GB, 1))); + assertEquals(2, + appAThreshold.getSchedulingOpportunities(schedulerKey)); + assertEquals(0, + appBConsumeResources.getSchedulingOpportunities(schedulerKey)); + + // Now, + // -app_A gets a scheduling chance, + // -- when delayMode is ALWAYS_ON, passes + // -- when delayMode is THRESHOLD, allocates 1GB OFF_SWITCH + assignment = root.assignContainers(clusterResource, node10, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyCSAssignment(clusterResource, assignment, queueA, nodes, apps); + applyCSAssignment(clusterResource, assignment, queueB, nodes, apps); + if (delayMode == LocalityDelayMode.ALWAYS_ON) { + verifyNoContainerAllocated(assignment); + assertEquals(3, + appAThreshold.getSchedulingOpportunities(schedulerKey)); + assertEquals(0, + appBConsumeResources.getSchedulingOpportunities(schedulerKey)); + } else if (delayMode == LocalityDelayMode.THRESHOLD) { + verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); + Assert.assertTrue(Resources.equals(assignment.getResource(), + Resource.newInstance(1 * GB, 1))); + assertEquals(3, + appAThreshold.getSchedulingOpportunities(schedulerKey)); + assertEquals(0, + appBConsumeResources.getSchedulingOpportunities(schedulerKey)); + } else { + // If a new test has been added, then it must be handled here + assertTrue(false); + } + } + } }