diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5764c8c..856c880 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.security.AccessControlException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -196,6 +198,38 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + public void removeQueue(String queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + + @Override + public void addQueue(Queue newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + + @Override + public void setEntitlement(String queue, DynamicQueueConf dyQConf) + throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + + @Override + public void killAppsInQueue(String queueName) + throws SchedulerConfigEditException { + throw new SchedulerConfigEditException(getClass().getSimpleName() + + " does not support this operation"); + } + + @Override + public void moveAllApps(String sourceQueue, String destQueue) + throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { if (!container.getContainerState().equals(ContainerState.COMPLETE)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index a127123..39de799 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,7 +54,7 @@ private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private final ApplicationAttemptId applicationAttemptId; final ApplicationId applicationId; - private final String queueName; + private String queueName; Queue queue; final String user; // TODO making containerIdCounter long @@ -443,6 +443,10 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( this.blacklist = appInfo.getBlackList(); } + public synchronized void setQueueName(String queueName) { + this.queueName = queueName; + } + public synchronized void recoverContainer(RMContainer rmContainer) { QueueMetrics metrics = queue.getMetrics(); if (pending) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 507b798..06ee3df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -38,6 +39,8 @@ import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.util.Quantile; +import org.apache.hadoop.metrics2.util.SampleQuantiles; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -91,6 +94,10 @@ final MetricsSystem metricsSystem; private final Map users; private final Configuration conf; + private final TreeMap allocationDelays; + public final Quantile[] allocationQuantiles = { new Quantile(0.50, 0.050), + new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; + private final SampleQuantiles allocationDelayGlobal; protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { @@ -102,6 +109,8 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); + allocationDelays = new TreeMap(); + allocationDelayGlobal = new SampleQuantiles(allocationQuantiles); } protected QueueMetrics tag(MetricsInfo info, String value) { @@ -420,19 +429,30 @@ public void reserveResource(String user, Resource res) { } } - public void unreserveResource(String user, Resource res) { + public void unreserveResource(String user, Resource res, long allocationDelay) { reservedContainers.decr(); reservedMB.decr(res.getMemory()); reservedVCores.decr(res.getVirtualCores()); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.unreserveResource(user, res); + userMetrics.unreserveResource(user, res, allocationDelay); } if (parent != null) { - parent.unreserveResource(user, res); + parent.unreserveResource(user, res, allocationDelay); + } + // keep allocation delays only at the root to control memory footprint + if (parent == null && allocationDelay > 0) { + if (allocationDelays.get(res) == null) { + allocationDelays.put(res, new SampleQuantiles(allocationQuantiles)); + } + allocationDelays.get(res).insert(allocationDelay); + allocationDelayGlobal.insert(allocationDelay); + LOG.info("DELAY," + res.getMemory() + "," + allocationDelay); } } + + public void incrActiveUsers() { activeUsers.incr(); } @@ -546,4 +566,51 @@ public int getActiveApps() { public MetricsSystem getMetricsSystem() { return metricsSystem; } + + /** + * Allocation Delays are kept at the root to contain memory footprint. If the + * exact capability was never reserved, we average out the closest known + * values. + */ + public long getAllocationDelay(Resource capability, Quantile quantile) { + if (parent == null) { + if (allocationDelays.get(capability) != null) { + return allocationDelays.get(capability).snapshot().get(quantile); + } + if (allocationDelays.floorEntry(capability) != null + && allocationDelays.ceilingEntry(capability) != null) { + long floor = + allocationDelays.floorEntry(capability).getValue().snapshot() + .get(quantile); + long ceiling = + allocationDelays.ceilingEntry(capability).getValue().snapshot() + .get(quantile); + return (floor + ceiling) / 2; + } + if (allocationDelays.ceilingEntry(capability) != null) { + return allocationDelays.ceilingEntry(capability).getValue().snapshot() + .get(quantile); + } + if (allocationDelays.floorEntry(capability) != null) { + return allocationDelays.floorEntry(capability).getValue().snapshot() + .get(quantile); + } + return 0; + } else { + return parent.getAllocationDelay(capability, quantile); + } + } + + /** + * Provide an approximate 95th percentile delay + * + * @return + */ + public long get95thReservationDelay() { + if (allocationDelayGlobal.snapshot() != null) { + return allocationDelayGlobal.snapshot().get(allocationQuantiles[1]); + } else { + return 0; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 5649ccf..614aa4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -23,8 +23,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; /** * This interface is the one implemented by the schedulers. It mainly extends @@ -49,4 +54,65 @@ * @throws IOException */ void reinitialize(Configuration conf, RMContext rmContext) throws IOException; + + /** + * Remove an existing queue. Implementations might limit when a queue could be + * removed (e.g., must have zero entitlement, and no applications running, or + * must be a leaf, etc..). + * + * @param queueName name of the queue to remove + * @throws YarnException + */ + void removeQueue(String queueName) throws YarnException; + + /** + * Add to the scheduler a new Queue. Implementations might limit what type of + * queues can be dynamically added (e.g., Queue must be a leaf, must be + * attached to existing parent, must have zero entitlement). + * + * @param newQueue the queue being added. + * @throws YarnException + */ + void addQueue(Queue newQueue) throws YarnException; + + /** + * This method increase the entitlement for current queue (must respect + * invariants, e.g., no overcommit of parents, non negative, etc.). + * Entitlement is a general term for weights in FairScheduler, capacity for + * the CapacityScheduler, etc. + * + * @param queue the queue for which we change entitlement + * @param dyQConf the new entitlement for the queue (capacity, maxCapacity, + * etc..) + * @throws YarnException + */ + void setEntitlement(String queue, DynamicQueueConf dyQConf) + throws YarnException; + + /** + * Terminate all applications in the specified queue. Implementation might + * limit the scope of applicability (e.g., only leaf queue). + * + * @param queueName + * @throws YarnException + */ + void killAppsInQueue(String queueName) throws YarnException; + + /** + * Move an application to a new queue, return the name of the queue where the + * application is located after this operation (target queue if succeeded, old + * queue if it failed). + */ + public String moveApplication(ApplicationId appId, String newQueue) + throws YarnException; + + /** + * Completely drain sourceQueue of applications, by moving all of them to + * destQueue. + * + * @param sourceQueue + * @param destQueue + * @throws YarnException + */ + void moveAllApps(String sourceQueue, String destQueue) throws YarnException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 32dd23b..f67bbe2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -539,7 +539,7 @@ public synchronized void move(Queue newQueue) { for (Map map : reservedContainers.values()) { for (RMContainer reservedContainer : map.values()) { Resource resource = reservedContainer.getReservedResource(); - oldMetrics.unreserveResource(user, resource); + oldMetrics.unreserveResource(user, resource, -1); newMetrics.reserveResource(user, resource); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java new file mode 100644 index 0000000..eb1b15e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +public class SchedulerConfigEditException extends IllegalArgumentException { + + private static final long serialVersionUID = 7100374511387193257L; + + public SchedulerConfigEditException(String string) { + super(string); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index ccb71e2..aaefa95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -238,4 +238,23 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) * @param apps the collection to add the applications to */ public void collectSchedulerApplications(Collection apps); + + /** + * Detach a container from this queue + * @param clusterResource + * @param application + * @param rmContainer + */ + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); + + /** + * Attach a container to this queue + * @param clusterResource + * @param application + * @param container + */ + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 2681238..2150b4f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.io.InputStream; @@ -32,6 +33,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -94,6 +97,14 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerConfigEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -440,9 +451,12 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) private void validateExistingQueues( Map queues, Map newQueues) throws IOException { - for (String queue : queues.keySet()) { - if (!newQueues.containsKey(queue)) { - throw new IOException(queue + " cannot be found during refresh!"); + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(e.getValue() instanceof ReservationQueue)) { + if (!newQueues.containsKey(e.getKey())) { + throw new IOException(e.getKey() + " cannot be found during refresh!"); + } } } } @@ -474,10 +488,16 @@ static CSQueue parseQueue( Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; - String[] childQueueNames = - conf.getQueues((parent == null) ? - queueName : (parent.getQueuePath()+"."+queueName)); - if (childQueueNames == null || childQueueNames.length == 0) { + String[] childQueueNames = + conf.getQueues((parent == null) ? queueName : (parent.getQueuePath() + + "." + queueName)); + // Check if the queue will be dynamically managed by the Reservation system + String fullQueueName = + (parent == null) ? queueName + : (parent.getQueuePath() + "." + queueName); + boolean isReservableQueue = conf.isReservableQueue(fullQueueName); + if ((childQueueNames == null || childQueueNames.length == 0) + && !isReservableQueue) { if (null == parent) { throw new IllegalStateException( "Queue configuration missing child queue names for " + queueName); @@ -488,20 +508,29 @@ static CSQueue parseQueue( // Used only for unit tests queue = hook.hook(queue); } else { - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName)); + ParentQueue parentQueue; + if (isReservableQueue) { + parentQueue = + new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else { + parentQueue = + new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); - - List childQueues = new ArrayList(); - for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, + if (!isReservableQueue) { + List childQueues = new ArrayList(); + for (String childQueueName : childQueueNames) { + CSQueue childQueue = + parseQueue(csContext, conf, queue, childQueueName, queues, oldQueues, hook); - childQueues.add(childQueue); + childQueues.add(childQueue); + } + parentQueue.setChildQueues(childQueues); } - parentQueue.setChildQueues(childQueues); } if(queue instanceof LeafQueue == true && queues.containsKey(queueName) @@ -515,12 +544,13 @@ static CSQueue parseQueue( return queue; } - synchronized CSQueue getQueue(String queueName) { + public synchronized CSQueue getQueue(String queueName) { return queues.get(queueName); } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, + ReservationId reservationID) { // santiy checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -531,11 +561,33 @@ private synchronized void addApplication(ApplicationId applicationId, return; } if (!(queue instanceof LeafQueue)) { - String message = "Application " + applicationId + - " submitted by user " + user + " to non-leaf queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; + // Check if it is a dynamic reservation queue + if (queue instanceof PlanQueue) { + if (reservationID != null) { + String resQName = reservationID.toString(); + queue = getQueue(resQName); + if (queue == null) { + String message = + "Application " + applicationId + " submitted by user " + user + + " to a reservation which is not yet currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + queueName = resQName; + } else { + queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + queue = getQueue(queueName); + } + } else { + String message = + "Application " + applicationId + " submitted by user " + user + + " to non-leaf queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } } // Submit to the queue try { @@ -620,7 +672,7 @@ private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application Attempt " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); + " finalState=" + rmAppAttemptFinalState); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); SchedulerApplication application = @@ -661,12 +713,16 @@ private synchronized void doneApplicationAttempt( // Inform the queue String queueName = attempt.getQueue().getQueueName(); + if (attempt.getQueue() instanceof ReservationQueue) { + queueName = + ((ReservationQueue) attempt.getQueue()).getReservationQueueName(); + } CSQueue queue = queues.get(queueName); if (!(queue instanceof LeafQueue)) { LOG.error("Cannot finish application " + "from non-leaf queue: " + queueName); } else { - queue.finishApplicationAttempt(attempt, queue.getQueueName()); + queue.finishApplicationAttempt(attempt, queueName); } } @@ -897,7 +953,7 @@ public void handle(SchedulerEvent event) { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), appAddedEvent.getReservationID()); } break; case APP_REMOVED: @@ -1128,4 +1184,242 @@ private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( throw new IOException(e); } } + + @Override + public synchronized void removeQueue(String queueName) + throws SchedulerConfigEditException { + LOG.info("Removing queue: " + queueName); + CSQueue q = this.getQueue(queueName); + if (!(q instanceof LeafQueue)) { + throw new SchedulerConfigEditException("The queue that we are asked " + + "to remove is not a LeafQueue"); + } + LeafQueue disposableLeafQueue = (LeafQueue) q; + // at this point we should have no more apps + if (disposableLeafQueue.getApplications().size() > 0 + || disposableLeafQueue.pendingApplications.size() > 0) { + throw new SchedulerConfigEditException("The queue is not empty " + + disposableLeafQueue.getApplications().size() + " active apps " + + disposableLeafQueue.pendingApplications.size() + " pending apps"); + } + if (disposableLeafQueue.getCapacity() > 0) { + throw new SchedulerConfigEditException( + "The queue has non-zero capacity: " + + disposableLeafQueue.getCapacity()); + } + + ((ParentQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + this.queues.remove(queueName); + LOG.info("Queue " + queueName + " has been removed"); + } + + @Override + public synchronized void killAppsInQueue(String queueName) + throws SchedulerConfigEditException { + CSQueue q = this.getQueue(queueName); + if (!(q instanceof LeafQueue)) { + throw new SchedulerConfigEditException("killAppsInQueue can only " + + "be invoked on LeafQueue(s)"); + } + LeafQueue disposableLeafQueue = (LeafQueue) q; + // remove all pending apps first + for (FiCaSchedulerApp app : disposableLeafQueue.pendingApplications) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); + } + // then remove all running apps + for (FiCaSchedulerApp app : disposableLeafQueue.getApplications()) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); + } + } + + @Override + public synchronized void addQueue(Queue queue) + throws SchedulerConfigEditException { + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerConfigEditException("Queue " + queue.getQueueName() + + " is not a dynamic Queue"); + } + ReservationQueue newQueue = (ReservationQueue) queue; + CSQueue p = (ParentQueue) newQueue.getParent(); + + String queuename = newQueue.getReservationQueueName(); + + if (p == null || !(p instanceof ParentQueue)) { + throw new SchedulerConfigEditException("Parent queue for " + + ((ReservationQueue) queue).getReservationQueueName() + + " not properly set"); + } + + this.queues.put(queuename, newQueue); + ((ParentQueue) p).addChildQueue(newQueue); + LOG.info("Creation of leaf queue " + newQueue + " succeeded"); + } + + @Override + public synchronized void setEntitlement(String inQueue, + DynamicQueueConf sesConf) throws SchedulerConfigEditException { + LeafQueue queue = getCheckLeafQueue(inQueue); + ParentQueue parent = (ParentQueue) queue.getParent(); + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerConfigEditException("Entitlement can be" + + " modified dynamically only for PlanQueue(s)"); + } + + if (!(parent instanceof PlanQueue)) { + throw new SchedulerConfigEditException("The parent of a PlanQueue" + + " must be an ReservationQueue"); + } + + float sumChilds = parent.sumOfChildCapacities(); + float newChildCap = sumChilds - queue.getCapacity() + sesConf.getCapacity(); + + if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { + // note: epsilon checks here are not ok, as the epsilons might accumulate + // and + // become a problem in aggregate + if (Math.abs(sesConf.getCapacity() - queue.getCapacity()) == 0 + && Math.abs(sesConf.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { + return; + } + if (sesConf.getCapacity() > queue.getCapacity()) { + ((ReservationQueue) queue).addCapacity((sesConf.getCapacity() - queue + .getCapacity())); + } else { + ((ReservationQueue) queue) + .subtractCapacity((queue.getCapacity() - sesConf.getCapacity())); + } + } else { + throw new SchedulerConfigEditException( + "Sum of child queues would exceed 100%"); + } + // note: we currently set maxCapacity to capacity + // this might be revised later + queue.setMaxCapacity(sesConf.getMaxCapacity()); + LOG.info("Set " + inQueue + " entitlement to " + queue.getCapacity() + + " request was (" + sesConf.getCapacity() + ")"); + } + + @Override + public synchronized String moveApplication(ApplicationId appId, + String targetQueueName) throws YarnException { + + FiCaSchedulerApp app = + (FiCaSchedulerApp) applications.get(appId).getCurrentAppAttempt(); + + String sourceQueueName = app.getQueue().getQueueName(); + if (app.getQueue() instanceof ReservationQueue) { + sourceQueueName = + ((ReservationQueue) app.getQueue()).getReservationQueueName(); + } + LeafQueue source = getCheckLeafQueue(sourceQueueName); + LeafQueue dest = getCheckLeafQueue(targetQueueName); + + synchronized (source) { + synchronized (dest) { + synchronized (app) { + + // TODO: Check the max-capacity of the queue/user (see. + // assignToQueue) + + // unreserve all reserved containers + for (RMContainer rmContainer : app.getReservedContainers()) { + source.unreserve(app, rmContainer.getReservedPriority(), + nodes.get(rmContainer.getAllocatedNode()), rmContainer); + } + + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(clusterResource, app, rmContainer); + // attach the Container to another queue + dest.attachContainer(clusterResource, app, rmContainer); + + } + // Detach the application.. + source.finishApplicationAttempt(app, sourceQueueName); + + // Basically only a bunch or re-names + app.moveToQueue(dest); + + // Submit to a new queue + dest.submitApplicationAttempt(app, app.getUser()); + } + } + } + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueueName + " to: " + targetQueueName); + + return targetQueueName; + + } + + @Override + public synchronized void moveAllApps(String sourceQueue, String destQueue) + throws YarnException { + + // check source and destination + LeafQueue source = getCheckLeafQueue(sourceQueue); + getCheckLeafQueue(destQueue); + + // generate move events for each up in pending + for (SchedulerApplicationAttempt app : source.pendingApplications) { + SettableFuture future = SettableFuture.create(); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + + // generate move events for each up in running apps + for (SchedulerApplicationAttempt app : source.getApplications()) { + SettableFuture future = SettableFuture.create(); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + } + + /** + * Check that the String provided in input is the name of an existing, + * LeafQueue, if successful returns the queue. + * + * @param queue + * @return the LeafQueue + */ + private LeafQueue getCheckLeafQueue(String queue) { + CSQueue ret = this.getQueue(queue); + if (ret == null) { + throw new SchedulerConfigEditException("Queue " + queue + + " doesn't exist"); + } + if (!(ret instanceof LeafQueue)) { + throw new SchedulerConfigEditException("This queue is not a LeafQueue"); + } + return (LeafQueue) ret; + } + + public Set getPlanQueueNames() { + Set ret = new HashSet(); + for (Map.Entry l : queues.entrySet()) { + if (l.getValue() instanceof PlanQueue) { + ret.add(l.getKey()); + } + } + return ret; + } + + public Set getReservationQueueNames(String planQueueName) { + Set ret = new HashSet(); + for (Map.Entry l : queues.entrySet()) { + if (l.getValue() instanceof ReservationQueue + && l.getValue().getParent().getQueueName().equals(planQueueName)) { + ret.add(l.getKey()); + } + } + return ret; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fe695e..df5770f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -146,6 +146,63 @@ @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + @Private + public static final String AVERAGE_CAPACITY = "average-capacity"; + + @Private + public static final String IS_RESERVABLE_QUEUE = "reservable.queue"; + + @Private + public static final String RESERVATION_WINDOW = "reservation-window"; + + @Private + public static final String INSTANTANEOUS_MAX_CAPACITY = + "instantaneous-max-capacity"; + + @Private + public static final long DEFAULT_RESERVATION_WINDOW = 0L; + + @Private + public static final String RESERVATION_ADMISSION_POLICY = + "reservation-policy"; + + @Private + public static final String RESERVATION_AGENT_NAME = "reservation-agent"; + + @Private + public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = + "show-reservations-as-queues"; + + @Private + public static final String DEFAULT_RESERVATION_ADMISSION_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; + + @Private + public static final String DEFAULT_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + + @Private + public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; + + @Private + public static final String DEFAULT_RESERVATION_PLANNER_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + + @Private + public static final String RESERVATION_KILL_ON_EXPIRY = + "reservation-kill-on-expiry"; + + @Private + public static final boolean DEFAULT_RESERVATION_KILL_ON_EXPIRY = true; + + @Private + public static final String RESERVATION_ENFORCEMENT_WINDOW = + "reservation-enforcement-window"; + + // default to 1h lookahead enforcement + @Private + public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -162,7 +219,7 @@ public CapacitySchedulerConfiguration(Configuration configuration, } } - private String getQueuePrefix(String queue) { + public String getQueuePrefix(String queue) { String queueName = PREFIX + queue + DOT; return queueName; } @@ -378,4 +435,100 @@ public void setScheduleAynschronously(boolean async) { setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); } + public boolean isReservableQueue(String queue) { + boolean isReservableQueue = + getBoolean(getQueuePrefix(queue) + IS_RESERVABLE_QUEUE, false); + return isReservableQueue; + } + + public void setReservableQueue(String queue, boolean isReservableQueue) { + setBoolean(getQueuePrefix(queue) + IS_RESERVABLE_QUEUE, isReservableQueue); + LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue) + + ", isReservableQueue=" + isReservableQueue(queue)); + } + + public long getReservationWindow(String queue) { + long reservationWindow = + getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, + DEFAULT_RESERVATION_WINDOW); + return reservationWindow; + } + + public float getAverageCapacity(String queue) { + float avgCapacity = + getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, + MAXIMUM_CAPACITY_VALUE); + return avgCapacity; + } + + public float getInstantaneousMaxCapacity(String queue) { + float instMaxCapacity = + getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, + MAXIMUM_CAPACITY_VALUE); + return instMaxCapacity; + } + + public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) { + setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, + instMaxCapacity); + } + + public void setReservationWindow(String queue, long reservationWindow) { + setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow); + } + + public void setAverageCapacity(String queue, float avgCapacity) { + setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); + } + + public String getReservationAdmissionPolicy(String queue) { + String reservationPolicy = + get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, + DEFAULT_RESERVATION_ADMISSION_POLICY); + return reservationPolicy; + } + + public void setReservationAdmissionPolicy(String queue, + String reservationPolicy) { + set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); + } + + public String getReservationAgent(String queue) { + String reservationAgent = + get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, + DEFAULT_RESERVATION_AGENT_NAME); + return reservationAgent; + } + + public void setReservationAgent(String queue, String reservationPolicy) { + set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); + } + + public boolean getShowReservationAsQueues(String queuePath) { + boolean showReservationAsQueues = + getBoolean(getQueuePrefix(queuePath) + + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false); + return showReservationAsQueues; + } + + public String getReplanner(String queue) { + String replanner = + get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, + DEFAULT_RESERVATION_PLANNER_NAME); + return replanner; + } + + public boolean getKillOnExpiry(String queue) { + boolean killOnExpiry = + getBoolean(getQueuePrefix(queue) + RESERVATION_KILL_ON_EXPIRY, + DEFAULT_RESERVATION_KILL_ON_EXPIRY); + return killOnExpiry; + } + + public long getEnforcementWindow(String queue) { + long enforcementWindow = + getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, + DEFAULT_RESERVATION_ENFORCEMENT_WINDOW); + return enforcementWindow; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65938aa..6205cfd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -150,8 +150,7 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); - float capacity = - (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100; + float capacity = getCapacityFromConf(); float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; float maximumCapacity = @@ -217,6 +216,11 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeApplications = new TreeSet(applicationComparator); } + // externalizing in method, to allow overriding + protected float getCapacityFromConf() { + return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; + } + private synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, @@ -228,9 +232,9 @@ private synchronized void setupQueueConfigs( Map acls, int nodeLocalityDelay) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(this.queueName, capacity, maximumCapacity); float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); + CSQueueUtils.checkAbsoluteCapacities(this.queueName, absCapacity, absoluteMaxCapacity); this.capacity = capacity; this.absoluteCapacity = absCapacity; @@ -366,7 +370,7 @@ public String getQueueName() { @Override public String getQueuePath() { - return getParent().getQueuePath() + "." + getQueueName(); + return getParent().getQueuePath() + "." + this.queueName; } /** @@ -453,11 +457,11 @@ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { */ synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(this.queueName, capacity, maximumCapacity); float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity( maximumCapacity, getParent()); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); + CSQueueUtils.checkAbsoluteCapacities(this.queueName, absoluteCapacity, absMaxCapacity); this.maximumCapacity = maximumCapacity; this.absoluteMaxCapacity = absMaxCapacity; @@ -475,7 +479,7 @@ synchronized void setUserLimit(int userLimit) { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(int userLimitFactor) { + synchronized void setUserLimitFactor(float userLimitFactor) { this.userLimitFactor = userLimitFactor; } @@ -545,7 +549,7 @@ public synchronized QueueInfo getQueueInfo( } } - userAclInfo.setQueueName(getQueueName()); + userAclInfo.setQueueName(this.queueName); userAclInfo.setUserAcls(operations); return Collections.singletonList(userAclInfo); } @@ -643,7 +647,9 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, addApplicationAttempt(application, user); } - metrics.submitAppAttempt(userName); + if(application.isPending()){ + metrics.submitAppAttempt(userName); + } getParent().submitApplicationAttempt(application, userName); } @@ -722,7 +728,7 @@ private synchronized void activateApplications() { i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + - " activated in queue: " + getQueueName()); + " activated in queue: " + this.queueName); } } } @@ -738,7 +744,7 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, Us LOG.info("Application added -" + " appId: " + application.getApplicationId() + - " user: " + user + "," + " leaf-queue: " + getQueueName() + + " user: " + user + "," + " leaf-queue: " + this.queueName + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + @@ -781,7 +787,7 @@ public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, LOG.info("Application removed -" + " appId: " + application.getApplicationId() + " user: " + application.getUser() + - " queue: " + getQueueName() + + " queue: " + this.queueName + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + @@ -813,9 +819,11 @@ private synchronized FiCaSchedulerApp getApplication( if (reservedContainer != null) { FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); - synchronized (application) { - return assignReservedContainer(application, node, reservedContainer, - clusterResource); + if (application != null) { + synchronized (application) { + return assignReservedContainer(application, node, reservedContainer, + clusterResource); + } } } @@ -950,7 +958,7 @@ private synchronized boolean assignToQueue(Resource clusterResource, clusterResource); if (potentialNewCapacity > absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + LOG.debug(this.queueName + " usedResources: " + usedResources + " clusterResources: " + clusterResource + " currentCapacity " @@ -1061,7 +1069,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, if (LOG.isDebugEnabled()) { String userName = application.getUser(); LOG.debug("User limit computation for " + userName + - " in queue " + getQueueName() + + " in queue " + this.queueName + " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + @@ -1088,7 +1096,7 @@ private synchronized boolean assignToUser(Resource clusterResource, if (Resources.greaterThan(resourceCalculator, clusterResource, user.getConsumedResources(), limit)) { if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() + + LOG.debug("User " + userName + " in queue " + this.queueName + " will exceed limit - " + " consumed: " + user.getConsumedResources() + " limit: " + limit @@ -1388,15 +1396,15 @@ private void reserve(FiCaSchedulerApp application, Priority priority, node.reserveResource(application, priority, rmContainer); } - private boolean unreserve(FiCaSchedulerApp application, Priority priority, + protected boolean unreserve(FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, RMContainer rmContainer) { // Done with the reservation? if (application.unreserve(node, priority)) { node.unreserveResource(application); - + long reservationDealy = -1; // FIXME TODO THIS SHOULD COME FROM UNRESERVE // Update reserved metrics getMetrics().unreserveResource( - application.getUser(), rmContainer.getContainer().getResource()); + application.getUser(), rmContainer.getContainer().getResource(), reservationDealy); return true; } return false; @@ -1463,7 +1471,7 @@ synchronized void allocateResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { - LOG.info(getQueueName() + + LOG.info(this.queueName + " user=" + userName + " used=" + usedResources + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + @@ -1487,7 +1495,7 @@ synchronized void releaseResource(Resource clusterResource, user.releaseContainer(resource); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - LOG.info(getQueueName() + + LOG.info(this.queueName + " used=" + usedResources + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getConsumedResources()); } @@ -1619,4 +1627,48 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } + + @Override + synchronized public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().attachContainer(clusterResource, application, rmContainer); + } + } + + @Override + synchronized public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().detachContainer(clusterResource, application, rmContainer); + } + } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } + + public void setAbsoluteCapacity(float absoluteCapacity) { + this.absoluteCapacity = absoluteCapacity; + } + + public void setMaxApplications(int maxApplications) { + this.maxApplications = maxApplications; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d83eed3..def7d1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -156,7 +156,7 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - private synchronized void setupQueueConfigs( + protected synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, @@ -202,7 +202,7 @@ private synchronized void setupQueueConfigs( void setChildQueues(Collection childQueues) { // Validate - float childCapacities = 0; + float childCapacities = 0; for (CSQueue queue : childQueues) { childCapacities += queue.getCapacity(); } @@ -214,7 +214,7 @@ void setChildQueues(Collection childQueues) { " capacity of " + childCapacities + " for children of queue " + queueName); } - + this.childQueues.clear(); this.childQueues.addAll(childQueues); if (LOG.isDebugEnabled()) { @@ -666,7 +666,7 @@ synchronized CSAssignment assignContainersToChildQueues(Resource cluster, assignment.getResource(), Resources.none())) { // Remove and re-insert to sort iter.remove(); - LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + + LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + " stats: " + childQueue); childQueues.add(childQueue); if (LOG.isDebugEnabled()) { @@ -791,4 +791,94 @@ public void collectSchedulerApplications( queue.collectSchedulerApplications(apps); } } + + synchronized void addChildQueue(CSQueue newQueue) { + if (newQueue.getCapacity() > 0) { + throw new IllegalArgumentException("Queue " + newQueue + + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(newQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } + + synchronized void removeChildQueue(CSQueue remQueue) { + if (remQueue.getCapacity() > 0) { + throw new IllegalArgumentException("Queue " + remQueue + + " being removed has non zero capacity."); + } + boolean removed = false; + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(remQueue)) { + qiter.remove(); + removed = true; // are we sure? + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: remove queue): " + removed + " " + + getChildQueuesToPrint()); + } + } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + // Careful! Locking order is important! + // Book keeping + synchronized (this) { + allocateResource(clusterResource, rmContainer.getContainer() + .getResource()); + + LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + + " cluster=" + clusterResource); + } + + // Inform the parent + if (parent != null) { + parent.attachContainer(clusterResource, application, rmContainer); + } + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + // Careful! Locking order is important! + // Book keeping + synchronized (this) { + releaseResource(clusterResource, rmContainer.getContainer() + .getResource()); + + LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + + " cluster=" + clusterResource); + } + + // Inform the parent + if (parent != null) { + parent.detachContainer(clusterResource, application, rmContainer); + } + } + } + + public float sumOfChildCapacities() { + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } + + public Map getACLs() { + return acls; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java new file mode 100644 index 0000000..df5e2ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -0,0 +1,149 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +/** + * This represents a dynamic queue managed by the {@link ReservationSystem}. + * From the user perspective this is equivalent to a LeafQueue that respect + * reservations, but functionality wise is a sub-class of ParentQueue + * + */ +public class PlanQueue extends ParentQueue { + + public static final String DEFAULT_QUEUE_SUFFIX = "-default"; + + private static final Log LOG = LogFactory.getLog(PlanQueue.class); + + private int maxAppsForReservation; + private int maxAppsPerUserForReservation; + private int userLimit; + private float userLimitFactor; + private CapacitySchedulerContext schedulerContext; + private boolean showReservationsAsQueues; + + public PlanQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) { + super(cs, queueName, parent, old); + + this.schedulerContext = cs; + // Set the reservation queue attributes for the Plan + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + String queuePath = super.getQueuePath(); + maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); + showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); + if (maxAppsForReservation < 0) { + maxAppsForReservation = + (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super + .getAbsoluteCapacity()); + } + userLimit = conf.getUserLimit(queuePath); + userLimitFactor = conf.getUserLimitFactor(queuePath); + maxAppsPerUserForReservation = + (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName) + .append("\nwith capacity: [").append(super.getCapacity()) + .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) + .append("\nwith max reservation apps: [").append(maxAppsForReservation) + .append("]\nwith max reservation apps per user: [") + .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") + .append(userLimit).append("]\nwith user limit factor: [") + .append(userLimitFactor).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public synchronized void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) + || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " from " + newlyParsedQueue.getQueuePath()); + } + + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + + if (newlyParsedParentQueue.getChildQueues().size() > 0) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration"); + } + + // Set new configs + setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), + newlyParsedParentQueue.getAbsoluteCapacity(), + newlyParsedParentQueue.getMaximumCapacity(), + newlyParsedParentQueue.getAbsoluteMaximumCapacity(), + newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs()); + + this.maxAppsForReservation = newlyParsedParentQueue.maxAppsForReservation; + this.maxAppsPerUserForReservation = + newlyParsedParentQueue.maxAppsPerUserForReservation; + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue ses : this.getChildQueues()) { + CSQueueUtils.updateQueueStatistics( + schedulerContext.getResourceCalculator(), ses, this, + schedulerContext.getClusterResource(), + schedulerContext.getMinimumResourceCapability()); + ses.reinitialize(ses, clusterResource); + ((ReservationQueue) ses).setMaxApplications(this + .getMaxApplicationsForReservations()); + ((ReservationQueue) ses).setMaxApplicationsPerUser(this + .getMaxApplicationsPerUserForReservation()); + } + showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; + } + + /** + * Number of maximum applications for each of the reservations in this Plan. + * + * @return maxAppsForreservation + */ + public int getMaxApplicationsForReservations() { + return maxAppsForReservation; + } + + /** + * Number of maximum applications per user for each of the reservations in + * this Plan. + * + * @return maxAppsPerUserForreservation + */ + public int getMaxApplicationsPerUserForReservation() { + return maxAppsPerUserForReservation; + } + + /** + * User limit value for each of the reservations in this Plan. + * + * @return userLimit + */ + public int getUserLimitForReservation() { + return userLimit; + } + + /** + * User limit factor value for each of the reservations in this Plan. + * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; + } + + /** + * Determine whether to hide/show the ReservationQueues + */ + public boolean showReservationsAsLeafs() { + return showReservationsAsQueues; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java new file mode 100644 index 0000000..432e7c0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -0,0 +1,108 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +/** + * This represents a dynamic {@link LeafQueue} managed by the + * {@link ReservationSystem} + * + */ +public class ReservationQueue extends LeafQueue { + private static final Log LOG = LogFactory.getLog(ReservationQueue.class); + + private int maxSystemApps; + private int maxApplications; + private int maxApplicationsPerUser; + + public ReservationQueue(CapacitySchedulerContext cs, String queueName, + PlanQueue parent) { + super(cs, queueName, parent, null); + maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); + // the following parameters are common to all reservation in the plan + this.maxApplications = parent.getMaxApplicationsForReservations(); + this.maxApplicationsPerUser = + parent.getMaxApplicationsPerUserForReservation(); + setUserLimit(parent.getUserLimitForReservation()); + setUserLimitFactor(parent.getUserLimitFactor()); + } + + /** + * This methods removes capacity from a queue and adjusts its absoluteCapacity + * + * @param capacity + * @throws AccessControlException + */ + synchronized public void subtractCapacity(float capacity) + throws IllegalArgumentException { + + if (capacity > this.getCapacity() + CSQueueUtils.EPSILON) { + throw new IllegalArgumentException( + "Capacity demand is higher than available."); + } else { + this.setCapacity(this.getCapacity() - capacity); + this.setAbsoluteCapacity(getParent().getAbsoluteCapacity() + * this.getCapacity()); + setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity())); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully subtracted " + capacity + + " capacity from queue " + this.getQueueName()); + } + } + + } + + /** + * This method adds capacity to a queue and adjusts its absoluteCapacity + * + * @param capacity + * @throws AccessControlException + */ + synchronized public void addCapacity(float capacity) + throws IllegalArgumentException { + if ((this.getCapacity() + capacity) > (1.0f + CSQueueUtils.EPSILON)) { + throw new IllegalArgumentException("Queue was over provisionned."); + } else { + this.setCapacity(this.getCapacity() + capacity); + this.setAbsoluteCapacity(getParent().getAbsoluteCapacity() + * this.getCapacity()); + setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity())); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully added " + capacity + " capacity to queue " + + this.getQueueName()); + } + + } + } + + // used by the super constructor, we initialize to zero + protected float getCapacityFromConf() { + return 0f; + } + + @Override + public int getMaxApplications() { + return maxApplications; + } + + @Override + public synchronized int getMaxApplicationsPerUser() { + return maxApplicationsPerUser; + } + + public synchronized void setMaxApplicationsPerUser(int maxApplicationsPerUser) { + this.maxApplicationsPerUser = maxApplicationsPerUser; + } + + @Override + public String getQueueName() { + return this.getParent().getQueueName(); + } + + public String getReservationQueueName() { + return super.getQueueName(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java new file mode 100644 index 0000000..d78ef11 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +public class DynamicQueueConf { + + private float capacity; + private float maxCapacity; + + public DynamicQueueConf(float capacity, float maxCapacity){ + this.setCapacity(capacity); + this.maxCapacity = maxCapacity; + } + + public float getMaxCapacity() { + return maxCapacity; + } + + public void setMaxCapacity(float maxCapacity) { + this.maxCapacity = maxCapacity; + } + + public float getCapacity() { + return capacity; + } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 846d1e1..ba46df0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -252,4 +253,13 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, allocation.getNMTokenList()); } + public void moveToQueue(LeafQueue dest) { + // leverage parent move + super.move(dest); + // update RMAPP queue as well + this.rmContext.getRMApps().get(getApplicationId()) + .setQueue(dest.getQueueName()); + } + + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index 7e0b89e..d4e8d2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -19,25 +19,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; public class AppAddedSchedulerEvent extends SchedulerEvent { private final ApplicationId applicationId; private final String queue; private final String user; + private final ReservationId reservationID; private final boolean isAppRecovering; - public AppAddedSchedulerEvent( - ApplicationId applicationId, String queue, String user) { - this(applicationId, queue, user, false); + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, ReservationId reservationID) { + this(applicationId, queue, user, false, reservationID); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, boolean isAppRecovering) { + String user, boolean isAppRecovering, ReservationId reservationID) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.reservationID = reservationID; this.isAppRecovering = isAppRecovering; } @@ -56,4 +59,8 @@ public String getUser() { public boolean getIsAppRecovering() { return isAppRecovering; } + + public ReservationId getReservationID() { + return reservationID; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 0c36c55..97610ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -204,7 +204,7 @@ public void unreserve(Priority priority, FSSchedulerNode node) { app.unreserve(node, priority); node.unreserveResource(app); getMetrics().unreserveResource( - app.getUser(), rmContainer.getContainer().getResource()); + app.getUser(), rmContainer.getContainer().getResource(), -1); } /**