diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2901134..b95f7c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionTimeEstimate; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -55,9 +56,13 @@ private Resource availableResource = Resource.newInstance(0, 0); private Resource usedResource = Resource.newInstance(0, 0); private Resource totalResourceCapability; + private Resource totalRunResourceCapability; + private Resource totalQueueResourceCapability; private RMContainer reservedContainer; private volatile int numContainers; - + + private int numQueuedContainers = 0; + private int estimatedQueueTime = 0; /* set of containers that are allocated containers */ private final Map launchedContainers = @@ -68,11 +73,16 @@ private volatile Set labels = null; - public SchedulerNode(RMNode node, boolean usePortForNodeName, - Set labels) { + public SchedulerNode(RMNode node, Resource resourceForQueuing, + boolean usePortForNodeName, Set labels) { this.rmNode = node; - this.availableResource = Resources.clone(node.getTotalCapability()); - this.totalResourceCapability = Resources.clone(node.getTotalCapability()); + this.totalRunResourceCapability = Resources + .clone(node.getTotalCapability()); + this.totalQueueResourceCapability = Resources.clone(resourceForQueuing); + this.totalResourceCapability = Resources.clone(Resources.add( + this.totalRunResourceCapability, totalQueueResourceCapability)); + // Available resource contain both run and queue slots. + this.availableResource = Resources.clone(this.totalResourceCapability); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -82,7 +92,8 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName, } public SchedulerNode(RMNode node, boolean usePortForNodeName) { - this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); + this(node, Resource.newInstance(0, 0), usePortForNodeName, + CommonNodeLabelsManager.EMPTY_STRING_SET); } public RMNode getRMNode() { @@ -90,13 +101,16 @@ public RMNode getRMNode() { } /** - * Set total resources on the node. + * Set total resources on the node (corresponds to run slots). * @param resource total resources on the node. */ - public synchronized void setTotalResource(Resource resource){ - this.totalResourceCapability = resource; + public synchronized void setTotalResource(Resource resource) { + // We assume queue slots remain unchanged. 
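// Aside, not part of the patch: a standalone sketch of the capacity bookkeeping
// set up by the constructor and setTotalResource() above, using the same
// Resource/Resources helpers that appear in this file. The concrete numbers
// (8 GB node, 1 GB minimum allocation, 4 queue slots) are made-up example values.
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

class QueueSlotCapacitySketch {
  static void example() {
    Resource nmCapability = Resource.newInstance(8 * 1024, 8);  // what the NM registers (run slots)
    Resource minimumAllocation = Resource.newInstance(1024, 1);
    int maxNMQueueSlots = 4;                                    // hypothetical config value

    // Queue capacity is sized as minimumAllocation * number of queue slots,
    // mirroring CapacityScheduler.addNode() further down in this patch.
    Resource resourceForQueuing = Resources.multiply(minimumAllocation, maxNMQueueSlots);
    Resource totalRun = Resources.clone(nmCapability);               // 8192 MB
    Resource total = Resources.add(totalRun, resourceForQueuing);    // 12288 MB
    Resource available = Resources.clone(total);                     // available starts at run + queue capacity

    System.out.println(total.getMemory() + " / " + available.getMemory());
  }
}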
+ this.totalRunResourceCapability = resource; + this.totalResourceCapability = Resources.add( + this.totalRunResourceCapability, this.totalQueueResourceCapability); this.availableResource = Resources.subtract(totalResourceCapability, - this.usedResource); + this.usedResource); } /** @@ -135,6 +149,10 @@ public String getNodeName() { public String getRackName() { return this.rmNode.getRackName(); } + + public void allocateContainer(RMContainer rmContainer, boolean isQueueSlot) { + allocateContainer(rmContainer, isQueueSlot, null); + } /** * The Scheduler has allocated containers on this node to the given @@ -143,10 +161,24 @@ public String getRackName() { * @param rmContainer * allocated container */ - public synchronized void allocateContainer(RMContainer rmContainer) { + public synchronized void allocateContainer(RMContainer rmContainer, + boolean isQueueSlot, ExecutionTimeEstimate newTaskExecutionTimeEstimate) { Container container = rmContainer.getContainer(); deductAvailableResource(container.getResource()); ++numContainers; + + if (isQueueSlot) { + ++numQueuedContainers; + } + + // KONSTANTINOS: Update estimated queue time. + if (newTaskExecutionTimeEstimate != null) { + updateEstimatedQueueTimeWithTask(newTaskExecutionTimeEstimate + .getIntExecutionTime()); + } else { + LOG.info("KONSTANTINOS: Will not update queue wait time for node " + this.getNodeName() + + ", as task duration for the newly added task is null."); + } launchedContainers.put(container.getId(), rmContainer); @@ -165,6 +197,27 @@ public synchronized void allocateContainer(RMContainer rmContainer) { public synchronized Resource getAvailableResource() { return this.availableResource; } + + /** + * Get available resources corresponding to run slots on the node. + * + * @return available resources on the node + */ + public synchronized Resource getAvailableRunResource() { + return hasAvailRunSlots() ? + Resources.subtract(getTotalRunResource(), getUsedResource()) : + Resource.newInstance(0, 0); + } + + /** + * Get available resources corresponding to queue slots on the node. + * + * @return available resources on the node + */ + public synchronized Resource getAvailableQueueResource() { + return hasAvailRunSlots() ? getTotalQueueResource() : + Resources.subtract(getTotalResource(), getUsedResource()); + } /** * Get used resources on the node. @@ -174,16 +227,63 @@ public synchronized Resource getAvailableResource() { public synchronized Resource getUsedResource() { return this.usedResource; } + + /** + * Get used resources on the node corresponding to run slots. + * + * @return used resources on the node + */ + public synchronized Resource getUsedRunResource() { + return hasAvailRunSlots() ? getUsedResource() : getTotalRunResource(); + } + + /** + * Get used resources on the node corresponding to queue slots. + * + * @return used resources on the node + */ + public synchronized Resource getUsedQueueResource() { + return hasAvailRunSlots() ? Resource.newInstance(0, 0) : + Resources.subtract(getUsedResource(), getTotalRunResource()); + } + + /** + * @return true if there are available run slots in the node. + */ + private boolean hasAvailRunSlots() { + return (getUsedResource().getMemory() <= getTotalRunResource().getMemory()); + } /** * Get total resources on the node. + * This includes both run and queue slots. * * @return total resources on the node. */ public synchronized Resource getTotalResource() { return this.totalResourceCapability; } - + + /** + * Get total resources on the node. + * This only run slots. 
+ * + * @return total run resources on the node. + */ + public synchronized Resource getTotalRunResource() { + return this.totalRunResourceCapability; + } + + /** + * Get total resources on the node. + * This only queue slots. + * + * @return total queue resources on the node. + */ + public synchronized Resource getTotalQueueResource() { + return this.totalQueueResourceCapability; + } + public synchronized boolean isValidContainer(ContainerId containerId) { if (launchedContainers.containsKey(containerId)) { return true; @@ -255,7 +355,9 @@ public abstract void reserveResource(SchedulerApplicationAttempt attempt, public String toString() { return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + " available=" + getAvailableResource() - + " used=" + getUsedResource(); + + " used=" + getUsedResource() + " #queued-containers=" + + getNumQueuedContainers() + " estimated-queue-time=" + + getEstimatedQueueTime(); } /** @@ -284,7 +386,7 @@ public synchronized void recoverContainer(RMContainer rmContainer) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - allocateContainer(rmContainer); + allocateContainer(rmContainer, false, rmContainer.getContainer().getExecutionTimeEstimate()); } public Set getLabels() { @@ -294,4 +396,66 @@ public synchronized void recoverContainer(RMContainer rmContainer) { public void updateLabels(Set labels) { this.labels = labels; } + + /** + * The number of containers currently queued in the node. + */ + public int getNumQueuedContainers() { + return numQueuedContainers; + } + + public void setNumQueuedContainers(int numQueuedContainers) { + this.numQueuedContainers = numQueuedContainers; + } + + /** + * The estimated waiting time due to queued containers in the node. + */ + public int getEstimatedQueueTime() { + return this.estimatedQueueTime; + } + + public void setEstimatedQueueTime(int estimatedQueueTime) { + this.estimatedQueueTime = estimatedQueueTime; + } + + /** + * Update the estimated queue time when a new task has been scheduled at the node. + * @param newTaskDuration the estimated duration of the new task + */ + private void updateEstimatedQueueTimeWithTask(int newTaskDuration) { + int runSlotsNo = computeRunSlotsNo(); + + int oldEstQueueTime = this.estimatedQueueTime; // just here for debugging + + if (runSlotsNo != 0) { + this.estimatedQueueTime = (this.estimatedQueueTime * (runSlotsNo - 1) + newTaskDuration) + / runSlotsNo; + } else { + LOG.info("JEFFRA: Not updating est queue time due to runSlotsNum calc being zero"); + } + + LOG.info("KONSTANTINOS: [" + this.getNodeName() + "] Updating estimated queue time. 
New task: " + + newTaskDuration + "sec, old queue time = " + oldEstQueueTime + + "sec, new queue time = " + this.estimatedQueueTime + "sec."); + } + + private int computeRunSlotsNo() { + if (getNumContainers() == 0) { + return 0; + } + + int allocationMemPerContainer = getUsedResource().getMemory() / getNumContainers(); + int runSlotsNo = getTotalRunResource().getMemory() / allocationMemPerContainer; + + if (LOG.isDebugEnabled()) { + LOG.info("KONSTANTINOS: Node: " + this.getNodeName() + + ", containers no: " + getNumContainers() + ", used memory: " + + getUsedResource().getMemory() + ", available run memory: " + + getTotalRunResource().getMemory() + ", run slots no: " + runSlotsNo); + } + + return runSlotsNo; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java index b1f6c64..ff21657 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java @@ -29,13 +29,25 @@ @Stable public class SchedulerNodeReport { private final Resource used; + private final Resource usedRun; + private final Resource usedQueue; private final Resource avail; + private final Resource availRun; + private final Resource availQueue; private final int num; + private final int numQueued; + private final int qTime; public SchedulerNodeReport(SchedulerNode node) { this.used = node.getUsedResource(); + this.usedRun = node.getUsedRunResource(); + this.usedQueue = node.getUsedQueueResource(); this.avail = node.getAvailableResource(); + this.availRun = node.getAvailableRunResource(); + this.availQueue = node.getAvailableQueueResource(); this.num = node.getNumContainers(); + this.numQueued = node.getNumQueuedContainers(); + this.qTime = node.getEstimatedQueueTime(); } /** @@ -46,16 +58,62 @@ public Resource getUsedResource() { } /** - * @return the amount of resources currently available on the node + * @return the amount of resources currently used by the node for run slots. + */ + public Resource getUsedRunResource() { + return usedRun; + } + + /** + * @return the amount of resources currently used by the node for queue slots. + */ + public Resource getUsedQueueResource() { + return usedQueue; + } + + /** + * @return the amount of resources currently available on the node. */ public Resource getAvailableResource() { return avail; } /** + * @return the amount of resources currently available on the node + * corresponding to run slots. + */ + public Resource getAvailableRunResource() { + return availRun; + } + + /** + * @return the amount of resources currently available on the node + * corresponding to queue slots. + */ + public Resource getAvailableQueueResource() { + return availQueue; + } + + /** * @return the number of containers currently running on this node. */ public int getNumContainers() { return num; } + + /** + * + * @return the number of queue containers in the node. 
+ */ + public int getNumQueuedContainers() { + return numQueued; + } + + /** + * @return the estimated wait time for this node due to the queued + * containers. + */ + public int getEstimatedQueueTime() { + return qTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b99b217..39a6476 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -84,6 +84,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, public Resource getClusterResource(); /** + * Get the run slot resource capacity of the cluster. + * @return the run slot resource capacity of the cluster. + */ + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterRunResource(); + + /** * Get minimum allocatable {@link Resource}. * @return minimum allocatable resource */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 1a9448a..88effac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -190,10 +190,27 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, * @param clusterResource the resource of the cluster. * @param node node on which resources are available * @param resourceLimits how much overall resource of this queue can use. + * @param allowQueuing determines whether overbooking (queuing) of nodes is allowed. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits); + FiCaSchedulerNode node, ResourceLimits resourceLimits, + boolean allowQueuing); + + /** + * Same as the other assignContainers call, but without the explicit ability to + * allow queuing. Assign containers to applications in the CSQueue or its + * children (if any). + * @param clusterResource + * the resource of the cluster. + * @param node + * node on which resources are available + * @param currentResourceLimits + * how much overall resource of this queue can use. + * @return the assignment + */ + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits currentResourceLimits); /** * A container assigned to the queue has completed.
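// Aside, not part of the patch: the run/queue split that SchedulerNode's
// getUsedRunResource()/getUsedQueueResource()/getAvailableRunResource()/
// getAvailableQueueResource() implement (and SchedulerNodeReport exposes above),
// reduced to plain memory-in-MB arithmetic so the piecewise logic is visible.
class RunQueueSplitSketch {
  // Returns {usedRun, usedQueue, availRun, availQueue}.
  static int[] split(int usedMb, int totalRunMb, int totalQueueMb) {
    boolean hasRunSlots = usedMb <= totalRunMb;        // SchedulerNode.hasAvailRunSlots()
    int usedRun    = hasRunSlots ? usedMb : totalRunMb;
    int usedQueue  = hasRunSlots ? 0 : usedMb - totalRunMb;
    int availRun   = hasRunSlots ? totalRunMb - usedMb : 0;
    int availQueue = hasRunSlots ? totalQueueMb : (totalRunMb + totalQueueMb) - usedMb;
    return new int[] { usedRun, usedQueue, availRun, availQueue };
  }

  public static void main(String[] args) {
    // 8 GB of run slots, 4 GB of queue slots, 10 GB already allocated:
    // usedRun=8192, usedQueue=2048, availRun=0, availQueue=2048.
    System.out.println(java.util.Arrays.toString(split(10 * 1024, 8 * 1024, 4 * 1024)));
  }
}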
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 70fe57e..d291ee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionTimeEstimate; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -214,6 +239,24 @@ public Configuration getConf() { private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; + // KONSTANTINOS: Maximum number of conservative containers to be queued at the NMs. + private int maxNMQueueSlots = 0; + // KONSTANTINOS: Thread that deals with placing container requests to NM queues. + private NMQueueSchedulingThread nmQueueSchedulingThread; + // KONSTANTINOS: Maximum waiting time for containers when queuing them to NMs (use for + // dynamic queue sizing, based on waiting time and not static number of queue slots). 
+ private int maxQueueWaitTime; + + enum RmPlacementPolicy { + QUEUE_WAIT_TIME, RANDOM, NUM_REQUESTS; + } + private RmPlacementPolicy rmPlacementPolicy; + private Random rmPlacementRandom; + @@ -316,6 +380,42 @@ private synchronized void initScheduler(Configuration configuration) throws "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + scheduleAsynchronously + ", " + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + + LOG.info("KONSTANTINOS: Will read conf value for max NM queue " + + "slots and max queue wait time."); + this.maxNMQueueSlots = this.conf.getInt( + YarnConfiguration.NM_MAX_CONSERVATIVE_QUEUE_SLOTS, + YarnConfiguration.DEFAULT_NM_MAX_CONSERVATIVE_QUEUE_SLOTS); + this.maxQueueWaitTime = this.conf.getInt( + YarnConfiguration.MAX_QUEUE_WAIT_TIME, + YarnConfiguration.DEFAULT_MAX_QUEUE_WAIT_TIME); + + LOG.info("KONSTANTINOS: Max number of conservative containers allowed " + + "to be queued at each NM: " + this.maxNMQueueSlots); + LOG.info("KONSTANTINOS: Max queue wait time at NM queues = " + + this.maxQueueWaitTime); + + // Set RM placement policy: QUEUE_WAIT_TIME, RANDOM, NUM_REQUESTS + String val = this.conf.get(YarnConfiguration.RM_REQUEST_PLACEMENT_POLICY, + YarnConfiguration.DEFAULT_RM_REQUEST_PLACEMENT_POLICY).toUpperCase(); + + if (val.equals("QUEUE_WAIT_TIME")) { + this.rmPlacementPolicy = RmPlacementPolicy.QUEUE_WAIT_TIME; + } else if (val.equals("RANDOM")) { + this.rmPlacementPolicy = RmPlacementPolicy.RANDOM; + this.rmPlacementRandom = new Random(System.currentTimeMillis()); + } else if (val.equals("NUM_REQUESTS")) { + this.rmPlacementPolicy = RmPlacementPolicy.NUM_REQUESTS; + } else { + this.rmPlacementPolicy = RmPlacementPolicy.QUEUE_WAIT_TIME; + } + LOG.info("JEFFRA: Using " + rmPlacementPolicy + + " as the RM placement policy"); + + // Initialize the thread that will send containers to the NM queue slots. 
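// Aside, not part of the patch: how the three knobs read above might be wired
// up in a test or driver. The constant names are the ones referenced in this
// patch; the values (4 slots, a 60-second wait limit, the QUEUE_WAIT_TIME
// policy) are made-up examples.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class QueueingConfSketch {
  static Configuration example() {
    Configuration conf = new Configuration();
    conf.setInt(YarnConfiguration.NM_MAX_CONSERVATIVE_QUEUE_SLOTS, 4); // queue up to 4 containers per NM
    conf.setInt(YarnConfiguration.MAX_QUEUE_WAIT_TIME, 60);            // stop queuing past ~60s estimated wait
    conf.set(YarnConfiguration.RM_REQUEST_PLACEMENT_POLICY, "QUEUE_WAIT_TIME"); // or RANDOM / NUM_REQUESTS
    // With both knobs > 0 the scheduler creates and starts NMQueueSchedulingThread (below).
    return conf;
  }
}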
+ if (this.maxNMQueueSlots > 0 && this.maxQueueWaitTime > 0) { + nmQueueSchedulingThread = new NMQueueSchedulingThread(); + } } private synchronized void startSchedulerThreads() { @@ -324,6 +424,11 @@ private synchronized void startSchedulerThreads() { "asyncSchedulerThread is null"); asyncSchedulerThread.start(); } + + if (nmQueueSchedulingThread != null) { + LOG.info("KONSTANTINOS: Starting thread that queues containers to NMs."); + nmQueueSchedulingThread.start(); + } } @Override @@ -346,6 +451,10 @@ public void serviceStop() throws Exception { asyncSchedulerThread.interrupt(); asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); } + if (nmQueueSchedulingThread != null) { + nmQueueSchedulingThread.interrupt(); + nmQueueSchedulingThread.join(THREAD_JOIN_TIMEOUT_MS); + } } super.serviceStop(); } @@ -385,12 +494,12 @@ static void schedule(CapacityScheduler cs) { int start = random.nextInt(nodes.size()); for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { - cs.allocateContainersToNode(node); + cs.allocateContainersToNode(node, false); } } // Now, just get everyone to be safe for (FiCaSchedulerNode node : nodes) { - cs.allocateContainersToNode(node); + cs.allocateContainersToNode(node, false); } try { Thread.sleep(cs.getAsyncScheduleInterval()); @@ -430,6 +539,35 @@ public void suspendSchedule() { } + // KONSTANTINOS + private class NMQueueSchedulingThread extends Thread { + private AtomicBoolean queuingThreadActive = new AtomicBoolean(false); + + public void run() { + while (true) { + if (queuingThreadActive.get()) { + assignContainersToNMQueues(); + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn(NMQueueSchedulingThread.class.getName() + + " is interrupted. Exiting."); + break; + } + } + } + + public void activateQueuingThread() { + queuingThreadActive.set(true); + } + + public void suspendQueuingThread() { + queuingThreadActive.set(false); + } + + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; @@ -878,7 +1016,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, "or non existant application " + applicationAttemptId); return EMPTY_ALLOCATION; } - + // Sanity check SchedulerUtils.normalizeRequests( ask, getResourceCalculator(), getClusterResource(), @@ -905,10 +1043,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, " application=" + application); } application.showRequests(); - + // Update application requests application.updateResourceRequests(ask); - + LOG.debug("allocate: post-update"); application.showRequests(); } @@ -919,6 +1057,20 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, " #ask=" + ask.size()); } +// String askStr = "<"; +// for (ResourceRequest rr : ask) { +// askStr += "(" + rr.getNumContainers() + ")["; +// String tdStr = ""; +// for (ExecutionTimeEstimate estTd : rr.getExecutionTimeEstimates()) { +// if (estTd != null) +// tdStr += estTd.getIntExecutionTime() + ", "; +// } +// askStr += tdStr + "]"; +// } +// askStr += ">"; +// LOG.info("JEFFRA: CapSched.allocate EstTDs ask.size()=" + ask.size() +// + ", appAttemptID=" + applicationAttemptId + ", "+ askStr); + application.updateBlacklist(blacklistAdditions, blacklistRemovals); return application.getAllocation(getResourceCalculator(), @@ -960,6 +1112,19 @@ private synchronized void nodeUpdate(RMNode nm) { FiCaSchedulerNode node = getNode(nm.getNodeID()); + // KONSTANTINOS: Updating number of queued slots and estimated queue time. 
+ if (nm.getLoggerNodeReport() != null) { + node.setNumQueuedContainers(nm.getLoggerNodeReport() + .getPendingConsRequests()); + node.setEstimatedQueueTime(nm.getLoggerNodeReport() + .getEstimatedQueueTime()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("KONSTANTINOS: Node " + node.getNodeName() + + ", new number of queued containers: " + node.getNumQueuedContainers() + + ", new estimated queue time: " + node.getEstimatedQueueTime()); + } + List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -1043,7 +1208,8 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, node.updateLabels(newLabels); } - private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + private synchronized void allocateContainersToNode(FiCaSchedulerNode node, + boolean allowQueuing) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; @@ -1071,7 +1237,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource))); + RMNodeLabelsManager.NO_LABEL, clusterResource)), allowQueuing); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -1089,8 +1255,15 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(node.getAvailableResource(), - minimumAllocation) > 0) { + // If no queuing should be done, take into account only the + // available *run* slots. + boolean hasAvailableResource = allowQueuing ? + (calculator.computeAvailableContainers( + node.getAvailableResource(), minimumAllocation) > 0) : + (calculator.computeAvailableContainers( + node.getAvailableRunResource(), minimumAllocation) > 0); + if (hasAvailableResource) { + // if (calculator.computeAvailableContainers(node.getAvailableResource(), minimumAllocation) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); @@ -1101,7 +1274,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource))); + RMNodeLabelsManager.NO_LABEL, clusterResource)), allowQueuing); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1111,6 +1284,80 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { } } + + // KONSTANTINOS + private synchronized void assignContainersToNMQueues() { + + Collection nodes = getAllNodes().values(); + Resource clusterUsedResource = Resource.newInstance(0, 0); + Resource clusterTotalRunResource = Resource.newInstance(0, 0); + for (FiCaSchedulerNode node : nodes) { + Resources.addTo(clusterUsedResource, node.getUsedResource()); + // TODO: KONSTANTINOS -- clusterTotalRunResource could be cached, + // similarly to clusterResource as an optimization. 
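// Aside, not part of the patch: the node-selection logic of
// assignContainersToNMQueues(), which continues just below, reduced to plain
// data. The 95% threshold matches the check below; node numbers are made up,
// and the real code works on FiCaSchedulerNode and Resource objects.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class NMQueuePlacementSketch {
  static class NodeStat {
    final String name; final int estQueueTimeSec; final int numQueued;
    NodeStat(String name, int estQueueTimeSec, int numQueued) {
      this.name = name; this.estQueueTimeSec = estQueueTimeSec; this.numQueued = numQueued;
    }
  }

  static List<String> pickNodesToQueue(List<NodeStat> nodes, long usedMb,
      long totalRunMb, int maxQueueWaitTime, int maxNMQueueSlots) {
    List<String> targets = new ArrayList<String>();
    // 1. Only overbook once run-slot utilization exceeds ~95%.
    if (usedMb <= 0.95 * totalRunMb) {
      return targets;
    }
    // 2. Prefer the least-loaded NM queues first (QUEUE_WAIT_TIME policy);
    //    RANDOM shuffles instead, NUM_REQUESTS sorts by numQueued.
    Collections.sort(nodes, new Comparator<NodeStat>() {
      @Override
      public int compare(NodeStat a, NodeStat b) {
        return Integer.compare(a.estQueueTimeSec, b.estQueueTimeSec);
      }
    });
    // 3. Queue only to nodes still under both per-node limits.
    for (NodeStat n : nodes) {
      if (n.estQueueTimeSec < maxQueueWaitTime && n.numQueued < maxNMQueueSlots) {
        targets.add(n.name);
      }
    }
    return targets;
  }
}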
+ Resources.addTo(clusterTotalRunResource, node.getTotalRunResource()); + } + + LOG.info("KONSTANTINOS: Current used resources by all nodes are " + + clusterUsedResource + ", total run resources are " + + clusterTotalRunResource); + + // If used resources less than 95% of cluster resources, no need for queuing + // yet. + if (Resources.greaterThan(calculator, clusterResource, clusterUsedResource, + Resources.multiply(clusterTotalRunResource, 0.95))) { + LOG.info("Run slot utilization is sufficiently high. Will now queue containers to NMs."); + + List nodesList = new ArrayList( + nodes); + + // Sort nodes list depending on the policy chosen. + if (this.rmPlacementPolicy == RmPlacementPolicy.RANDOM) { + Collections.shuffle(nodesList, rmPlacementRandom); + + } else if (this.rmPlacementPolicy == RmPlacementPolicy.NUM_REQUESTS) { + Collections.sort(nodesList, new Comparator() { + @Override + public int compare(FiCaSchedulerNode node0, FiCaSchedulerNode node1) { + return Integer.valueOf(node0.getNumQueuedContainers()).compareTo( + Integer.valueOf(node1.getNumQueuedContainers())); + } + }); + + } else if (this.rmPlacementPolicy == RmPlacementPolicy.QUEUE_WAIT_TIME) { + Collections.sort(nodesList, new Comparator() { + @Override + public int compare(FiCaSchedulerNode node0, FiCaSchedulerNode node1) { + return Integer.valueOf(node0.getEstimatedQueueTime()).compareTo( + Integer.valueOf(node1.getEstimatedQueueTime())); + } + }); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("KONSTANTINOS: Here is the sorted nodes list based on " + + "the available queue slots: " + nodesList); + } + + for (FiCaSchedulerNode node : nodesList) { + if (LOG.isDebugEnabled()) { + LOG.debug("KONSTANTINOS: node: " + node.getNodeName() + + ", wait time: " + node.getEstimatedQueueTime() + + ", queued containers: " + node.getNumQueuedContainers()); + } + + if (maxQueueWaitTime > node.getEstimatedQueueTime() + && maxNMQueueSlots > node.getNumQueuedContainers()) { + LOG.debug("KONSTANTINOS: Will now queue containers to node " + + node.getNodeName()); + allocateContainersToNode(getNode(node.getNodeID()), true); + } + } + } else { + LOG.info("Run slot utilization is not too high. Will not " + + "queue containers to NMs."); + } + } @Override public void handle(SchedulerEvent event) { @@ -1156,7 +1403,7 @@ public void handle(SchedulerEvent event) { RMNode node = nodeUpdatedEvent.getRMNode(); nodeUpdate(node); if (!scheduleAsynchronously) { - allocateContainersToNode(getNode(node.getNodeID())); + allocateContainersToNode(getNode(node.getNodeID()), false); } } break; @@ -1218,15 +1465,23 @@ public void handle(SchedulerEvent event) { } private synchronized void addNode(RMNode nodeManager) { + // Calculate resource available for queuing. + Resource resourceForQueuing = Resources.multiply(minimumAllocation, maxNMQueueSlots); + // Create the SchedulerNode. FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, - usePortForNodeName, nodeManager.getNodeLabels()); + resourceForQueuing, usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + + // Update total cluster resources (taking queue slots into account). 
+ // Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + Resources.addTo(clusterResource, schedulerNode.getTotalResource()); + Resources.addTo(clusterRunResource, schedulerNode.getTotalRunResource()); // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), - nodeManager.getTotalCapability()); + schedulerNode.getTotalResource()); + // nodeManager.getTotalCapability()); } root.updateClusterResource(clusterResource, new ResourceLimits( @@ -1240,6 +1495,11 @@ private synchronized void addNode(RMNode nodeManager) { if (scheduleAsynchronously && numNodes == 1) { asyncSchedulerThread.beginSchedule(); } + + if (numNodes == 1 && nmQueueSchedulingThread != null) { + LOG.info("KONSTANTINOS: Activating RM thread that queues containers to NMs.."); + nmQueueSchedulingThread.activateQueuingThread(); + } } private synchronized void removeNode(RMNode nodeInfo) { @@ -1253,6 +1513,7 @@ private synchronized void removeNode(RMNode nodeInfo) { return; } Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); + Resources.subtractFrom(clusterRunResource, node.getTotalRunResource()); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = numNodeManagers.decrementAndGet(); @@ -1261,6 +1522,12 @@ private synchronized void removeNode(RMNode nodeInfo) { asyncSchedulerThread.suspendSchedule(); } + if (numNodes == 0 && nmQueueSchedulingThread != null) { + LOG.info("KONSTANTINOS: Will suspend NMQueueSchedulingThread, " + + "as there are no active NMs."); + this.nmQueueSchedulingThread.suspendQueuingThread(); + } + // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { @@ -1650,4 +1917,9 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return ret; } + + @Override + public int getMaxQueueWaitTime() { + return this.maxQueueWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 28dc988..c3f2a96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -49,6 +49,8 @@ Resource getClusterResource(); + Resource getClusterRunResource(); + /** * Get the yarn configuration. 
*/ @@ -61,4 +63,6 @@ Comparator getQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); + + int getMaxQueueWaitTime(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 59b9d21..e2fabab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionTimeEstimate; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -53,11 +54,11 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -121,6 +122,8 @@ new QueueResourceLimitsInfo(); private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; + + private boolean nmQueueCappingEnabled; public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -141,6 +144,7 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeApplications = new TreeSet(applicationComparator); setupQueueConfigs(cs.getClusterResource()); + } protected synchronized void setupQueueConfigs(Resource clusterResource) @@ -210,6 +214,12 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) } } + this.nmQueueCappingEnabled = conf.getBoolean( + CapacitySchedulerConfiguration.NM_QUEUE_AM_CAPPING, + CapacitySchedulerConfiguration.DEFAULT_NM_QUEUE_AM_CAPPING); + LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + @@ -557,14 +567,27 @@ public synchronized Resource getAMResourceLimit() { synchronized (queueResourceLimitsInfo) { queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); } - Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, - absoluteCapacityResource, queueCurrentLimit); + + Resource queueCap; + if (nmQueueCappingEnabled) { + // Use AM capping policy that doesn't take into account NM queues + Resource clResource = scheduler.getClusterResource(); + Resource 
clRunResource = scheduler.getClusterRunResource(); + float percRunSlots = Resources.divide(resourceCalculator, clResource, + clRunResource, clResource); + queueCap = Resources.multiply(absoluteCapacityResource, percRunSlots); + } else { + // Use standard AM capping policy, this will take into account NM queues + // as possible resources that AM's can use. + queueCap = Resources.max(resourceCalculator, lastClusterResource, + absoluteCapacityResource, queueCurrentLimit); + } return Resources.multiplyAndNormalizeUp( resourceCalculator, queueCap, maxAMResourcePerQueuePercent, minimumAllocation); } - + public synchronized Resource getUserAMResourceLimit() { /* * The user amresource limit is based on the same approach as the @@ -740,9 +763,15 @@ private synchronized FiCaSchedulerApp getApplication( return labels; } + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { + return assignContainers(clusterResource, node, currentResourceLimits, false); + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { + FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + boolean allowQueuing) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); if(LOG.isDebugEnabled()) { @@ -763,7 +792,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, getApplication(reservedContainer.getApplicationAttemptId()); synchronized (application) { return assignReservedContainer(application, node, reservedContainer, - clusterResource); + clusterResource, allowQueuing); } } @@ -837,7 +866,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null, currentResourceLimits); + null, currentResourceLimits, allowQueuing); // Did the application skip this node? if (assignment.getSkipped()) { @@ -888,7 +917,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, private synchronized CSAssignment assignReservedContainer( FiCaSchedulerApp application, FiCaSchedulerNode node, - RMContainer rmContainer, Resource clusterResource) { + RMContainer rmContainer, Resource clusterResource, boolean allowQueuing) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -898,7 +927,7 @@ private synchronized CSAssignment assignReservedContainer( // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer, new ResourceLimits(Resources.none())); + rmContainer, new ResourceLimits(Resources.none()), allowQueuing); // Doesn't matter... 
since it's already charged for at time of reservation // "re-reservation" is *free* @@ -1161,7 +1190,8 @@ boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, ResourceLimits currentResoureLimits) { + RMContainer reservedContainer, ResourceLimits currentResoureLimits, + boolean allowQueuing) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1174,7 +1204,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1202,7 +1232,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1230,7 +1260,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1289,12 +1319,12 @@ private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - ResourceLimits currentResoureLimits) { + ResourceLimits currentResoureLimits, boolean allowQueuing) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); } return Resources.none(); @@ -1304,12 +1334,12 @@ private Resource assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - ResourceLimits currentResoureLimits) { + ResourceLimits currentResoureLimits, boolean allowQueuing) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); } return Resources.none(); @@ -1319,12 +1349,12 @@ private Resource assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - ResourceLimits 
currentResoureLimits) { + ResourceLimits currentResoureLimits, boolean allowQueuing) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer, currentResoureLimits); + allocatedContainer, currentResoureLimits, allowQueuing); } return Resources.none(); @@ -1405,10 +1435,11 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, } - private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, - FiCaSchedulerApp application, Priority priority, + private Resource assignContainer(Resource clusterResource, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer, ResourceLimits currentResoureLimits) { + MutableObject createdContainer, ResourceLimits currentResoureLimits, + boolean allowQueuing) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() @@ -1430,8 +1461,26 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); + Resource available = allowQueuing ? node.getAvailableResource() : + node.getAvailableRunResource(); Resource totalResource = node.getTotalResource(); + + // Get the first execution time estimate from the ResourceRequest. + ExecutionTimeEstimate estTaskDuration = request.getFirstExecutionTimeEstimate(); + // KONSTANTINOS for debugging, to be removed... + if (!request.checkInvariant()) { + LOG.info("KONSTANTINOS: ****_PROBLEM_**** The number of containers is not the " + + "same as the exec time estimates."); +// LOG.info("KONSTANTINOS: The first task duration for this ResourceRequest is " +// + estTaskDuration); +// LOG.info("KONSTANTINOS: This request is for " + request.getNumContainers() +// + " containers. The estimated exec times are " +// + request.getExecutionTimeEstimates()); + } if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource, capability, totalResource)) { @@ -1447,7 +1496,10 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // Create the container if necessary Container container = getContainer(rmContainer, application, node, capability, priority); - + + // KONSTANTINOS: Set the expected duration for this container. + container.setExecutionTimeEstimate(estTaskDuration); + // something went wrong getting/creating the container if (container == null) { LOG.warn("Couldn't get container for allocation!"); @@ -1461,6 +1513,24 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod int availableContainers = resourceCalculator.computeAvailableContainers(available, capability); + // KONSTANTINOS: If the queue wait time has exceeded the allowed maximum, + // act as if there are no available containers. + // Do that only when queuing containers... 
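// Aside, not part of the patch: how the estimated queue time compared against
// in the check below evolves. This mirrors
// SchedulerNode.updateEstimatedQueueTimeWithTask()/computeRunSlotsNo() earlier
// in this patch, reduced to plain ints; the 30-second tasks and 1 GB containers
// are made-up example values.
class QueueTimeEstimateSketch {
  static int update(int estQueueTimeSec, int newTaskDurationSec,
      int usedMemoryMb, int totalRunMemoryMb, int numContainers) {
    if (numContainers == 0) {
      return estQueueTimeSec;                            // nothing allocated yet
    }
    int memPerContainer = usedMemoryMb / numContainers;
    int runSlots = totalRunMemoryMb / memPerContainer;   // approx. parallel run slots
    if (runSlots == 0) {
      return estQueueTimeSec;                            // guard, as in the patch
    }
    // Running average over the number of run slots.
    return (estQueueTimeSec * (runSlots - 1) + newTaskDurationSec) / runSlots;
  }

  public static void main(String[] args) {
    int est = 0;
    // Node with 8 x 1 GB run slots, fully used, queuing 30-second tasks:
    for (int i = 0; i < 4; i++) {
      est = update(est, 30, 8 * 1024, 8 * 1024, 8);      // 3, 6, 9, 11 seconds
    }
    // Once est exceeds maxQueueWaitTime, the code below zeroes
    // availableContainers so no further containers are queued to this node.
    System.out.println(est);
  }
}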
+ if ((node.getAvailableRunResource().getMemory() == 0) + && (node.getEstimatedQueueTime() > scheduler.getMaxQueueWaitTime())) { + LOG.info("Will not queue more containers, because the queue wait time (" + + node.getEstimatedQueueTime() + " sec) has exceeded the max value."); + availableContainers = 0; + } + // KONSTANTINOS for debugging + if (LOG.isDebugEnabled()) { + LOG.debug("KONSTANTINOS: node:" + node.getNodeName() + + ", available memory=" + node.getAvailableRunResource() + + ", node est q wait time=" + node.getEstimatedQueueTime() + + " sec, max scheduler allowed wait time=" + + scheduler.getMaxQueueWaitTime()); + } + boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource, currentResoureLimits.getAmountNeededUnreserve(), Resources.none()); @@ -1496,6 +1566,13 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } } + // KONSTANTINOS: Update application taking into account task duration as + // well when removing a container request from the list. + // Also make sure that the Container above (that has the task duration) is + // the same object that gets stored in the newlyAllocatedContainers of the + // FiCaSchedulerApp, which are then passed to the allocation response to + // the AM. + // Inform the application RMContainer allocatedContainer = application.allocate(type, node, priority, request, container); @@ -1506,8 +1583,21 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } // Inform the node - node.allocateContainer(allocatedContainer); - + boolean isQueueSlot = false; + if (allowQueuing + && Resources.greaterThan(resourceCalculator, clusterResource, + container.getResource(), node.getAvailableRunResource())) { + isQueueSlot = true; + } + // KONSTANTINOS: Update expected queue wait time of node.
+ node.allocateContainer(allocatedContainer, isQueueSlot, estTaskDuration); + + if (node.getEstimatedQueueTime() > scheduler.getMaxQueueWaitTime()) { + LOG.info("KONSTANTINOS: Node " + node.getNodeName() + ", q wait time=" + + node.getEstimatedQueueTime() + " (higher than max allowed from scheduler=" + + scheduler.getMaxQueueWaitTime() + ")"); + } + LOG.info("assignedContainer" + " application attempt=" + application.getApplicationAttemptId() + " container=" + container + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ed6bb8..e649415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -374,10 +374,16 @@ private synchronized void removeApplication(ApplicationId applicationId, " #applications: " + getNumApplications()); } + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits resourceLimits) { + return assignContainers(clusterResource, node, resourceLimits, false); + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits) { - CSAssignment assignment = + FiCaSchedulerNode node, ResourceLimits resourceLimits, + boolean allowQueuing) { + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); Set nodeLabels = node.getLabels(); @@ -386,7 +392,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } - while (canAssign(clusterResource, node)) { + while (canAssign(clusterResource, node, allowQueuing)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " + getQueueName()); @@ -402,8 +408,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } // Schedule - CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits); + CSAssignment assignedToChild = assignContainersToChildQueues( + clusterResource, node, resourceLimits, allowQueuing); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -451,10 +457,19 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } - private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - return (node.getReservedContainer() == null) && - Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - node.getAvailableResource(), minimumAllocation); + private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node, + boolean allowQueuing) { +// return (node.getReservedContainer() == null) && +// Resources.greaterThanOrEqual(resourceCalculator, clusterResource, +// node.getAvailableResource(), minimumAllocation); + if (node.getReservedContainer() != null) { + return false; + } + + return allowQueuing ? 
Resources.greaterThanOrEqual(resourceCalculator, + clusterResource, node.getAvailableResource(), minimumAllocation) + : Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + node.getAvailableRunResource(), minimumAllocation); } private ResourceLimits getResourceLimitsOfChild(CSQueue child, @@ -489,7 +504,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, } private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) { + Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, + boolean allowQueuing) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -507,7 +523,8 @@ private synchronized CSAssignment assignContainersToChildQueues( ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits); - assignment = childQueue.assignContainers(cluster, node, childLimits); + assignment = childQueue.assignContainers(cluster, node, childLimits, + allowQueuing); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index fe6db47..f0fe541 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -35,13 +36,14 @@ private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); - public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, - Set nodeLabels) { - super(node, usePortForNodeName, nodeLabels); + public FiCaSchedulerNode(RMNode node, Resource resourceForQueuing, + boolean usePortForNodeName, Set nodeLabels) { + super(node, resourceForQueuing, usePortForNodeName, nodeLabels); } public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); + this(node, Resource.newInstance(0, 0), usePortForNodeName, + CommonNodeLabelsManager.EMPTY_STRING_SET); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 45d5bba..41a5a8e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -528,7 +528,7 @@ private Resource assignContainer( } // Inform the node - node.allocateContainer(allocatedContainer); + node.allocateContainer(allocatedContainer, false); // If this container is used to run AM, update the leaf queue's AM usage if (getLiveContainers().size() == 1 && !getUnmanagedAM()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index beb3ab5..2ba0fd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -685,7 +685,7 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application application.allocate(type, node, priority, request, container); // Inform the node - node.allocateContainer(rmContainer); + node.allocateContainer(rmContainer, false); // Update usage for this container increaseUsedResources(rmContainer);
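// Aside, not part of the patch: a toy model of the allocateContainer() contract
// after this change. The Fifo/Fair call sites above pass false (and, via the
// two-argument overload, a null duration estimate), so only the capacity
// scheduler's queuing path touches the queue statistics. The fields and the
// final update here are illustrative, not the real YARN types or formula.
class AllocateContractSketch {
  int numContainers;
  int numQueuedContainers;
  int estimatedQueueTimeSec;

  void allocate(boolean isQueueSlot) {
    allocate(isQueueSlot, null);              // same shape as the 2-arg overload in SchedulerNode
  }

  void allocate(boolean isQueueSlot, Integer taskDurationSec) {
    numContainers++;
    if (isQueueSlot) {
      numQueuedContainers++;
    }
    if (taskDurationSec != null) {
      // In the patch this feeds updateEstimatedQueueTimeWithTask(); see the
      // running-average sketch earlier. Here we simply record the last estimate.
      estimatedQueueTimeSec = taskDurationSec;
    }
    // A null estimate leaves estimatedQueueTimeSec untouched, matching the
    // "will not update queue wait time" branch in SchedulerNode.
  }
}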