diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 6ac726e..dd53536 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -102,9 +102,27 @@
<value>40</value>
<description>
Number of missed scheduling opportunities after which the CapacityScheduler
- attempts to schedule rack-local containers.
- Typically this should be set to number of nodes in the cluster, By default is setting
- approximately number of nodes in one rack which is 40.
+ attempts to schedule rack-local containers.
+ When setting this parameter, the size of the cluster should be taken into account.
+ We use 40 as the default value, which is approximately the number of nodes in one rack.
+ </description>
+ </property>
+
+ <property>
+ <name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
+ <value>-1</value>
+ <description>
+ Number of additional missed scheduling opportunities over the node-locality-delay
+ ones, after which the CapacityScheduler attempts to schedule off-switch containers,
+ instead of rack-local ones.
+ Example: with node-locality-delay=40 and rack-locality-additional-delay=20, the
+ scheduler will attempt rack-local assignments after 40 missed opportunities, and
+ off-switch assignments after 40+20=60 missed opportunities.
+ When setting this parameter, the size of the cluster should be taken into account.
+ We use -1 as the default value, which disables this feature. In this case, the number
+ of missed opportunities for assigning off-switch containers is calculated based on
+ the number of containers and unique locations specified in the resource request,
+ as well as the size of the cluster.
+ </description>
+ </property>
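For reference, the thresholds described above work out as follows. This is a minimal standalone Java sketch of the arithmetic only, not scheduler code; the class name and values are purely illustrative, taken from the example in the description.

public class LocalityDelayExample {
  public static void main(String[] args) {
    int nodeLocalityDelay = 40;           // yarn.scheduler.capacity.node-locality-delay
    int rackLocalityAdditionalDelay = 20; // yarn.scheduler.capacity.rack-locality-additional-delay

    // Rack-local assignments are attempted once node-locality-delay
    // scheduling opportunities have been missed.
    int rackLocalThreshold = nodeLocalityDelay; // 40

    // Off-switch assignments wait for the additional delay on top of that.
    int offSwitchThreshold = nodeLocalityDelay + rackLocalityAdditionalDelay; // 40 + 20 = 60

    System.out.println("rack-local after " + rackLocalThreshold
        + ", off-switch after " + offSwitchThreshold + " missed opportunities");
  }
}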
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index cea5aa4..1040857 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -187,6 +187,13 @@
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
@Private
+ public static final String RACK_LOCALITY_ADDITIONAL_DELAY =
+ PREFIX + "rack-locality-additional-delay";
+
+ @Private
+ public static final int DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY = -1;
+
+ @Private
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
@@ -738,6 +745,11 @@ public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
+ public int getRackLocalityAdditionalDelay() {
+ return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
+ DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
+ }
+
public boolean getRackLocalityFullReset() {
return getBoolean(RACK_LOCALITY_FULL_RESET,
DEFAULT_RACK_LOCALITY_FULL_RESET);
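The new getter follows the same getInt pattern as getNodeLocalityDelay(). A hedged sketch of how the setting is read and enabled follows; the wrapper class name is illustrative, while the constant, the getter, and the setInt inherited from Configuration are the APIs this patch touches.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class RackLocalityConfigSketch {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

    // Unset, the getter returns DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY (-1),
    // which leaves the feature disabled.
    int delay = conf.getRackLocalityAdditionalDelay(); // -1

    // Enable the feature, as the tests below do.
    conf.setInt(CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 20);
    delay = conf.getRackLocalityAdditionalDelay(); // 20
    System.out.println(delay);
  }
}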
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 77f10b9..ede7229 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -98,6 +98,7 @@
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
+ private volatile int rackLocalityAdditionalDelay;
private volatile boolean rackLocalityFullReset;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -207,6 +208,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
}
nodeLocalityDelay = conf.getNodeLocalityDelay();
+ rackLocalityAdditionalDelay = conf.getRackLocalityAdditionalDelay();
rackLocalityFullReset = conf.getRackLocalityFullReset();
// re-init this since max allocation could have changed
@@ -272,6 +274,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
+ "rackLocalityAdditionalDelay = " +
+ rackLocalityAdditionalDelay + "\n" +
"labels=" + labelStrBuilder.toString() + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n" +
@@ -1151,6 +1155,11 @@ public int getNodeLocalityDelay() {
}
@Lock(NoLock.class)
+ public int getRackLocalityAdditionalDelay() {
+ return rackLocalityAdditionalDelay;
+ }
+
+ @Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
return rackLocalityFullReset;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 119a732..35102c1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -232,29 +232,46 @@ private int getActualNodeLocalityDelay() {
.getCSLeafQueue().getNodeLocalityDelay());
}
+ private int getActualRackLocalityDelay() {
+ return Math.min(rmContext.getScheduler().getNumClusterNodes(),
+ application.getCSLeafQueue().getNodeLocalityDelay()
+ + application.getCSLeafQueue().getRackLocalityAdditionalDelay());
+ }
+
private boolean canAssign(Priority priority, FiCaSchedulerNode node,
NodeType type, RMContainer reservedContainer) {
-
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
if (reservedContainer != null) {
return true;
}
+ // If there are no nodes in the cluster, return false.
+ if (rmContext.getScheduler().getNumClusterNodes() == 0) {
+ return false;
+ }
+ // If the only request for this priority is ANY (no node- or rack-local
+ // requests), there is no locality to wait for, so do not delay scheduling.
+ if (application.getResourceRequests(priority).size() == 1) {
+ return true;
+ }
// 'Delay' off-switch
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
long missedOpportunities = application.getSchedulingOpportunities(priority);
- long requiredContainers = offSwitchRequest.getNumContainers();
-
- float localityWaitFactor =
- getLocalityWaitFactor(priority, rmContext.getScheduler()
- .getNumClusterNodes());
- // Cap the delay by the number of nodes in the cluster. Under most conditions
- // this means we will consider each node in the cluster before
- // accepting an off-switch assignment.
- return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
- (requiredContainers * localityWaitFactor)) < missedOpportunities);
+
+ // If the rack-locality-additional-delay parameter is enabled, wait for
+ // the (cluster-size-capped) rack locality delay before going off-switch.
+ if (application.getCSLeafQueue().getRackLocalityAdditionalDelay() > -1) {
+ return missedOpportunities > getActualRackLocalityDelay();
+ } else {
+ long requiredContainers = offSwitchRequest.getNumContainers();
+ float localityWaitFactor =
+ getLocalityWaitFactor(priority, rmContext.getScheduler()
+ .getNumClusterNodes());
+ // Cap the delay by the number of nodes in the cluster.
+ return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+ (requiredContainers * localityWaitFactor)) < missedOpportunities);
+ }
}
// Check if we need containers on this rack
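With the feature enabled, canAssign() above defers off-switch assignments until the threshold computed by getActualRackLocalityDelay() is exceeded, and that threshold is capped by cluster size. Below is a standalone sketch of that arithmetic (illustrative class name; the values match the capping case exercised in testRackLocalityDelayScheduling later in this patch).

public class OffSwitchThresholdSketch {
  public static void main(String[] args) {
    int numClusterNodes = 5;
    int nodeLocalityDelay = 2;
    int rackLocalityAdditionalDelay = 10;

    // Mirrors getActualRackLocalityDelay(): the combined delay is capped by
    // the number of nodes in the cluster.
    int threshold = Math.min(numClusterNodes,
        nodeLocalityDelay + rackLocalityAdditionalDelay); // min(5, 12) = 5

    // canAssign() permits an off-switch assignment only once
    // missedOpportunities > threshold, i.e. on the sixth miss here.
    for (long missed = 4; missed <= 6; missed++) {
      System.out.println(missed + " missed -> off-switch: " + (missed > threshold));
    }
  }
}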
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0eccdfc..fadfa7f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -18,6 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -73,30 +96,7 @@
import org.mockito.Matchers;
import org.mockito.Mockito;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CyclicBarrier;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-public class TestLeafQueue {
+public class TestLeafQueue {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
@@ -1892,6 +1892,142 @@ public void testLocalityScheduling() throws Exception {
}
@Test
+ public void testRackLocalityDelayScheduling() throws Exception {
+
+ // Change parameter values for node locality and rack locality delay.
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
+ csConf.setInt(
+ CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+
+ CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+ TestUtils.spyHook);
+ queues = newQueues;
+ root.reinitialize(newRoot, cs.getClusterResource());
+
+ // Manipulate queue 'b' (held in the local variable 'a')
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+ // Check locality parameters.
+ assertEquals(2, a.getNodeLocalityDelay());
+ assertEquals(1, a.getRackLocalityAdditionalDelay());
+
+ // User
+ String user1 = "user_1";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId1 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user1, a,
+ mock(ActiveUsersManager.class), spyRMContext);
+ a.submitApplicationAttempt(app1, user1);
+
+ // Setup some nodes and racks
+ String host1 = "127.0.0.1";
+ String host2 = "127.0.0.2";
+ String host3 = "127.0.0.3";
+ String host4 = "127.0.0.4";
+ String rack1 = "rack_1";
+ String rack2 = "rack_2";
+ String rack3 = "rack_3";
+ FiCaSchedulerNode node2 = TestUtils.getMockNode(host3, rack2, 0, 8 * GB);
+ FiCaSchedulerNode node3 = TestUtils.getMockNode(host4, rack3, 0, 8 * GB);
+
+ final int numNodes = 5;
+ Resource clusterResource =
+ Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+ when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests and submit
+ Priority priority = TestUtils.createMockPriority(1);
+ List<ResourceRequest> app1Requests1 = new ArrayList<ResourceRequest>();
+ app1Requests1.add(TestUtils.createResourceRequest(host1, 1 * GB, 1,
+ true, priority, recordFactory));
+ app1Requests1.add(TestUtils.createResourceRequest(rack1, 1 * GB, 1,
+ true, priority, recordFactory));
+ app1Requests1.add(TestUtils.createResourceRequest(host2, 1 * GB, 1,
+ true, priority, recordFactory));
+ app1Requests1.add(TestUtils.createResourceRequest(rack2, 1 * GB, 1,
+ true, priority, recordFactory));
+ // Add one extra container in the ANY request.
+ app1Requests1.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+ 1 * GB, 3, true, priority, recordFactory));
+ app1.updateResourceRequests(app1Requests1);
+
+ // Start testing...
+ CSAssignment assignment = null;
+
+ assertEquals(3, app1.getTotalRequiredResources(priority));
+
+ // No rack-local yet.
+ assignment = a.assignContainers(clusterResource, node2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyNoContainerAllocated(assignment);
+ assertEquals(1, app1.getSchedulingOpportunities(priority));
+ assertEquals(3, app1.getTotalRequiredResources(priority));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Still no rack-local.
+ assignment = a.assignContainers(clusterResource, node2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyNoContainerAllocated(assignment);
+ assertEquals(2, app1.getSchedulingOpportunities(priority));
+ assertEquals(3, app1.getTotalRequiredResources(priority));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Rack local now.
+ assignment = a.assignContainers(clusterResource, node2,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
+ assertEquals(0, app1.getSchedulingOpportunities(priority));
+ assertEquals(2, app1.getTotalRequiredResources(priority));
+
+ // No off-switch until more than 3 (= 2 + 1) opportunities are missed.
+ a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assignment = a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assertEquals(3, app1.getSchedulingOpportunities(priority));
+ assertEquals(2, app1.getTotalRequiredResources(priority));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+ // Now off-switch should succeed.
+ assignment = a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
+ assertEquals(4, app1.getSchedulingOpportunities(priority));
+ assertEquals(1, app1.getTotalRequiredResources(priority));
+
+ // Check capping by number of cluster nodes.
+ doReturn(10).when(a).getRackLocalityAdditionalDelay();
+ // Off-switch will happen at 6 missed opportunities now, since cluster size
+ // is 5.
+ assignment = a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyNoContainerAllocated(assignment);
+ assertEquals(5, app1.getSchedulingOpportunities(priority));
+ assertEquals(1, app1.getTotalRequiredResources(priority));
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+ assignment = a.assignContainers(clusterResource, node3,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
+ assertEquals(6, app1.getSchedulingOpportunities(priority));
+ assertEquals(0, app1.getTotalRequiredResources(priority));
+ }
+
+ @Test
public void testApplicationPriorityScheduling() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -2170,16 +2306,18 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
}
@Test (timeout = 30000)
- public void testNodeLocalityAfterQueueRefresh() throws Exception {
+ public void testLocalityDelaysAfterQueueRefresh() throws Exception {
// Manipulate queue 'e'
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// before reinitialization
assertEquals(40, e.getNodeLocalityDelay());
+ assertEquals(-1, e.getRackLocalityAdditionalDelay());
- csConf.setInt(CapacitySchedulerConfiguration
- .NODE_LOCALITY_DELAY, 60);
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
+ csConf.setInt(
+ CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
@@ -2191,6 +2329,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
// after reinitialization
assertEquals(60, e.getNodeLocalityDelay());
+ assertEquals(600, e.getRackLocalityAdditionalDelay());
}
@Test (timeout = 30000)