diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 7af1891..1900311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.Serializable;
+import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -77,6 +80,7 @@
   private Resource preemptedResources = Resources.createResource(0);
   private RMContainerComparator comparator = new RMContainerComparator();
   private final Map<RMContainer, Resource> preemptionMap = new HashMap<RMContainer, Resource>();
+  private Map<String, Set<String>> reservations = new HashMap<>();
 
   /**
    * Delay scheduling: We often want to prioritize scheduling of node-local
@@ -446,22 +450,54 @@ public Container createContainer(
    * in {@link FSSchedulerNode}..
    */
   private void reserve(Priority priority, FSSchedulerNode node,
-      Container container, boolean alreadyReserved) {
-    LOG.info("Making reservation: node=" + node.getNodeName() +
-        " app_id=" + getApplicationId());
-
-    if (!alreadyReserved) {
-      getMetrics().reserveResource(getUser(), container.getResource());
-      RMContainer rmContainer =
-          super.reserve(node, priority, null, container);
-      node.reserveResource(this, priority, rmContainer);
-    } else {
-      RMContainer rmContainer = node.getReservedContainer();
-      super.reserve(node, priority, rmContainer, container);
-      node.reserveResource(this, priority, rmContainer);
+      Container container, NodeType type, boolean alreadyReserved) {
+
+    if (!reservationExceedsThreshold(node, type)) {
+      LOG.info("Making reservation: node=" + node.getNodeName() +
+          " app_id=" + getApplicationId());
+      if (!alreadyReserved) {
+        getMetrics().reserveResource(getUser(), container.getResource());
+        RMContainer rmContainer =
+            super.reserve(node, priority, null, container);
+        node.reserveResource(this, priority, rmContainer);
+        setReservation(node);
+      } else {
+        RMContainer rmContainer = node.getReservedContainer();
+        super.reserve(node, priority, rmContainer, container);
+        node.reserveResource(this, priority, rmContainer);
+        setReservation(node);
+      }
     }
   }
 
+  private boolean reservationExceedsThreshold(FSSchedulerNode node,
+      NodeType type) {
+    // Only if not node-local
+    if (type != NodeType.NODE_LOCAL) {
+      int existingReservations = getNumReservations(node.getRackName(),
+          type == NodeType.OFF_SWITCH);
+      if (existingReservations >= 1) {
+        int totalAvailNodes =
+            (type == NodeType.OFF_SWITCH) ? scheduler.getNumClusterNodes() :
+                scheduler.getNumNodesInRack(node.getRackName());
+        float currRatio =
+            (float) existingReservations / (float) totalAvailNodes;
+        if (currRatio >= scheduler.getAppReservationThreshold()) {
+          DecimalFormat df = new DecimalFormat();
+          df.setMaximumFractionDigits(2);
+          LOG.info("Reservation Exceeds Threshold:" +
+              " app_id=" + getApplicationId() +
+              " existingReservations=" + existingReservations +
+              " totalAvailableNodes=" + totalAvailNodes +
+              " currentRatio=" + df.format(currRatio) +
+              " threshold=" + df.format(
+                  scheduler.getAppReservationThreshold()));
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * Remove the reservation on {@code node} at the given {@link Priority}.
    * This dispatches SchedulerNode handlers as well.
@@ -470,10 +506,47 @@
   public void unreserve(Priority priority, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
     unreserveInternal(priority, node);
     node.unreserveResource(this);
+    clearReservation(node);
     getMetrics().unreserveResource(
         getUser(), rmContainer.getContainer().getResource());
   }
 
+  private synchronized void setReservation(SchedulerNode node) {
+    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
+    Set<String> rackReservations = reservations.get(rackName);
+    if (rackReservations == null) {
+      rackReservations = new HashSet<>();
+      reservations.put(rackName, rackReservations);
+    }
+    rackReservations.add(node.getNodeName());
+  }
+
+  private synchronized void clearReservation(SchedulerNode node) {
+    String rackName = node.getRackName() == null ?
+        "NULL" : node.getRackName();
+    Set<String> rackReservations = reservations.get(rackName);
+    if (rackReservations != null) {
+      rackReservations.remove(node.getNodeName());
+    }
+  }
+
+  int getNumReservations(String rackName, boolean isAny) {
+    int counter = 0;
+    if (isAny) {
+      for (Set<String> nodes : reservations.values()) {
+        if (nodes != null) {
+          counter += nodes.size();
+        }
+      }
+    } else {
+      Set<String> nodes = reservations.get(
+          rackName == null ? "NULL" : rackName);
+      if (nodes != null) {
+        counter += nodes.size();
+      }
+    }
+    return counter;
+  }
+
   /**
    * Assign a container to this node to facilitate {@code request}. If node does
    * not have enough memory, create a reservation. This is called once we are
@@ -545,7 +618,7 @@ private Resource assignContainer(
 
       if (isReservable(container)) {
        // The desired container won't fit here, so reserve
-        reserve(request.getPriority(), node, container, reserved);
+        reserve(request.getPriority(), node, container, type, reserved);
 
         return FairScheduler.CONTAINER_RESERVED;
       } else {
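[Reviewer note, not part of the patch] The core of this file's change is the ratio test in reservationExceedsThreshold(): an app may hold reservations on at most appReservationThreshold of the nodes that could serve the request (all cluster nodes for an OFF_SWITCH request, the nodes of one rack for a RACK_LOCAL request). NODE_LOCAL requests are never throttled, and the first reservation is always granted. A minimal, self-contained sketch of that predicate; the class name and main() harness are illustrative only:

    public class ReservationThresholdCheck {
      // True when one more reservation would be pointless: the app already
      // holds reservations on at least `threshold` of the eligible nodes.
      static boolean exceedsThreshold(int existingReservations,
          int totalAvailNodes, float threshold) {
        if (existingReservations < 1) {
          return false; // the first reservation is always allowed
        }
        float currRatio = (float) existingReservations / (float) totalAvailNodes;
        return currRatio >= threshold;
      }

      public static void main(String[] args) {
        // Three eligible nodes, default threshold 0.50:
        System.out.println(exceedsThreshold(0, 3, 0.50f)); // false -> reserve
        System.out.println(exceedsThreshold(1, 3, 0.50f)); // false (0.33 < 0.50)
        System.out.println(exceedsThreshold(2, 3, 0.50f)); // true  (0.67 >= 0.50)
      }
    }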
"NULL" : node.getRackName(); + Set rackReservations = reservations.get(rackName); + if (rackReservations != null) { + rackReservations.remove(node.getNodeName()); + } + } + + int getNumReservations(String rackName, boolean isAny) { + int counter = 0; + if (isAny) { + for (Set nodes : reservations.values()) { + if (nodes != null) { + counter += nodes.size(); + } + } + } else { + Set nodes = reservations.get( + rackName == null ? "NULL" : rackName); + if (nodes != null) { + counter += nodes.size(); + } + } + return counter; + } + /** * Assign a container to this node to facilitate {@code request}. If node does * not have enough memory, create a reservation. This is called once we are @@ -545,7 +618,7 @@ private Resource assignContainer( if (isReservable(container)) { // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); + reserve(request.getPriority(), node, container, type, reserved); return FairScheduler.CONTAINER_RESERVED; } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 33d01fc..48c13d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -180,7 +181,13 @@ // Containers whose AMs have been warned that they will be preempted soon. private List warnedContainers = new ArrayList(); - + + private float appReservationThreshold; // percentage of available nodes + // an app can be reserve on + + // Count of number of nodes per rack + private Map nodesPerRack = new ConcurrentHashMap<>(); + protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not @@ -263,6 +270,14 @@ public FairSchedulerConfiguration getConf() { return conf; } + public int getNumNodesInRack(String rackName) { + String rName = rackName == null ? "NULL" : rackName; + if (nodesPerRack.containsKey(rName)) { + return nodesPerRack.get(rName); + } + return 0; + } + public QueueManager getQueueManager() { return queueMgr; } @@ -870,6 +885,12 @@ private synchronized void addNode(List containerReports, RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodes.put(node.getNodeID(), schedulerNode); + String rackName = node.getRackName() == null ? 
"NULL" : node.getRackName(); + if (nodesPerRack.containsKey(rackName)) { + nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1); + } else { + nodesPerRack.put(rackName, 1); + } Resources.addTo(clusterResource, node.getTotalCapability()); updateMaximumAllocation(schedulerNode, true); @@ -916,6 +937,11 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + if (nodesPerRack.containsKey(rackName) + && (nodesPerRack.get(rackName) > 0)) { + nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1); + } queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); @@ -1367,6 +1393,7 @@ private void initScheduler(Configuration conf) throws IOException { preemptionInterval = this.conf.getPreemptionInterval(); waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); + appReservationThreshold = this.conf.getAppReservationThreshold(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { @@ -1747,4 +1774,8 @@ protected void decreaseContainer( SchedulerApplicationAttempt attempt) { // TODO Auto-generated method stub } + + public float getAppReservationThreshold() { + return appReservationThreshold; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 892484d..186af48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -137,6 +137,11 @@ CONF_PREFIX + "update-interval-ms"; public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; + /** Ratio of nodes available for an app to make an reservation on. */ + public static final String APP_RESERVATION_THRESHOLD = + CONF_PREFIX + "app-reservation-threshold"; + public static final float APP_RESERVATION_THRESHOLD_DEFAULT = 0.50f; + public FairSchedulerConfiguration() { super(); } @@ -231,7 +236,7 @@ public boolean isEventLogEnabled() { public String getEventlogDir() { return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir", - "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); + "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); } public int getPreemptionInterval() { @@ -247,6 +252,11 @@ public boolean getUsePortForNodeName() { YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); } + public float getAppReservationThreshold() { + return getFloat( + APP_RESERVATION_THRESHOLD, APP_RESERVATION_THRESHOLD_DEFAULT); + } + /** * Parses a resource config value of a form like "1024", "1024 mb", * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. 
@@ -281,4 +291,5 @@ private static int findResource(String val, String units)
     }
     return Integer.parseInt(matcher.group(1));
   }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 6248e09..f72c7cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -760,6 +760,7 @@ public void testSimpleContainerReservation() throws Exception {
     // Now queue 2 requests likewise
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2",
         "user1", 1);
+    scheduler.update();
     scheduler.handle(updateEvent);
 
@@ -789,6 +790,195 @@
   }
 
+  @Test (timeout = 5000)
+  public void testOffSwitchAppReservationThreshold() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add three nodes
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Ensure capacity on all nodes is allocated
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // Verify capacity allocation
+    assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Create new app with a resource request that can be satisfied by any
+    // node but would be reserved since all nodes are fully allocated
+    ApplicationAttemptId attId =
+        createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+
+    assertEquals(1,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    assertEquals(2,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // No new reservations should happen since it exceeds threshold
+    assertEquals(2,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+
+    // Add 1 more node
+    RMNode node4 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // New node satisfies resource request
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+    assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    scheduler.update();
+
+    // Verify that the number of reservations has been decremented
+    assertEquals(0,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+  }
+
+  @Test (timeout = 5000)
+  public void testRackLocalAppReservationThreshold() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add four nodes
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // These 3 are on a different rack
+    RMNode node2 =
+        MockNodes
+            .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes
+            .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+        MockNodes
+            .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Ensure capacity on all nodes is allocated
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+
+    // Verify capacity allocation
+    assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Create an app with a rack-local request, then one whose request can
+    // be satisfied by any node but would be reserved since all nodes are
+    // fully allocated
+    ApplicationAttemptId attemptId =
+        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    createMockRMApp(attemptId);
+
+    scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
+        false);
+    scheduler.addApplicationAttempt(attemptId, false, false);
+    List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
+    asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
+
+    scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+        null, null, null);
+
+    ApplicationAttemptId attId =
+        createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+
+    assertEquals(1,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    assertEquals(2,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // No new reservations should happen since it exceeds threshold
+    assertEquals(2,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+
+    // Add 1 more node
+    RMNode node5 =
+        MockNodes
+            .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.5");
+    NodeAddedSchedulerEvent nodeEvent5 = new NodeAddedSchedulerEvent(node5);
+    scheduler.handle(nodeEvent5);
+
+    // New node satisfies resource request
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node5));
+    assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+    scheduler.update();
+
+    // Verify that the number of reservations has been decremented
+    assertEquals(0,
+        scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+  }
+
   @Test (timeout = 500000)
   public void testContainerReservationAttemptExceedingQueueMax()
       throws Exception {
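[Reviewer note, not part of the patch] The assertion values in testOffSwitchAppReservationThreshold follow directly from the default threshold of 0.50: each 3072 MB node already carries a 2048 MB container, so the new 2048 MB request fits nowhere and must reserve. The first heartbeat reserves on node1 (no existing reservations). The second still reserves, since 1/3 = 0.33 < 0.50. The third is refused, since 2/3 = 0.67 >= 0.50, leaving the count at 2. When node4 joins, the container is placed there (6144 + 2048 = 8192 MB) and the later heartbeats release the stale reservations back to 0. testRackLocalAppReservationThreshold walks the same ladder, with the per-rack node count as the denominator for rack-local requests.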