diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 47db01f..53b6232 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -111,9 +111,23 @@
     <value>40</value>
     <description>
       Number of missed scheduling opportunities after which the CapacityScheduler
-      attempts to schedule rack-local containers.
-      Typically this should be set to number of nodes in the cluster, By default is setting
-      approximately number of nodes in one rack which is 40.
+      attempts to schedule rack-local containers.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use 40 as the default value, which is approximately the number of nodes in one rack.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.rack-locality-delay</name>
+    <value>-1</value>
+    <description>
+      Number of missed scheduling opportunities after which the CapacityScheduler
+      attempts to schedule off-switch containers.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use -1 as the default value, which disables this feature. In this case, the number
+      of missed opportunities for assigning off-switch containers is calculated based on
+      the number of containers and unique locations specified in the resource request,
+      as well as the size of the cluster.
+    </description>
+  </property>
 
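For reference, enabling the new behavior only requires setting the property to a value greater than -1 in capacity-scheduler.xml. The snippet below is an illustrative sketch (the value 40 is an arbitrary example, not a recommendation made by this patch):

    <property>
      <name>yarn.scheduler.capacity.rack-locality-delay</name>
      <value>40</value>
    </property>

With a non-negative value, off-switch assignment is delayed by that fixed number of missed scheduling opportunities (capped by the cluster size) instead of the per-request heuristic described in the default's description.
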
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 91e29d5..6a6ecc8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1304,6 +1304,9 @@ protected void setAttemptRecovering(boolean isRecovering) {
     return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
   }
 
+  public Map<String, ResourceRequest> getResourceRequests(SchedulerRequestKey schedulerRequestKey) {
+    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey).getResourceRequests();
+  }
 
   public void incUnconfirmedRes(Resource res) {
     unconfirmedAllocatedMem.addAndGet(res.getMemorySize());
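The new accessor exposes the per-priority request map, which AppSchedulingInfo keys by resource name (a hostname, a rack, or ResourceRequest.ANY). A minimal sketch of the check this enables in the allocator change further down; isAnyOnlyRequest is a hypothetical helper name, not part of the patch:

    // Sketch: one map entry exists per requested resource name, so a request
    // with no host or rack entries holds only the ResourceRequest.ANY key.
    static boolean isAnyOnlyRequest(Map<String, ResourceRequest> requests) {
      return requests.size() == 1;
    }
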
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 43ec390..be69bb5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -198,6 +198,13 @@ public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
 
   @Private
+  public static final String RACK_LOCALITY_DELAY =
+      PREFIX + "rack-locality-delay";
+
+  @Private
+  public static final int DEFAULT_RACK_LOCALITY_DELAY = -1;
+
+  @Private
   public static final String RACK_LOCALITY_FULL_RESET =
       PREFIX + "rack-locality-full-reset";
 
@@ -829,6 +836,10 @@ public int getNodeLocalityDelay() {
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }
 
+  public int getRackLocalityDelay() {
+    return getInt(RACK_LOCALITY_DELAY, DEFAULT_RACK_LOCALITY_DELAY);
+  }
+
   public boolean getRackLocalityFullReset() {
     return getBoolean(RACK_LOCALITY_FULL_RESET,
         DEFAULT_RACK_LOCALITY_FULL_RESET);
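A small usage sketch of the new getter, assuming a freshly constructed CapacitySchedulerConfiguration (setInt is inherited from Hadoop's generic Configuration API):

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // While unset, the getter falls back to DEFAULT_RACK_LOCALITY_DELAY (-1),
    // i.e. the fixed off-switch delay is disabled.
    int delay = conf.getRackLocalityDelay();
    // Setting the property enables it.
    conf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 40);
    delay = conf.getRackLocalityDelay(); // now 40
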
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1b20556..2b38f89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,7 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -43,26 +52,25 @@
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
@@ -95,6 +103,7 @@
   private float maxAMResourcePerQueuePercent;
 
   private volatile int nodeLocalityDelay;
+  private volatile int rackLocalityDelay;
   private volatile boolean rackLocalityFullReset;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -215,6 +224,7 @@ protected void setupQueueConfigs(Resource clusterResource)
     }
 
     nodeLocalityDelay = conf.getNodeLocalityDelay();
+    rackLocalityDelay = conf.getRackLocalityDelay();
     rackLocalityFullReset = conf.getRackLocalityFullReset();
 
     // re-init this since max allocation could have changed
@@ -271,9 +281,11 @@ protected void setupQueueConfigs(Resource clusterResource)
           + "numContainers = " + numContainers
           + " [= currentNumContainers ]" + "\n" + "state = " + getState()
           + " [= configuredState ]" + "\n" + "acls = " + aclsString
-          + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
-          + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
-          .toString() + "\n" + "reservationsContinueLooking = "
+          + " [= configuredAcls ]" + "\n"
+          + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
+          + "rackLocalityDelay = " + rackLocalityDelay + "\n"
+          + "labels=" + labelStrBuilder.toString() + "\n"
+          + "reservationsContinueLooking = "
           + reservationsContinueLooking + "\n" + "preemptionDisabled = "
           + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
           + defaultAppPriorityPerQueue + "\npriority = " + priority);
@@ -1347,6 +1359,11 @@ public int getNodeLocalityDelay() {
   }
 
   @Lock(NoLock.class)
+  public int getRackLocalityDelay() {
+    return rackLocalityDelay;
+  }
+
+  @Lock(NoLock.class)
   public boolean getRackLocalityFullReset() {
     return rackLocalityFullReset;
   }
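Because the value is re-read in setupQueueConfigs() and held in a volatile field behind a @Lock(NoLock.class) getter, a queue refresh picks up a changed delay without an RM restart. A rough sketch of that expectation, mirroring the refresh test at the end of this patch (variable names abbreviated):

    csConf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 600);
    root.reinitialize(newRoot, cs.getClusterResource()); // re-runs setupQueueConfigs()
    assertEquals(600, leafQueue.getRackLocalityDelay());
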
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 8078bcd..04c3314 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -33,27 +33,24 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -278,6 +275,11 @@ private int getActualNodeLocalityDelay() {
         .getCSLeafQueue().getNodeLocalityDelay());
   }
 
+  private int getActualRackLocalityDelay() {
+    return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
+        .getCSLeafQueue().getRackLocalityDelay());
+  }
+
   private boolean canAssign(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
@@ -287,25 +289,33 @@ private boolean canAssign(SchedulerRequestKey schedulerKey,
         return true;
       }
 
+      // If we have only ANY requests for this schedulerKey, we should not
+      // delay its scheduling.
+      if (application.getResourceRequests(schedulerKey).size() == 1) {
+        return true;
+      }
+
       // 'Delay' off-switch
       long missedOpportunities =
           application.getSchedulingOpportunities(schedulerKey);
-      long requiredContainers = application.getOutstandingAsksCount(
-          schedulerKey);
-      float localityWaitFactor =
-          getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
-              .getNumClusterNodes());
-      // Cap the delay by the number of nodes in the cluster. Under most
-      // conditions this means we will consider each node in the cluster before
-      // accepting an off-switch assignment.
-      return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
-          (requiredContainers * localityWaitFactor)) < missedOpportunities);
+      // If rack locality delay parameter is enabled.
+      if (getActualRackLocalityDelay() > -1) {
+        return missedOpportunities > getActualRackLocalityDelay();
+      } else {
+        long requiredContainers =
+            application.getOutstandingAsksCount(schedulerKey);
+        float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
+            rmContext.getScheduler().getNumClusterNodes());
+        // Cap the delay by the number of nodes in the cluster.
+        return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+            (requiredContainers * localityWaitFactor)) < missedOpportunities);
+      }
     }
 
     // Check if we need containers on this rack
-    if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
-        <= 0) {
+    if (application.getOutstandingAsksCount(schedulerKey,
+        node.getRackName()) <= 0) {
       return false;
     }
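Condensed, the off-switch gate above behaves as sketched below. This is an illustrative standalone rewrite, not code from the patch; the parameters stand in for the values the allocator pulls from the RMContext and the application:

    // Sketch of the off-switch delay decision after this patch.
    static boolean canAssignOffSwitch(long missedOpportunities, int clusterNodes,
        int rackLocalityDelay, long requiredContainers, float localityWaitFactor) {
      if (rackLocalityDelay > -1) {
        // New path: a fixed delay, capped by the cluster size.
        return missedOpportunities > Math.min(clusterNodes, rackLocalityDelay);
      }
      // Default path (-1): the existing heuristic, which scales the wait with
      // the request's container count and locality spread, capped likewise.
      return Math.min(clusterNodes, requiredContainers * localityWaitFactor)
          < missedOpportunities;
    }
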
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3fbbae3..1ceea77 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,7 +42,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -77,7 +77,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -91,6 +90,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -102,7 +102,7 @@
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
+import com.google.common.collect.ImmutableMap;
 
 public class TestLeafQueue {
   private final RecordFactory recordFactory =
@@ -2109,6 +2109,153 @@ public void testLocalityScheduling() throws Exception {
   }
 
   @Test
+  public void testRackLocalityDelayScheduling() throws Exception {
+
+    // Change parameter values for node locality and rack locality delay.
+    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
+    csConf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 3);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+        TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResource());
+
+    // Manipulate queue 'b'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+    // Check locality parameters.
+    assertEquals(2, a.getNodeLocalityDelay());
+    assertEquals(3, a.getRackLocalityDelay());
+
+    // User
+    String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    // Setup some nodes and racks
+    String host_0 = "127.0.0.1";
+    String host_1 = "127.0.0.2";
+    String host_2 = "127.0.0.3";
+    String host_3 = "127.0.0.4";
+    String rack_0 = "rack_0";
+    String rack_1 = "rack_1";
+    String rack_2 = "rack_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_1, 0, 8 * GB);
+    FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_2, 0, 8 * GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app_0.getApplicationAttemptId(), app_0);
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(node_2.getNodeID(), node_2, node_3.getNodeID(), node_3);
+
+    final int numNodes = 5;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+    when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(TestUtils.createResourceRequest(host_0, 1 * GB, 1,
+        true, priority, recordFactory));
+    app_0_requests_0.add(TestUtils.createResourceRequest(rack_0, 1 * GB, 1,
+        true, priority, recordFactory));
+    app_0_requests_0.add(TestUtils.createResourceRequest(host_1, 1 * GB, 1,
+        true, priority, recordFactory));
+    app_0_requests_0.add(TestUtils.createResourceRequest(rack_1, 1 * GB, 1,
+        true, priority, recordFactory));
+    // Adding one extra in the ANY.
+    app_0_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 3, true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+    CSAssignment assignment = null;
+
+    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+
+    // No rack-local yet.
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    verifyNoContainerAllocated(assignment);
+    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Still no rack-local.
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Rack local now.
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.RACK_LOCAL, assignment.getType());
+
+    // No off-switch until 3 missed opportunities.
+    a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Now off-switch should succeed.
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+
+    // Check capping by number of cluster nodes.
+    doReturn(10).when(a).getRackLocalityDelay();
+    // Off-switch will happen at 6 missed opportunities now, since cluster size
+    // is 5.
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(5, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(6, app_0.getSchedulingOpportunities(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+  }
+
+  @Test
   public void testApplicationPriorityScheduling() throws Exception {
     // Manipulate queue 'a'
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -2420,9 +2567,10 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
 
     // before reinitialization
     assertEquals(40, e.getNodeLocalityDelay());
+    assertEquals(-1, e.getRackLocalityDelay());
 
-    csConf.setInt(CapacitySchedulerConfiguration
-        .NODE_LOCALITY_DELAY, 60);
+    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
+    csConf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 600);
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot =
         CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
@@ -2434,6 +2582,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
 
     // after reinitialization
     assertEquals(60, e.getNodeLocalityDelay());
+    assertEquals(600, e.getRackLocalityDelay());
   }
 
   @Test (timeout = 30000)
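A worked reading of the capping assertions in testRackLocalityDelayScheduling, under the patch's rule that off-switch assignment requires missedOpportunities > min(numClusterNodes, rackLocalityDelay):

    // rack-locality-delay = 3, 5-node cluster: min(5, 3) = 3, so the first
    // OFF_SWITCH assignment lands on the 4th attempt against node_3.
    // rack-locality-delay = 10 (via doReturn), 5-node cluster: min(5, 10) = 5,
    // so the cap holds the wait at 5 missed opportunities and OFF_SWITCH
    // lands on the 6th attempt.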