diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 47db01f..53b6232 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -111,9 +111,23 @@
    <value>40</value>
    <description>
      Number of missed scheduling opportunities after which the CapacityScheduler
-      attempts to schedule rack-local containers.
-      Typically this should be set to number of nodes in the cluster, By default is setting
-      approximately number of nodes in one rack which is 40.
+      attempts to schedule rack-local containers.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use 40 as the default value, which is approximately the number of nodes in one rack.
    </description>
  </property>

+  <property>
+    <name>yarn.scheduler.capacity.rack-locality-delay</name>
+    <value>-1</value>
+    <description>
+      Number of missed scheduling opportunities after which the CapacityScheduler
+      attempts to schedule off-switch containers.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use -1 as the default value, which disables this feature. In this case, the number
+      of missed opportunities for assigning off-switch containers is calculated based on
+      the number of containers and unique locations specified in the resource request,
+      as well as the size of the cluster.
+    </description>
+  </property>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 91e29d5..6a6ecc8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1304,6 +1304,9 @@ protected void setAttemptRecovering(boolean isRecovering) {
return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
}
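+  /**
+   * Returns the outstanding resource requests for the given scheduler key,
+   * keyed by resource name (host, rack, or ANY).
+   */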
+  public Map<String, ResourceRequest> getResourceRequests(
+      SchedulerRequestKey schedulerRequestKey) {
+    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
+        .getResourceRequests();
+  }
public void incUnconfirmedRes(Resource res) {
unconfirmedAllocatedMem.addAndGet(res.getMemorySize());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 43ec390..be69bb5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -198,6 +198,13 @@
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
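+  // Fixed number of missed scheduling opportunities to wait before an
+  // off-switch assignment is allowed; -1 keeps the request-based heuristic.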
@Private
+ public static final String RACK_LOCALITY_DELAY =
+ PREFIX + "rack-locality-delay";
+
+ @Private
+ public static final int DEFAULT_RACK_LOCALITY_DELAY = -1;
+
+ @Private
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
@@ -829,6 +836,10 @@ public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
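+  // Returns -1 when no fixed rack-locality delay is configured, in which
+  // case RegularContainerAllocator falls back to its locality-wait-factor
+  // heuristic for off-switch assignments.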
+ public int getRackLocalityDelay() {
+ return getInt(RACK_LOCALITY_DELAY, DEFAULT_RACK_LOCALITY_DELAY);
+ }
+
public boolean getRackLocalityFullReset() {
return getBoolean(RACK_LOCALITY_FULL_RESET,
DEFAULT_RACK_LOCALITY_FULL_RESET);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1b20556..2b38f89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,7 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
@@ -43,26 +52,25 @@
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
@@ -95,6 +103,7 @@
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
+ private volatile int rackLocalityDelay;
private volatile boolean rackLocalityFullReset;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -215,6 +224,7 @@ protected void setupQueueConfigs(Resource clusterResource)
}
nodeLocalityDelay = conf.getNodeLocalityDelay();
+ rackLocalityDelay = conf.getRackLocalityDelay();
rackLocalityFullReset = conf.getRackLocalityFullReset();
// re-init this since max allocation could have changed
@@ -271,9 +281,11 @@ protected void setupQueueConfigs(Resource clusterResource)
+ "numContainers = " + numContainers
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
- + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
- + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
- .toString() + "\n" + "reservationsContinueLooking = "
+ + " [= configuredAcls ]" + "\n"
+ + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
+ + "rackLocalityDelay = " + rackLocalityDelay + "\n"
+ + "labels=" + labelStrBuilder.toString() + "\n"
+ + "reservationsContinueLooking = "
+ reservationsContinueLooking + "\n" + "preemptionDisabled = "
+ getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+ defaultAppPriorityPerQueue + "\npriority = " + priority);
@@ -1347,6 +1359,11 @@ public int getNodeLocalityDelay() {
}
@Lock(NoLock.class)
+ public int getRackLocalityDelay() {
+ return rackLocalityDelay;
+ }
+
+ @Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
return rackLocalityFullReset;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 8078bcd..04c3314 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -33,27 +33,24 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -278,6 +275,11 @@ private int getActualNodeLocalityDelay() {
.getCSLeafQueue().getNodeLocalityDelay());
}
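+  // Mirrors getActualNodeLocalityDelay(): the configured rack-locality
+  // delay is capped by the current number of cluster nodes.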
+ private int getActualRackLocalityDelay() {
+ return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
+ .getCSLeafQueue().getRackLocalityDelay());
+ }
+
private boolean canAssign(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
@@ -287,25 +289,33 @@ private boolean canAssign(SchedulerRequestKey schedulerKey,
return true;
}
+ // If we have only ANY requests for this schedulerKey, we should not
+ // delay its scheduling.
+ if (application.getResourceRequests(schedulerKey).size() == 1) {
+ return true;
+ }
+
// 'Delay' off-switch
long missedOpportunities =
application.getSchedulingOpportunities(schedulerKey);
- long requiredContainers = application.getOutstandingAsksCount(
- schedulerKey);
- float localityWaitFactor =
- getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
- .getNumClusterNodes());
- // Cap the delay by the number of nodes in the cluster. Under most
- // conditions this means we will consider each node in the cluster before
- // accepting an off-switch assignment.
- return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
- (requiredContainers * localityWaitFactor)) < missedOpportunities);
+ // If rack locality delay parameter is enabled.
+ if (getActualRackLocalityDelay() > -1) {
+ return missedOpportunities > getActualRackLocalityDelay();
+ } else {
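+        // No fixed rack-locality delay configured: derive the wait from the
+        // outstanding asks and the locality-wait factor, as before.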
+ long requiredContainers =
+ application.getOutstandingAsksCount(schedulerKey);
+ float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
+ rmContext.getScheduler().getNumClusterNodes());
+ // Cap the delay by the number of nodes in the cluster.
+ return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+ (requiredContainers * localityWaitFactor)) < missedOpportunities);
+ }
}
// Check if we need containers on this rack
- if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
- <= 0) {
+ if (application.getOutstandingAsksCount(schedulerKey,
+ node.getRackName()) <= 0) {
return false;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3fbbae3..1ceea77 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -41,7 +42,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -77,7 +77,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -91,6 +90,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -102,7 +102,7 @@
import org.mockito.Matchers;
import org.mockito.Mockito;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
+import com.google.common.collect.ImmutableMap;
public class TestLeafQueue {
private final RecordFactory recordFactory =
@@ -2109,6 +2109,153 @@ public void testLocalityScheduling() throws Exception {
}
@Test
+ public void testRackLocalityDelayScheduling() throws Exception {
+
+ // Change parameter values for node locality and rack locality delay.
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
+ csConf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 3);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+ CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
+ csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+ TestUtils.spyHook);
+ queues = newQueues;
+ root.reinitialize(newRoot, cs.getClusterResource());
+
+ // Manipulate queue 'b'
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+ // Check locality parameters.
+ assertEquals(2, a.getNodeLocalityDelay());
+ assertEquals(3, a.getRackLocalityDelay());
+
+ // User
+ String user_0 = "user_0";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
+ a.submitApplicationAttempt(app_0, user_0);
+
+ // Setup some nodes and racks
+ String host_0 = "127.0.0.1";
+ String host_1 = "127.0.0.2";
+ String host_2 = "127.0.0.3";
+ String host_3 = "127.0.0.4";
+ String rack_0 = "rack_0";
+ String rack_1 = "rack_1";
+ String rack_2 = "rack_2";
+ FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_1, 0, 8 * GB);
+ FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_2, 0, 8 * GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app_0.getApplicationAttemptId(), app_0);
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(node_2.getNodeID(), node_2, node_3.getNodeID(), node_3);
+
+ final int numNodes = 5;
+ Resource clusterResource =
+ Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+ when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
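+    // Effective delays are capped by the cluster size (5 nodes):
+    // node-local min(5, 2) = 2, rack-local min(5, 3) = 3.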
+
+ // Setup resource-requests and submit
+ Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+ app_0_requests_0.add(TestUtils.createResourceRequest(host_0, 1 * GB, 1,
+ true, priority, recordFactory));
+ app_0_requests_0.add(TestUtils.createResourceRequest(rack_0, 1 * GB, 1,
+ true, priority, recordFactory));
+ app_0_requests_0.add(TestUtils.createResourceRequest(host_1, 1 * GB, 1,
+ true, priority, recordFactory));
+ app_0_requests_0.add(TestUtils.createResourceRequest(rack_1, 1 * GB, 1,
+ true, priority, recordFactory));
+    // Ask for one extra container at the ANY level beyond the host/rack
+    // asks (3 in total).
+ app_0_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+ 1 * GB, 3, true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ // Start testing...
+ CSAssignment assignment = null;
+
+ SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
+ assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+
+ // No rack-local yet.
+ assignment = a.assignContainers(clusterResource, node_2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ verifyNoContainerAllocated(assignment);
+ assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Still no rack-local.
+ assignment = a.assignContainers(clusterResource, node_2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Rack local now.
+ assignment = a.assignContainers(clusterResource, node_2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.RACK_LOCAL, assignment.getType());
+
+    // No off-switch assignment until more than 3 missed opportunities.
+ a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assignment = a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Now off-switch should succeed.
+ assignment = a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+
+ // Check capping by number of cluster nodes.
+ doReturn(10).when(a).getRackLocalityDelay();
+ // Off-switch will happen at 6 missed opportunities now, since cluster size
+ // is 5.
+ assignment = a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(5, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+ assignment = a.assignContainers(clusterResource, node_3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+ assertEquals(6, app_0.getSchedulingOpportunities(schedulerKey));
+ assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
+ assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+ }
+
+ @Test
public void testApplicationPriorityScheduling() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -2420,9 +2567,10 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
// before reinitialization
assertEquals(40, e.getNodeLocalityDelay());
+ assertEquals(-1, e.getRackLocalityDelay());
- csConf.setInt(CapacitySchedulerConfiguration
- .NODE_LOCALITY_DELAY, 60);
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
+ csConf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_DELAY, 600);
    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
@@ -2434,6 +2582,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
// after reinitialization
assertEquals(60, e.getNodeLocalityDelay());
+ assertEquals(600, e.getRackLocalityDelay());
}
@Test (timeout = 30000)