diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 7af1891..43053eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.Serializable; +import java.text.DecimalFormat; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -77,6 +80,7 @@ private Resource preemptedResources = Resources.createResource(0); private RMContainerComparator comparator = new RMContainerComparator(); private final Map preemptionMap = new HashMap(); + private Map> reservations = new HashMap<>(); /** * Delay scheduling: We often want to prioritize scheduling of node-local @@ -195,7 +199,7 @@ 
public Resource getHeadroom() { clusterAvailableResources, queueMaxAvailableResources); Resource headroom = policy.getHeadroom(queueFairShare, - queueUsage, maxAvailableResource); + queueUsage, maxAvailableResource); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for " + this.getName() + ":" + "Min(" + @@ -215,7 +219,7 @@ public synchronized float getLocalityWaitFactor( // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); + return Math.min(((float) requiredResources / clusterNodes), 1.0f); } /** @@ -347,7 +351,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, priority, request, container); + type, node, priority, request, container); this.attemptResourceUsage.incUsed(container.getResource()); // Update resource requests related to "request" and store in RMContainer @@ -379,7 +383,7 @@ public synchronized void resetAllowedLocalityLevel(Priority priority, NodeType level) { NodeType old = allowedLocalityLevel.get(priority); LOG.info("Raising locality level from " + old + " to " + level + " at " + - " priority " + priority); + " priority " + priority); allowedLocalityLevel.put(priority, level); } @@ -446,22 +450,53 @@ public Container createContainer( * in {@link FSSchedulerNode}.. 
*/ private void reserve(Priority priority, FSSchedulerNode node, - Container container, boolean alreadyReserved) { - LOG.info("Making reservation: node=" + node.getNodeName() + - " app_id=" + getApplicationId()); - - if (!alreadyReserved) { - getMetrics().reserveResource(getUser(), container.getResource()); - RMContainer rmContainer = - super.reserve(node, priority, null, container); - node.reserveResource(this, priority, rmContainer); - } else { - RMContainer rmContainer = node.getReservedContainer(); - super.reserve(node, priority, rmContainer, container); - node.reserveResource(this, priority, rmContainer); + Container container, NodeType type, boolean alreadyReserved) { + + if (!reservationExceedsThreshold(node, type)) { + LOG.info("Making reservation: node=" + node.getNodeName() + + " app_id=" + getApplicationId()); + if (!alreadyReserved) { + getMetrics().reserveResource(getUser(), container.getResource()); + RMContainer rmContainer = + super.reserve(node, priority, null, container); + node.reserveResource(this, priority, rmContainer); + setReservation(node); + } else { + RMContainer rmContainer = node.getReservedContainer(); + super.reserve(node, priority, rmContainer, container); + node.reserveResource(this, priority, rmContainer); + setReservation(node); + } } } + private boolean reservationExceedsThreshold(FSSchedulerNode node, + NodeType type) { + if (type != NodeType.NODE_LOCAL) { + int existingReservations = getNumReservations(node.getRackName(), + type == NodeType.OFF_SWITCH); + if (existingReservations >= 1) { + int totalAvailNodes = + (type == NodeType.OFF_SWITCH) ? 
scheduler.getNumClusterNodes() : + scheduler.getNumNodesInRack(node.getRackName()); + float currRatio = + (float) existingReservations / (float) totalAvailNodes; + if (currRatio >= scheduler.getConf().getAppReservationThreshold()) { + DecimalFormat df = new DecimalFormat(); + df.setMaximumFractionDigits(2); + LOG.info("Reservation Exceeds Threshold:" + + " app_id=" + getApplicationId() + + " existingReservations=" + existingReservations + + " totalAvailableNodes=" + totalAvailNodes + + " currentRatio=" + df.format(currRatio) + + " threshold=" + df.format( + scheduler.getConf().getAppReservationThreshold())); + return true; + } + } + } + return false; + } /** * Remove the reservation on {@code node} at the given {@link Priority}. * This dispatches SchedulerNode handlers as well. @@ -470,10 +505,47 @@ public void unreserve(Priority priority, FSSchedulerNode node) { RMContainer rmContainer = node.getReservedContainer(); unreserveInternal(priority, node); node.unreserveResource(this); + clearReservation(node); getMetrics().unreserveResource( getUser(), rmContainer.getContainer().getResource()); } + private synchronized void setReservation(SchedulerNode node) { + String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + Set rackReservations = reservations.get(rackName); + if (rackReservations == null) { + rackReservations = new HashSet<>(); + reservations.put(rackName, rackReservations); + } + rackReservations.add(node.getNodeName()); + } + + private synchronized void clearReservation(SchedulerNode node) { + String rackName = node.getRackName() == null ? 
"NULL" : node.getRackName(); + Set rackReservations = reservations.get(rackName); + if (rackReservations != null) { + rackReservations.remove(node.getNodeName()); + } + } + + int getNumReservations(String rackName, boolean isAny) { + int counter = 0; + if (isAny) { + for (Set nodes : reservations.values()) { + if (nodes != null) { + counter += nodes.size(); + } + } + } else { + Set nodes = reservations.get( + rackName == null ? "NULL" : rackName); + if (nodes != null) { + counter += nodes.size(); + } + } + return counter; + } + /** * Assign a container to this node to facilitate {@code request}. If node does * not have enough memory, create a reservation. This is called once we are @@ -545,7 +617,7 @@ private Resource assignContainer( if (isReservable(container)) { // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); + reserve(request.getPriority(), node, container, type, reserved); return FairScheduler.CONTAINER_RESERVED; } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 33d01fc..4915c89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -196,6 +197,11 @@ // 
heartbeat protected int maxAssign; // Max containers to assign per heartbeat + protected float appReservationThreshold; // percentage of available nodes + // an app can be reserved on + + protected Map nodesPerRack = new ConcurrentHashMap<>(); + @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; @@ -263,6 +269,14 @@ public FairSchedulerConfiguration getConf() { return conf; } + public int getNumNodesInRack(String rackName) { + String rName = rackName == null ? "NULL" : rackName; + if (nodesPerRack.containsKey(rName)) { + return nodesPerRack.get(rName); + } + return 0; + } + public QueueManager getQueueManager() { return queueMgr; } @@ -646,7 +660,7 @@ protected synchronized void addApplication(ApplicationId applicationId, LOG.info(msg); rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, msg)); + RMAppEventType.APP_REJECTED, msg)); return; } @@ -710,8 +724,8 @@ protected synchronized void addApplicationAttempt( } } else { rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -818,7 +832,7 @@ private synchronized void removeApplicationAttempt( if (wasRunnable) { maxRunningEnforcer.untrackRunnableApp(attempt); maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, - attempt.getQueue()); + attempt.getQueue()); } else { maxRunningEnforcer.untrackNonRunnableApp(attempt); } @@ -862,14 +876,20 @@ protected synchronized void completedContainer(RMContainer rmContainer, } LOG.info("Application attempt " + application.getApplicationAttemptId() - + " released container " + container.getId() + " on node: " + node - + " with event: " + event); + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); } private synchronized void addNode(List containerReports, RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodes.put(node.getNodeID(), schedulerNode); + String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + if (nodesPerRack.containsKey(rackName)) { + nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1); + } else { + nodesPerRack.put(rackName, 1); + } Resources.addTo(clusterResource, node.getTotalCapability()); updateMaximumAllocation(schedulerNode, true); @@ -916,6 +936,11 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + if (nodesPerRack.containsKey(rackName) + && (nodesPerRack.get(rackName) > 0)) { + nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1); + } queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); updateMaximumAllocation(node, false); @@ -1367,6 +1392,7 @@ private void initScheduler(Configuration conf) throws IOException { preemptionInterval = this.conf.getPreemptionInterval(); waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); + appReservationThreshold = this.conf.getAppReservationThreshold(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 892484d..5dc720e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -137,6 +137,11 @@ CONF_PREFIX + "update-interval-ms"; public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; + /** Ratio of nodes available for an app to make a reservation on */ + public static final String APP_RESERVATION_THRESHOLD = + CONF_PREFIX + "app-reservation-threshold"; + public static final float APP_RESERVATION_THRESHOLD_DEFAULT = 0.50f; + public FairSchedulerConfiguration() { super(); } @@ -177,8 +182,8 @@ public Resource getIncrementAllocation() { public float getReservationThresholdIncrementMultiple() { return getFloat( - RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, - DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE); + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE); } public float getLocalityThresholdNode() { @@ -190,11 +195,13 @@ public float getLocalityThresholdRack() { } public boolean isContinuousSchedulingEnabled() { - return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); + return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, + DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); } public int getContinuousSchedulingSleepMs() { - return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); + return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, + DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); } public long getLocalityDelayNodeMs() { @@ -231,7 +238,7 @@ public boolean isEventLogEnabled() { public String getEventlogDir() { return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir", - "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); + "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); } public int getPreemptionInterval() { @@ -244,7 +251,12 @@ public int getWaitTimeBeforeKill() {
public boolean getUsePortForNodeName() { return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + } + + public float getAppReservationThreshold() { + return getFloat( + APP_RESERVATION_THRESHOLD, APP_RESERVATION_THRESHOLD_DEFAULT); } /** @@ -281,4 +293,5 @@ private static int findResource(String val, String units) } return Integer.parseInt(matcher.group(1)); } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 6248e09..16b6bbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -294,7 +294,7 @@ public void testSimpleFairShareCalculation() throws IOException { // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1, - "127.0.0.1"); + "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -711,9 +711,10 @@ public void testSimpleContainerAllocation() throws IOException { // Asked for less than increment allocation. assertEquals( - FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, - scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getMemory()); + FairSchedulerConfiguration + .DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); @@ -756,16 +757,17 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 1 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); // Now queue 2 requests likewise ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.update(); scheduler.handle(updateEvent); // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity @@ -790,6 +792,88 @@ public void testSimpleContainerReservation() throws Exception { } @Test (timeout = 500000) + public void testOffSwitchAppReservationThreshold() throws Exception { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add three node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes + .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + + 
// Queue 1 requests full capacity of node + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + + // Make sure queue 1 is allocated app capacity + assertEquals(6144, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + + // Create new app + ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + + assertEquals(1, + scheduler.getSchedulerApp(attId).getNumReservations(null, true)); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(2, + scheduler.getSchedulerApp(attId).getNumReservations(null, true)); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + + // No new reservations should happen since it exceeds threshold + assertEquals(2, + scheduler.getSchedulerApp(attId).getNumReservations(null, true)); + + // Add 1 more node + RMNode node4 = + MockNodes + .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // New node satisfies resource request + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node4)); + assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). 
+ getResourceUsage().getMemory()); + + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.update(); + + // Verify number of reservations have decremented + assertEquals(0, + scheduler.getSchedulerApp(attId).getNumReservations(null, true)); + } + + @Test (timeout = 500000) public void testContainerReservationAttemptExceedingQueueMax() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);