diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 72ee7db..da40bc0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; @@ -224,6 +225,24 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + public void removeQueue(String queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + + @Override + public void addQueue(Queue newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + + @Override + public void setEntitlement(String queue, DynamicQueueConf dyQConf) + throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { if (!container.getContainerState().equals(ContainerState.COMPLETE)) { @@ -460,4 +479,9 @@ public synchronized void killAllAppsInQueue(String queueName) .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); } } + + public Set getReservableQueues() throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support reservations"); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java new file mode 100644 index 0000000..eb1b15e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerConfigEditException.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +public class SchedulerConfigEditException extends IllegalArgumentException { + + private static final long serialVersionUID = 7100374511387193257L; + + public SchedulerConfigEditException(String string) { + super(string); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 5ce16c2..5afbde2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; /** @@ -220,4 +222,44 @@ public String moveApplication(ApplicationId appId, String newQueue) * @throws YarnException */ void killAllAppsInQueue(String queueName) throws YarnException; + + /** + * Remove an existing queue. Implementations might limit when a queue could be + * removed (e.g., must have zero entitlement, and no applications running, or + * must be a leaf, etc..). + * + * @param queueName name of the queue to remove + * @throws YarnException + */ + void removeQueue(String queueName) throws YarnException; + + /** + * Add to the scheduler a new Queue. Implementations might limit what type of + * queues can be dynamically added (e.g., Queue must be a leaf, must be + * attached to existing parent, must have zero entitlement). + * + * @param newQueue the queue being added. + * @throws YarnException + */ + void addQueue(Queue newQueue) throws YarnException; + + /** + * This method increase the entitlement for current queue (must respect + * invariants, e.g., no overcommit of parents, non negative, etc.). + * Entitlement is a general term for weights in FairScheduler, capacity for + * the CapacityScheduler, etc. + * + * @param queue the queue for which we change entitlement + * @param dyQConf the new entitlement for the queue (capacity, maxCapacity, + * etc..) + * @throws YarnException + */ + void setEntitlement(String queue, DynamicQueueConf dyQConf) + throws YarnException; + + /** + * Gets the list of queues managed by the Reservation System + * @return the list of queues which support reservations + */ + public Set getReservableQueues() throws YarnException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c8a73bf..5d673bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -30,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -92,6 +98,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerConfigEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -475,9 +489,12 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) private void validateExistingQueues( Map queues, Map newQueues) throws IOException { - for (String queue : queues.keySet()) { - if (!newQueues.containsKey(queue)) { - throw new IOException(queue + " cannot be found during refresh!"); + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(e.getValue() instanceof ReservationQueue)) { + if (!newQueues.containsKey(e.getKey())) { + throw new IOException(e.getKey() + " cannot be found during refresh!"); + } } } } @@ -509,10 +526,16 @@ static CSQueue parseQueue( Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; - String[] childQueueNames = - conf.getQueues((parent == null) ? - queueName : (parent.getQueuePath()+"."+queueName)); - if (childQueueNames == null || childQueueNames.length == 0) { + String[] childQueueNames = + conf.getQueues((parent == null) ? queueName : (parent.getQueuePath() + + "." + queueName)); + // Check if the queue will be dynamically managed by the Reservation system + String fullQueueName = + (parent == null) ? queueName + : (parent.getQueuePath() + "." + queueName); + boolean isReservableQueue = conf.isReservableQueue(fullQueueName); + if ((childQueueNames == null || childQueueNames.length == 0) + && !isReservableQueue) { if (null == parent) { throw new IllegalStateException( "Queue configuration missing child queue names for " + queueName); @@ -523,20 +546,29 @@ static CSQueue parseQueue( // Used only for unit tests queue = hook.hook(queue); } else { - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName)); + ParentQueue parentQueue; + if (isReservableQueue) { + parentQueue = + new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else { + parentQueue = + new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); - - List childQueues = new ArrayList(); - for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, + if (!isReservableQueue) { + List childQueues = new ArrayList(); + for (String childQueueName : childQueueNames) { + CSQueue childQueue = + parseQueue(csContext, conf, queue, childQueueName, queues, oldQueues, hook); - childQueues.add(childQueue); + childQueues.add(childQueue); + } + parentQueue.setChildQueues(childQueues); } - parentQueue.setChildQueues(childQueues); } if(queue instanceof LeafQueue == true && queues.containsKey(queueName) @@ -550,7 +582,7 @@ static CSQueue parseQueue( return queue; } - synchronized CSQueue getQueue(String queueName) { + public synchronized CSQueue getQueue(String queueName) { if (queueName == null) { return null; } @@ -591,7 +623,8 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, + ReservationId reservationID) { if (mappings != null && mappings.size() > 0) { try { @@ -627,11 +660,33 @@ private synchronized void addApplication(ApplicationId applicationId, return; } if (!(queue instanceof LeafQueue)) { - String message = "Application " + applicationId + - " submitted by user " + user + " to non-leaf queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; + // Check if it is a dynamic reservation queue + if (queue instanceof PlanQueue) { + if (reservationID != null) { + String resQName = reservationID.toString(); + queue = getQueue(resQName); + if (queue == null) { + String message = + "Application " + applicationId + " submitted by user " + user + + " to a reservation which is not yet currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + queueName = resQName; + } else { + queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + queue = getQueue(queueName); + } + } else { + String message = + "Application " + applicationId + " submitted by user " + user + + " to non-leaf queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } } // Submit to the queue try { @@ -718,7 +773,7 @@ private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application Attempt " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); + " finalState=" + rmAppAttemptFinalState); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); SchedulerApplication application = @@ -759,12 +814,16 @@ private synchronized void doneApplicationAttempt( // Inform the queue String queueName = attempt.getQueue().getQueueName(); + if (attempt.getQueue() instanceof ReservationQueue) { + queueName = + ((ReservationQueue) attempt.getQueue()).getReservationQueueName(); + } CSQueue queue = queues.get(queueName); if (!(queue instanceof LeafQueue)) { LOG.error("Cannot finish application " + "from non-leaf queue: " + queueName); } else { - queue.finishApplicationAttempt(attempt, queue.getQueueName()); + queue.finishApplicationAttempt(attempt, queueName); } } @@ -984,7 +1043,9 @@ public void handle(SchedulerEvent event) { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), - appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); + appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getReservationID()); } break; case APP_REMOVED: @@ -1218,6 +1279,102 @@ private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( } @Override + public synchronized void removeQueue(String queueName) + throws SchedulerConfigEditException { + LOG.info("Removing queue: " + queueName); + CSQueue q = this.getQueue(queueName); + if (!(q instanceof LeafQueue)) { + throw new SchedulerConfigEditException("The queue that we are asked " + + "to remove is not a LeafQueue"); + } + LeafQueue disposableLeafQueue = (LeafQueue) q; + // at this point we should have no more apps + if (disposableLeafQueue.getApplications().size() > 0 + || disposableLeafQueue.pendingApplications.size() > 0) { + throw new SchedulerConfigEditException("The queue is not empty " + + disposableLeafQueue.getApplications().size() + " active apps " + + disposableLeafQueue.pendingApplications.size() + " pending apps"); + } + if (disposableLeafQueue.getCapacity() > 0) { + throw new SchedulerConfigEditException( + "The queue has non-zero capacity: " + + disposableLeafQueue.getCapacity()); + } + + ((ParentQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + this.queues.remove(queueName); + LOG.info("Queue " + queueName + " has been removed"); + } + + @Override + public synchronized void addQueue(Queue queue) + throws SchedulerConfigEditException { + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerConfigEditException("Queue " + queue.getQueueName() + + " is not a dynamic Queue"); + } + ReservationQueue newQueue = (ReservationQueue) queue; + CSQueue p = (ParentQueue) newQueue.getParent(); + + String queuename = newQueue.getReservationQueueName(); + + if (p == null || !(p instanceof ParentQueue)) { + throw new SchedulerConfigEditException("Parent queue for " + + ((ReservationQueue) queue).getReservationQueueName() + + " not properly set"); + } + + this.queues.put(queuename, newQueue); + ((ParentQueue) p).addChildQueue(newQueue); + LOG.info("Creation of leaf queue " + newQueue + " succeeded"); + } + + @Override + public synchronized void setEntitlement(String inQueue, + DynamicQueueConf sesConf) throws SchedulerConfigEditException, YarnException { + LeafQueue queue = getAndCheckLeafQueue(inQueue); + ParentQueue parent = (ParentQueue) queue.getParent(); + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerConfigEditException("Entitlement can be" + + " modified dynamically only for PlanQueue(s)"); + } + + if (!(parent instanceof PlanQueue)) { + throw new SchedulerConfigEditException("The parent of a PlanQueue" + + " must be an ReservationQueue"); + } + + float sumChilds = parent.sumOfChildCapacities(); + float newChildCap = sumChilds - queue.getCapacity() + sesConf.getCapacity(); + + if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { + // note: epsilon checks here are not ok, as the epsilons might accumulate + // and + // become a problem in aggregate + if (Math.abs(sesConf.getCapacity() - queue.getCapacity()) == 0 + && Math.abs(sesConf.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { + return; + } + if (sesConf.getCapacity() > queue.getCapacity()) { + ((ReservationQueue) queue).addCapacity((sesConf.getCapacity() - queue + .getCapacity())); + } else { + ((ReservationQueue) queue) + .subtractCapacity((queue.getCapacity() - sesConf.getCapacity())); + } + } else { + throw new SchedulerConfigEditException( + "Sum of child queues would exceed 100%"); + } + // note: we currently set maxCapacity to capacity + // this might be revised later + queue.setMaxCapacity(sesConf.getMaxCapacity()); + LOG.info("Set " + inQueue + " entitlement to " + queue.getCapacity() + + " request was (" + sesConf.getCapacity() + ")"); + } + public synchronized String moveApplication(ApplicationId appId, String targetQueueName) throws YarnException { FiCaSchedulerApp app = @@ -1271,4 +1428,25 @@ private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { } return (LeafQueue) ret; } + + public Set getReservableQueues() { + Set ret = new HashSet(); + for (Map.Entry l : queues.entrySet()) { + if (l.getValue() instanceof PlanQueue) { + ret.add(l.getKey()); + } + } + return ret; + } + + public Set getReservationQueueNames(String planQueueName) { + Set ret = new HashSet(); + for (Map.Entry l : queues.entrySet()) { + if (l.getValue() instanceof ReservationQueue + && l.getValue().getParent().getQueueName().equals(planQueueName)) { + ret.add(l.getKey()); + } + } + return ret; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c93c5f..aa3e0de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -150,8 +150,7 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); - float capacity = - (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100; + float capacity = getCapacityFromConf(); float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; float maximumCapacity = @@ -217,6 +216,11 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeApplications = new TreeSet(applicationComparator); } + // externalizing in method, to allow overriding + protected float getCapacityFromConf() { + return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; + } + private synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, @@ -228,9 +232,9 @@ private synchronized void setupQueueConfigs( Map acls, int nodeLocalityDelay) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(this.queueName, capacity, maximumCapacity); float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); + CSQueueUtils.checkAbsoluteCapacities(this.queueName, absCapacity, absoluteMaxCapacity); this.capacity = capacity; this.absoluteCapacity = absCapacity; @@ -366,7 +370,7 @@ public String getQueueName() { @Override public String getQueuePath() { - return getParent().getQueuePath() + "." + getQueueName(); + return getParent().getQueuePath() + "." + this.queueName; } /** @@ -453,11 +457,11 @@ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { */ synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(this.queueName, capacity, maximumCapacity); float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity( maximumCapacity, getParent()); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); + CSQueueUtils.checkAbsoluteCapacities(this.queueName, absoluteCapacity, absMaxCapacity); this.maximumCapacity = maximumCapacity; this.absoluteMaxCapacity = absMaxCapacity; @@ -475,7 +479,7 @@ synchronized void setUserLimit(int userLimit) { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(int userLimitFactor) { + synchronized void setUserLimitFactor(float userLimitFactor) { this.userLimitFactor = userLimitFactor; } @@ -545,7 +549,7 @@ public synchronized QueueInfo getQueueInfo( } } - userAclInfo.setQueueName(getQueueName()); + userAclInfo.setQueueName(this.queueName); userAclInfo.setUserAcls(operations); return Collections.singletonList(userAclInfo); } @@ -724,7 +728,7 @@ private synchronized void activateApplications() { i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + - " activated in queue: " + getQueueName()); + " activated in queue: " + this.queueName); } } } @@ -740,7 +744,7 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, Us LOG.info("Application added -" + " appId: " + application.getApplicationId() + - " user: " + user + "," + " leaf-queue: " + getQueueName() + + " user: " + user + "," + " leaf-queue: " + this.queueName + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + @@ -783,7 +787,7 @@ public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, LOG.info("Application removed -" + " appId: " + application.getApplicationId() + " user: " + application.getUser() + - " queue: " + getQueueName() + + " queue: " + this.queueName + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + @@ -815,9 +819,11 @@ private synchronized FiCaSchedulerApp getApplication( if (reservedContainer != null) { FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); - synchronized (application) { - return assignReservedContainer(application, node, reservedContainer, - clusterResource); + if (application != null) { + synchronized (application) { + return assignReservedContainer(application, node, reservedContainer, + clusterResource); + } } } @@ -952,7 +958,7 @@ private synchronized boolean assignToQueue(Resource clusterResource, clusterResource); if (potentialNewCapacity > absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + LOG.debug(this.queueName + " usedResources: " + usedResources + " clusterResources: " + clusterResource + " currentCapacity " @@ -1068,7 +1074,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, if (LOG.isDebugEnabled()) { String userName = application.getUser(); LOG.debug("User limit computation for " + userName + - " in queue " + getQueueName() + + " in queue " + this.queueName + " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + @@ -1095,7 +1101,7 @@ private synchronized boolean assignToUser(Resource clusterResource, if (Resources.greaterThan(resourceCalculator, clusterResource, user.getConsumedResources(), limit)) { if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() + + LOG.debug("User " + userName + " in queue " + this.queueName + " will exceed limit - " + " consumed: " + user.getConsumedResources() + " limit: " + limit @@ -1395,7 +1401,7 @@ private void reserve(FiCaSchedulerApp application, Priority priority, node.reserveResource(application, priority, rmContainer); } - private boolean unreserve(FiCaSchedulerApp application, Priority priority, + protected boolean unreserve(FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, RMContainer rmContainer) { // Done with the reservation? if (application.unreserve(node, priority)) { @@ -1470,7 +1476,7 @@ synchronized void allocateResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { - LOG.info(getQueueName() + + LOG.info(this.queueName + " user=" + userName + " used=" + usedResources + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + @@ -1494,7 +1500,7 @@ synchronized void releaseResource(Resource clusterResource, user.releaseContainer(resource); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - LOG.info(getQueueName() + + LOG.info(this.queueName + " used=" + usedResources + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getConsumedResources()); } @@ -1661,4 +1667,16 @@ public void detachContainer(Resource clusterResource, getParent().detachContainer(clusterResource, application, rmContainer); } } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } + + public void setAbsoluteCapacity(float absoluteCapacity) { + this.absoluteCapacity = absoluteCapacity; + } + + public void setMaxApplications(int maxApplications) { + this.maxApplications = maxApplications; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 8c654b7..48519b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -156,7 +156,7 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - private synchronized void setupQueueConfigs( + protected synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, @@ -792,6 +792,40 @@ public void collectSchedulerApplications( } } + + synchronized void addChildQueue(CSQueue newQueue) { + if (newQueue.getCapacity() > 0) { + throw new IllegalArgumentException("Queue " + newQueue + + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(newQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } + + synchronized void removeChildQueue(CSQueue remQueue) { + if (remQueue.getCapacity() > 0) { + throw new IllegalArgumentException("Queue " + remQueue + + " being removed has non zero capacity."); + } + boolean removed = false; + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(remQueue)) { + qiter.remove(); + removed = true; // are we sure? + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: remove queue): " + removed + " " + + getChildQueuesToPrint()); + } + } + + @Override public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { @@ -824,4 +858,16 @@ public void detachContainer(Resource clusterResource, } } } + + public float sumOfChildCapacities() { + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } + + public Map getACLs() { + return acls; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java new file mode 100644 index 0000000..df5e2ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -0,0 +1,149 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +/** + * This represents a dynamic queue managed by the {@link ReservationSystem}. + * From the user perspective this is equivalent to a LeafQueue that respect + * reservations, but functionality wise is a sub-class of ParentQueue + * + */ +public class PlanQueue extends ParentQueue { + + public static final String DEFAULT_QUEUE_SUFFIX = "-default"; + + private static final Log LOG = LogFactory.getLog(PlanQueue.class); + + private int maxAppsForReservation; + private int maxAppsPerUserForReservation; + private int userLimit; + private float userLimitFactor; + private CapacitySchedulerContext schedulerContext; + private boolean showReservationsAsQueues; + + public PlanQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) { + super(cs, queueName, parent, old); + + this.schedulerContext = cs; + // Set the reservation queue attributes for the Plan + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + String queuePath = super.getQueuePath(); + maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); + showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); + if (maxAppsForReservation < 0) { + maxAppsForReservation = + (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super + .getAbsoluteCapacity()); + } + userLimit = conf.getUserLimit(queuePath); + userLimitFactor = conf.getUserLimitFactor(queuePath); + maxAppsPerUserForReservation = + (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName) + .append("\nwith capacity: [").append(super.getCapacity()) + .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) + .append("\nwith max reservation apps: [").append(maxAppsForReservation) + .append("]\nwith max reservation apps per user: [") + .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") + .append(userLimit).append("]\nwith user limit factor: [") + .append(userLimitFactor).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public synchronized void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) + || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " from " + newlyParsedQueue.getQueuePath()); + } + + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + + if (newlyParsedParentQueue.getChildQueues().size() > 0) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration"); + } + + // Set new configs + setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), + newlyParsedParentQueue.getAbsoluteCapacity(), + newlyParsedParentQueue.getMaximumCapacity(), + newlyParsedParentQueue.getAbsoluteMaximumCapacity(), + newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs()); + + this.maxAppsForReservation = newlyParsedParentQueue.maxAppsForReservation; + this.maxAppsPerUserForReservation = + newlyParsedParentQueue.maxAppsPerUserForReservation; + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue ses : this.getChildQueues()) { + CSQueueUtils.updateQueueStatistics( + schedulerContext.getResourceCalculator(), ses, this, + schedulerContext.getClusterResource(), + schedulerContext.getMinimumResourceCapability()); + ses.reinitialize(ses, clusterResource); + ((ReservationQueue) ses).setMaxApplications(this + .getMaxApplicationsForReservations()); + ((ReservationQueue) ses).setMaxApplicationsPerUser(this + .getMaxApplicationsPerUserForReservation()); + } + showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; + } + + /** + * Number of maximum applications for each of the reservations in this Plan. + * + * @return maxAppsForreservation + */ + public int getMaxApplicationsForReservations() { + return maxAppsForReservation; + } + + /** + * Number of maximum applications per user for each of the reservations in + * this Plan. + * + * @return maxAppsPerUserForreservation + */ + public int getMaxApplicationsPerUserForReservation() { + return maxAppsPerUserForReservation; + } + + /** + * User limit value for each of the reservations in this Plan. + * + * @return userLimit + */ + public int getUserLimitForReservation() { + return userLimit; + } + + /** + * User limit factor value for each of the reservations in this Plan. + * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; + } + + /** + * Determine whether to hide/show the ReservationQueues + */ + public boolean showReservationsAsLeafs() { + return showReservationsAsQueues; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java new file mode 100644 index 0000000..432e7c0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -0,0 +1,108 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +/** + * This represents a dynamic {@link LeafQueue} managed by the + * {@link ReservationSystem} + * + */ +public class ReservationQueue extends LeafQueue { + private static final Log LOG = LogFactory.getLog(ReservationQueue.class); + + private int maxSystemApps; + private int maxApplications; + private int maxApplicationsPerUser; + + public ReservationQueue(CapacitySchedulerContext cs, String queueName, + PlanQueue parent) { + super(cs, queueName, parent, null); + maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); + // the following parameters are common to all reservation in the plan + this.maxApplications = parent.getMaxApplicationsForReservations(); + this.maxApplicationsPerUser = + parent.getMaxApplicationsPerUserForReservation(); + setUserLimit(parent.getUserLimitForReservation()); + setUserLimitFactor(parent.getUserLimitFactor()); + } + + /** + * This methods removes capacity from a queue and adjusts its absoluteCapacity + * + * @param capacity + * @throws AccessControlException + */ + synchronized public void subtractCapacity(float capacity) + throws IllegalArgumentException { + + if (capacity > this.getCapacity() + CSQueueUtils.EPSILON) { + throw new IllegalArgumentException( + "Capacity demand is higher than available."); + } else { + this.setCapacity(this.getCapacity() - capacity); + this.setAbsoluteCapacity(getParent().getAbsoluteCapacity() + * this.getCapacity()); + setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity())); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully subtracted " + capacity + + " capacity from queue " + this.getQueueName()); + } + } + + } + + /** + * This method adds capacity to a queue and adjusts its absoluteCapacity + * + * @param capacity + * @throws AccessControlException + */ + synchronized public void addCapacity(float capacity) + throws IllegalArgumentException { + if ((this.getCapacity() + capacity) > (1.0f + CSQueueUtils.EPSILON)) { + throw new IllegalArgumentException("Queue was over provisionned."); + } else { + this.setCapacity(this.getCapacity() + capacity); + this.setAbsoluteCapacity(getParent().getAbsoluteCapacity() + * this.getCapacity()); + setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity())); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully added " + capacity + " capacity to queue " + + this.getQueueName()); + } + + } + } + + // used by the super constructor, we initialize to zero + protected float getCapacityFromConf() { + return 0f; + } + + @Override + public int getMaxApplications() { + return maxApplications; + } + + @Override + public synchronized int getMaxApplicationsPerUser() { + return maxApplicationsPerUser; + } + + public synchronized void setMaxApplicationsPerUser(int maxApplicationsPerUser) { + this.maxApplicationsPerUser = maxApplicationsPerUser; + } + + @Override + public String getQueueName() { + return this.getParent().getQueueName(); + } + + public String getReservationQueueName() { + return super.getQueueName(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java new file mode 100644 index 0000000..d78ef11 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/DynamicQueueConf.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +public class DynamicQueueConf { + + private float capacity; + private float maxCapacity; + + public DynamicQueueConf(float capacity, float maxCapacity){ + this.setCapacity(capacity); + this.maxCapacity = maxCapacity; + } + + public float getMaxCapacity() { + return maxCapacity; + } + + public void setMaxCapacity(float maxCapacity) { + this.maxCapacity = maxCapacity; + } + + public float getCapacity() { + return capacity; + } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index f64bd62..e436392 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -400,7 +400,7 @@ public void testRefreshQueues() throws Exception { cs.stop(); } - private void checkQueueCapacities(CapacityScheduler cs, + void checkQueueCapacities(CapacityScheduler cs, float capacityA, float capacityB) { CSQueue rootQueue = cs.getRootQueue(); CSQueue queueA = findQueue(rootQueue, A); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java new file mode 100644 index 0000000..6167b76 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerConfigEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf; +import org.junit.Before; +import org.junit.Test; + +public class TestCapacitySchedulerDynamicBehavior { + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerDynamicBehavior.class); + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static float A_CAPACITY = 10.5f; + private static float B_CAPACITY = 89.5f; + private static float A1_CAPACITY = 30; + private static float A2_CAPACITY = 70; + private static float B1_CAPACITY = 79.2f; + private static float B2_CAPACITY = 0.8f; + private static float B3_CAPACITY = 20; + + private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + + private int GB = 1024; + + private MockRM rm; + + @Before + public void setUp() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupPlanQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_RESERVATIONS, false); + rm = new MockRM(conf); + rm.start(); + } + + @Test + public void testRefreshQueuesWithReservations() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.addCapacity(A1_CAPACITY / 100); + + // Test add another reservation queue and use setEntitlement to modify + // capacity + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a2); + cs.setEntitlement("a2", new DynamicQueueConf(A2_CAPACITY / 100, 1.0f)); + + // Verify all allocations match + tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // Reinitialize and verify all dynamic queued survived + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + cs.reinitialize(conf, rm.getRMContext()); + + tcs.checkQueueCapacities(cs, 80f, 20f); + } + + @Test + public void testAddQueueFailCases() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + try { + // Test invalid addition (adding non-zero size queue) + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + a1.addCapacity(A1_CAPACITY / 100); + cs.addQueue(a1); + fail(); + } catch (Exception e) { + // expected + } + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.addCapacity(A1_CAPACITY / 100); + + // Test add another reservation queue and use setEntitlement to modify + // capacity + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + + cs.addQueue(a2); + + try { + // Test invalid entitlement (sum of queues exceed 100%) + cs.setEntitlement("a2", new DynamicQueueConf(A2_CAPACITY / 100 + 0.1f, + 1.0f)); + fail(); + } catch (Exception e) { + // expected + } + + cs.setEntitlement("a2", new DynamicQueueConf(A2_CAPACITY / 100, 1.0f)); + + // Verify all allocations match + tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + cs.stop(); + } + + @Test + public void testRemoveQueue() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.addCapacity(A1_CAPACITY / 100); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + // check preconditions + List appsInA1 = cs.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + try { + cs.removeQueue("a1"); + fail(); + } catch (SchedulerConfigEditException s) { + // expected a1 contains applications + } + // clear queue by killling all apps + cs.killAllAppsInQueue("a1"); + // wait for events of move to propagate + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + + try { + cs.removeQueue("a1"); + fail(); + } catch (SchedulerConfigEditException s) { + // expected a1 is not zero capacity + } + // set capacity to zero + cs.setEntitlement("a1", new DynamicQueueConf(0f, 0f)); + cs.removeQueue("a1"); + + assertTrue(cs.getQueue("a1") == null); + + rm.stop(); + } + + private void setupPlanQueueConfiguration(CapacitySchedulerConfiguration conf) { + + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + + conf.setQueues(B, new String[] { "b1", "b2", "b3" }); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setReservableQueue(A, true); + conf.setReservationWindow(A, 86400 * 1000); + conf.setAverageCapacity(A, 1.0f); + + LOG.info("Setup a as a plan queue"); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java new file mode 100644 index 0000000..de89e2a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +public class TestReservationQueue { + + CapacitySchedulerConfiguration csConf; + CapacitySchedulerContext csContext; + final static int GB = 1024; + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); + ReservationQueue reservationQueue; + + @Before + public void setup() { + + // setup a context / conf + csConf = new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(); + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()).thenReturn( + Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()).thenReturn( + Resources.createResource(16 * GB, 32)); + when(csContext.getClusterResource()).thenReturn( + Resources.createResource(100 * 16 * GB, 100 * 32)); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + + // create a queue + PlanQueue pq = new PlanQueue(csContext, "root", null, null); + reservationQueue = new ReservationQueue(csContext, "a", pq); + + } + + @Test + public void testAddSubtractCapacity() throws Exception { + + // verify that setting, adding, subtracting capacity works + reservationQueue.setCapacity(1.0F); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + reservationQueue.subtractCapacity(0.1F); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 0.9 < CSQueueUtils.EPSILON); + reservationQueue.addCapacity(0.1F); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + + try{ + reservationQueue.addCapacity(0.1F); + fail(); + } catch(IllegalArgumentException iae){ + //expected + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + } + + try{ + reservationQueue.subtractCapacity(1.1F); + fail(); + } catch(IllegalArgumentException iae){ + //expected + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + } + + + } +}