diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index ee6a637..a83a5c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -299,7 +299,7 @@ protected boolean isEventThreadWaiting() { } @VisibleForTesting - protected boolean isDrained() { + public boolean isDrained() { return this.drained; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index c545e9e..721eb36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -38,6 +38,8 @@ // containers. private volatile Resource headroom; + private boolean allowPreempt = false; + public ResourceLimits(Resource limit) { this(limit, Resources.none()); } @@ -72,4 +74,11 @@ public void setAmountNeededUnreserve(Resource amountNeededUnreserve) { this.amountNeededUnreserve = amountNeededUnreserve; } + public boolean isAllowPreemption() { + return allowPreempt; + } + + public void setIsAllowPreemption(boolean allowPreempt) { + this.allowPreempt = allowPreempt; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 03edd40..5261c20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -66,7 +66,7 @@ /* set of containers that are allocated containers */ - private final Map launchedContainers = + protected final Map launchedContainers = new HashMap(); private final RMNode rmNode; @@ -163,7 +163,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) { + " available after allocation"); } - private synchronized void changeContainerResource(ContainerId containerId, + protected synchronized void changeContainerResource(ContainerId containerId, Resource deltaResource, boolean increase) { if (increase) { deductAvailableResource(deltaResource); @@ -228,7 +228,7 @@ public synchronized boolean isValidContainer(ContainerId containerId) { return false; } - private synchronized void updateResource(Container container) { + protected synchronized void updateResourceOfCompletedContainer(Container container) { 
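+    // Hand the completed container's resources back to the node's available
+    // resource and decrement the running-container count.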
addAvailableResource(container.getResource()); --numContainers; } @@ -247,7 +247,7 @@ public synchronized void releaseContainer(Container container) { /* remove the containers from the nodemanger */ if (null != launchedContainers.remove(container.getId())) { - updateResource(container); + updateResourceOfCompletedContainer(container); } LOG.info("Released container " + container.getId() + " of capacity " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 39ca29b..5a388b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -474,12 +476,19 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - // Set headroom for currentResourceLimits - currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource, - nowTotalUsed)); + // Set headroom for currentResourceLimits: + // When queue is a parent queue: Headroom = limit - used + killable + // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) + Resource usedConsideredKillable = nowTotalUsed; + if (null != getChildQueues() && !getChildQueues().isEmpty()) { + usedConsideredKillable = Resources.subtract(nowTotalUsed, + getTotalKillableResource(nodePartition)); + } + currentResourceLimits.setHeadroom( + Resources.subtract(currentLimitResource, usedConsideredKillable)); if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - nowTotalUsed, currentLimitResource)) { + usedConsideredKillable, currentLimitResource)) { // if reservation continous looking enabled, check to see if could we // potentially use this node instead of a reserved node if the application @@ -491,7 +500,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, resourceCouldBeUnreserved, Resources.none())) { // resource-without-reserved = used - reserved Resource newTotalWithoutReservedResource = - Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved); + Resources.subtract(usedConsideredKillable, resourceCouldBeUnreserved); // when total-used-without-reserved-resource < currentLimit, we still // have chance to allocate on this node by 
unreserving some containers @@ -620,11 +629,10 @@ public Priority getDefaultApplicationPriority() { // considering all labels in cluster, only those labels which are // use some resource of this queue can be considered. Set nodeLabels = new HashSet(); - if (this.getAccessibleNodeLabels() != null - && this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - nodeLabels.addAll(Sets.union(this.getQueueCapacities() - .getNodePartitionsSet(), this.getQueueResourceUsage() - .getNodePartitionsSet())); + if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels() + .contains(RMNodeLabelsManager.ANY)) { + nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(), + this.getQueueResourceUsage().getNodePartitionsSet())); } else { nodeLabels.addAll(this.getAccessibleNodeLabels()); } @@ -636,4 +644,14 @@ public Priority getDefaultApplicationPriority() { } return nodeLabels; } + + public Resource getTotalKillableResource(String partition) { + return csContext.getPreemptionManager().getKillableResource(queueName, + partition); + } + + public Iterator getKillableContainers(String partition) { + return csContext.getPreemptionManager().getKillableContainers(queueName, + partition); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 68f6f12..6406efe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.List; + @Private @Unstable public class CSAssignment { @@ -42,6 +44,7 @@ private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; private boolean increaseAllocation; + private List containersToKill; public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); @@ -147,4 +150,12 @@ public boolean isIncreasedAllocation() { public void setIncreasedAllocation(boolean flag) { increaseAllocation = flag; } + + public void setContainersToKill(List containersToKill) { + this.containersToKill = containersToKill; + } + + public List getContainersToKill() { + return containersToKill; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 26b6a2b..a172462 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -145,6 +147,8 @@ // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + private PreemptionManager preemptionManager = new PreemptionManager(); + static final Comparator nonPartitionedQueueComparator = new Comparator() { @Override @@ -502,6 +506,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) LOG.info("Initialized root queue " + root); updatePlacementRules(); setQueueAcls(authorizer, queues); + + // Notify Preemption Manager + preemptionManager.refreshQueues(null, root); } @Lock(CapacityScheduler.class) @@ -529,6 +536,9 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) labelManager.reinitializeQueueLabels(getQueueToLabels()); setQueueAcls(authorizer, queues); + + // Notify Preemption Manager + preemptionManager.refreshQueues(null, root); } @VisibleForTesting @@ -1217,8 +1227,9 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(node.getAvailableResource(), - minimumAllocation) > 0) { + if (calculator.computeAvailableContainers(Resources + .add(node.getAvailableResource(), node.getTotalKillableResources()), + minimumAllocation) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); @@ -1566,7 +1577,7 @@ public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { return super.getApplicationAttempt(applicationAttemptId); } - + @Lock(Lock.NoLock.class) public FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); @@ -1613,12 +1624,30 @@ public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { @Override public void killPreemptedContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" - + cont.toString()); + LOG.debug( + SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" + cont + .toString()); + } + + if (!conf.getLazyPreemptionEnabled()) { + super.completedContainer(cont, SchedulerUtils + .createPreemptedContainerStatus(cont.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); + } else { + FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( + cont.getAllocatedNode()); + node.markContainerCanBeKilled(cont.getContainerId()); + + // notify PreemptionManager + // Get the application for the finished container + FiCaSchedulerApp 
application = getCurrentAttemptForContainer( + cont.getContainerId()); + if (null != application) { + String leafQueueName = application.getCSLeafQueue().getQueueName(); + getPreemptionManager().addKillableContainer( + new KillableContainer(cont, node.getPartition(), leafQueueName)); + } } - super.completedContainer(cont, SchedulerUtils - .createPreemptedContainerStatus(cont.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); } @Override @@ -2010,4 +2039,9 @@ public void updateApplicationPriority(Priority newPriority, + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); } + + @Override + public PreemptionManager getPreemptionManager() { + return preemptionManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 1e62b44..f5addf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -255,6 +255,12 @@ public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; + @Private + public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; + + @Private + public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -959,4 +965,8 @@ public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); return defaultPriority; } + + public boolean getLazyPreemptionEnabled() { + return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 2a0dd0da..3b3e4e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -21,9 +21,11 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -61,4 +63,8 @@ PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); + + FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId); + + PreemptionManager getPreemptionManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9c6d8ee..05e40be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -841,6 +842,40 @@ private void handleExcessReservedContainer(Resource clusterResource, assignment.setExcessReservation(null); } } + + private void killToPreemptContainers(Resource clusterResource, + FiCaSchedulerNode node, + CSAssignment assignment) { + if (assignment.getContainersToKill() != null) { + StringBuilder sb = new StringBuilder("Killing containers: ["); + + for (RMContainer c : assignment.getContainersToKill()) { + FiCaSchedulerApp application = csContext.getApplicationAttempt( + c.getApplicationAttemptId()); + LeafQueue q = application.getCSLeafQueue(); + q.completedContainer(clusterResource, application, node, c, SchedulerUtils + .createPreemptedContainerStatus(c.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, + null, false); + sb.append("(container=" + c.getContainerId() + " resource=" + c + .getAllocatedResource() + ")"); + } + + sb.append("] for container=" + assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment + .getResource()); + LOG.info(sb.toString()); + + } + } + + private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { + // Set preemption-allowed: + // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues + float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition); + float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); + limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); + } @Override public synchronized CSAssignment 
assignContainers(Resource clusterResource, @@ -853,6 +888,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + " #applications=" + orderingPolicy.getNumSchedulableEntities()); } + setPreemptionAllowed(currentResourceLimits, node.getPartition()); + // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -864,6 +901,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment, node, application); + killToPreemptContainers(clusterResource, node, assignment); return assignment; } } @@ -925,6 +963,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, handleExcessReservedContainer(clusterResource, assignment, node, application); + killToPreemptContainers(clusterResource, node, assignment); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1281,6 +1320,10 @@ public void completedContainer(Resource clusterResource, rmContainer, null, event, this, sortQueues); } } + + // Notify PreemptionManager + csContext.getPreemptionManager().removeKillableContainer( + new KillableContainer(rmContainer, node.getPartition(), queueName)); } synchronized void allocateResource(Resource clusterResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index badab72..b5be4a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -18,18 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -56,12 +45,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -493,19 +495,34 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - return (node.getReservedContainer() == null) && - Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - node.getAvailableResource(), minimumAllocation); + // Only do following check when I'm a root queue. CapacityScheduler already + // did this check before entering queue. And container will be allocated + // once for the whole queue hierarchy + if (node.getReservedContainer() == null && rootQueue) { + Resource totalUsable = + Resources.add(node.getAvailableResource(), node.getTotalKillableResources()); + + return Resources.greaterThanOrEqual(resourceCalculator, + clusterResource, + totalUsable, + minimumAllocation); + } + + return false; } - + private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits parentLimits) { + Resource clusterResource, ResourceLimits parentLimits, + String nodePartition) { // Set resource-limit of a given child, child.limit = // min(my.limit - my.used + child.used, child.max) // Parent available resource = parent-limit - parent-used-resource Resource parentMaxAvailableResource = Resources.subtract(parentLimits.getLimit(), getUsedResources()); + // Deduct killable from used + Resources.addTo(parentMaxAvailableResource, + getTotalKillableResource(nodePartition)); // Child's limit = parent-available-resource + child-used Resource childLimit = @@ -567,7 +584,7 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits); + getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); @@ -712,8 +729,8 @@ public synchronized void updateClusterResource(Resource clusterResource, // Update all children for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); + ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, + clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL); childQueue.updateClusterResource(clusterResource, childLimits); } @@ -800,4 +817,55 @@ public void detachContainer(Resource clusterResource, public synchronized int getNumApplications() { return numApplications; } + + @Override + synchronized void allocateResource(Resource clusterResource, + 
Resource resource, String nodePartition,
+      boolean changeContainerResource) {
+    super.allocateResource(clusterResource, resource, nodePartition,
+        changeContainerResource);
+
+    // check if we need to kill (killable) containers if maximum resource violated.
+    if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
+        < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
+      killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+    }
+  }
+
+  private void killContainersToEnforceMaxQueueCapacity(String partition,
+      Resource clusterResource) {
+    Iterator<RMContainer> killableContainerIter = getKillableContainers(
+        partition);
+    if (!killableContainerIter.hasNext()) {
+      return;
+    }
+
+    Resource partitionResource = labelManager.getResourceByLabel(partition,
+        null);
+    Resource maxResource = Resources.multiply(partitionResource,
+        getQueueCapacities().getAbsoluteMaximumCapacity(partition));
+
+    while (Resources.greaterThan(resourceCalculator, partitionResource,
+        queueUsage.getUsed(partition), maxResource)) {
+      RMContainer toKillContainer = killableContainerIter.next();
+      FiCaSchedulerApp attempt = csContext.getApplicationAttempt(
+          toKillContainer.getContainerId().getApplicationAttemptId());
+      FiCaSchedulerNode node = csContext.getNode(
+          toKillContainer.getAllocatedNode());
+      if (null != attempt && null != node) {
+        LeafQueue lq = attempt.getCSLeafQueue();
+        lq.completedContainer(clusterResource, attempt, node, toKillContainer,
+            SchedulerUtils.createPreemptedContainerStatus(
+                toKillContainer.getContainerId(),
+                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
+            null, false);
+        LOG.info("Killed container=" + toKillContainer.getContainerId()
+            + " from queue=" + lq.getQueueName() + " to make queue=" + this
+            .getQueueName() + "'s max-capacity enforced");
+      }
+      if (!killableContainerIter.hasNext()) {
+        break;
+      }
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index ee01bd1..afac235 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -108,6 +108,8 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
         assignment.setFulfilledReservation(true);
       }
     }
+
+    assignment.setContainersToKill(result.getToKillContainers());
   }

   return assignment;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
index 1df9410..8f749f6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.List; + public class ContainerAllocation { /** * Skip the locality (e.g. node-local, rack-local, any), and look at other @@ -56,6 +59,7 @@ NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestNodeType = NodeType.NODE_LOCAL; Container updatedContainer; + private List toKillContainers; public ContainerAllocation(RMContainer containerToBeUnreserved, Resource resourceToBeAllocated, AllocationState state) { @@ -86,4 +90,12 @@ public NodeType getContainerNodeType() { public Container getUpdatedContainer() { return updatedContainer; } + + public void setToKillContainers(List toKillContainers) { + this.toKillContainers = toKillContainers; + } + + public List getToKillContainers() { + return toKillContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 820cccd..aea100d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; +import org.apache.commons.collections.ArrayStack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; @@ -42,6 +43,10 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** * Allocate normal (new) containers, considers locality/label, etc. Using * delayed scheduling mechanism to get better locality allocation. 
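(The next hunk folds killable containers into assignContainer()'s space check. As a rough standalone sketch using the patch's own APIs — the helper name pickContainersToKill and its signature are illustrative, not part of the patch: when the node has no free space left but this queue is allowed to preempt, killable containers are tentatively accumulated until the pending request would fit.)

  // Sketch only, not part of the patch.
  static List<RMContainer> pickContainersToKill(FiCaSchedulerNode node,
      ResourceCalculator rc, Resource cluster, Resource capability) {
    List<RMContainer> toKill = new ArrayList<>();
    // Space the request could use: current free space plus whatever killing
    // already-marked-killable containers would reclaim.
    Resource free = Resources.clone(node.getAvailableResource());
    for (RMContainer kc : node.getKillableContainers().values()) {
      toKill.add(kc);
      Resources.addTo(free, kc.getAllocatedResource());
      if (Resources.fitsIn(rc, cluster, capability, free)) {
        return toKill; // request fits once these containers are killed
      }
    }
    return null; // even killing every killable container is not enough
  }
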
@@ -435,9 +440,6 @@ private ContainerAllocation assignContainer(Resource clusterResource, return ContainerAllocation.LOCALITY_SKIPPED; } - assert Resources.greaterThan( - rc, clusterResource, available, Resources.none()); - boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( priority, capability); @@ -460,6 +462,29 @@ private ContainerAllocation assignContainer(Resource clusterResource, boolean reservationsContinueLooking = application.getCSLeafQueue().getReservationContinueLooking(); + // Check if we need to kill some containers to allocate this one + List toKillContainers = null; + if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) { + Resource availableConsidersKillable = Resources.clone(available); + for (RMContainer killableContainer : node + .getKillableContainers().values()) { + if (null == toKillContainers) { + toKillContainers = new ArrayList<>(); + } + toKillContainers.add(killableContainer); + Resources.addTo(availableConsidersKillable, + killableContainer.getAllocatedResource()); + if (Resources.fitsIn(rc, + clusterResource, + capability, + availableConsidersKillable)) { + // Stop if we find enough spaces + availableContainers = 1; + break; + } + } + } + if (availableContainers > 0) { // Allocate... // We will only do continuous reservation when this is not allocated from @@ -499,12 +524,17 @@ private ContainerAllocation assignContainer(Resource clusterResource, new ContainerAllocation(unreservedContainer, request.getCapability(), AllocationState.ALLOCATED); result.containerNodeType = type; + result.setToKillContainers(toKillContainers); return result; } else { + // TODO, handle reservation case: + // Container should only be reserved when + // 1) Node available-resource > 0 OR + // 2) Node available-resource + killable-resource > 0 && isPreemptionAllowed + // if we are allowed to allocate but this node doesn't have space, reserve // it or if this was an already a reserved container, reserve it again if (shouldAllocOrReserveNewContainer || rmContainer != null) { - if (reservationsContinueLooking && rmContainer == null) { // we could possibly ignoring queue capacity or user limits when // reservationsContinueLooking is set. 
Make sure we didn't need to @@ -522,6 +552,7 @@ private ContainerAllocation assignContainer(Resource clusterResource, new ContainerAllocation(null, request.getCapability(), AllocationState.RESERVED); result.containerNodeType = type; + result.setToKillContainers(toKillContainers); return result; } // Skip the locality request @@ -613,8 +644,7 @@ private ContainerAllocation handleNewContainerAllocation( } ContainerAllocation doAllocation(ContainerAllocation allocationResult, - Resource clusterResource, FiCaSchedulerNode node, - SchedulingMode schedulingMode, Priority priority, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer) { // Create the container if necessary Container container = @@ -678,9 +708,7 @@ private ContainerAllocation allocate(Resource clusterResource, if (AllocationState.ALLOCATED == result.state || AllocationState.RESERVED == result.state) { - result = - doAllocation(result, clusterResource, node, schedulingMode, priority, - reservedContainer); + result = doAllocation(result, node, priority, reservedContainer); } return result; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java new file mode 100644 index 0000000..675b0b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class KillableContainer { + RMContainer container; + String partition; + String leafQueueName; + + public KillableContainer(RMContainer container, String partition, String leafQueueName) { + this.container = container; + this.partition = partition; + this.leafQueueName = leafQueueName; + } + + public RMContainer getRMContainer() { + return this.container; + } + + public String getNodePartition() { + return this.partition; + } + + public String getLeafQueueName() { + return this.leafQueueName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java new file mode 100644 index 0000000..9db3bd5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class PreemptionManager { + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + private Map entities = new HashMap<>(); + + private static class PreemptableEntity { + // Partition to killable resources and containers + Map totalKillableResources = new HashMap<>(); + Map> killableContainers = + new HashMap<>(); + PreemptableEntity parent; + + public PreemptableEntity(PreemptableEntity parent) { + this.parent = parent; + } + + public void addKillableContainer(KillableContainer container) { + String partition = container.getNodePartition(); + if (!totalKillableResources.containsKey(partition)) { + totalKillableResources.put(partition, Resources.createResource(0)); + killableContainers.put(partition, + new ConcurrentSkipListMap()); + } + + RMContainer c = container.getRMContainer(); + Resources.addTo(totalKillableResources.get(partition), + c.getAllocatedResource()); + killableContainers.get(partition).put(c.getContainerId(), c); + + if (null != parent) { + parent.addKillableContainer(container); + } + } + + public void removeKillableContainer(KillableContainer container) { + String partition = container.getNodePartition(); + RMContainer c = container.getRMContainer(); + + Resources.subtractFrom(totalKillableResources.get(partition), + c.getAllocatedResource()); + killableContainers.get(partition).remove(c.getContainerId()); + + if (null != parent) { + parent.removeKillableContainer(container); + } + } + } + + public PreemptionManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + public void refreshQueues(CSQueue parent, CSQueue current) { + try { + writeLock.lock(); + PreemptableEntity parentEntity = null; + if (parent != null) { + parentEntity = entities.get(parent.getQueueName()); + } + + if (!entities.containsKey(current.getQueueName())) { + entities.put(current.getQueueName(), + new PreemptableEntity(parentEntity)); + } + + if (current.getChildQueues() != null) { + for (CSQueue child : current.getChildQueues()) { + refreshQueues(current, child); + } + } + } + finally { + writeLock.unlock(); + } + } + + public void addKillableContainer(KillableContainer container) { + try { + writeLock.lock(); + PreemptableEntity entity = entities.get(container.getLeafQueueName()); + if (null != entity) { + entity.addKillableContainer(container); + } + } + finally { + 
writeLock.unlock();
+    }
+  }
+
+  public void removeKillableContainer(KillableContainer container) {
+    try {
+      writeLock.lock();
+      PreemptableEntity entity = entities.get(container.getLeafQueueName());
+      if (null != entity) {
+        entity.removeKillableContainer(container);
+      }
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void moveKillableContainer(KillableContainer oldContainer,
+      KillableContainer newContainer) {
+    // TODO
+  }
+
+  public void updateKillableContainerResource(KillableContainer container,
+      Resource oldResource, Resource newResource) {
+    // TODO
+  }
+
+  @VisibleForTesting
+  public Map<ContainerId, RMContainer> getKillableContainersMap(
+      String queueName, String partition) {
+    try {
+      readLock.lock();
+      PreemptableEntity entity = entities.get(queueName);
+      if (entity != null) {
+        Map<ContainerId, RMContainer> containers =
+            entity.killableContainers.get(partition);
+        return containers;
+      }
+      return Collections.emptyMap();
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public Iterator<RMContainer> getKillableContainers(String queueName,
+      String partition) {
+    return getKillableContainersMap(queueName, partition).values().iterator();
+  }
+
+  public Resource getKillableResource(String queueName, String partition) {
+    try {
+      readLock.lock();
+      PreemptableEntity entity = entities.get(queueName);
+      if (entity != null) {
+        Resource res = entity.totalKillableResources.get(partition);
+        if (res == null || res.equals(Resources.none())) {
+          return Resources.none();
+        }
+        return Resources.clone(res);
+      }
+      return Resources.none();
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index fe6db47..f180cf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -18,22 +18,29 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;

-
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;

 public class FiCaSchedulerNode extends SchedulerNode {
   private static final Log LOG =
LogFactory.getLog(FiCaSchedulerNode.class); + private Map killableContainers = new HashMap<>(); + private Resource totalKillableResources = Resource.newInstance(0, 0); public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, Set nodeLabels) { @@ -92,7 +99,6 @@ public synchronized void reserveResource( @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { - // adding NP checks as this can now be called for preemption if (getReservedContainer() != null && getReservedContainer().getContainer() != null @@ -115,4 +121,44 @@ public synchronized void unreserveResource( } setReservedContainer(null); } + + public synchronized void markContainerCanBeKilled(ContainerId containerId) { + RMContainer c = launchedContainers.get(containerId); + if (c != null && !killableContainers.containsKey(containerId)) { + killableContainers.put(containerId, c); + Resources.addTo(totalKillableResources, c.getAllocatedResource()); + } + } + + @Override + protected synchronized void updateResourceOfCompletedContainer( + Container container) { + super.updateResourceOfCompletedContainer(container); + if (killableContainers.containsKey(container.getId())) { + Resources.subtractFrom(totalKillableResources, container.getResource()); + killableContainers.remove(container.getId()); + } + } + + @Override + protected synchronized void changeContainerResource(ContainerId containerId, + Resource deltaResource, boolean increase) { + super.changeContainerResource(containerId, deltaResource, increase); + + if (killableContainers.containsKey(containerId)) { + if (increase) { + Resources.addTo(totalKillableResources, deltaResource); + } else { + Resources.subtractFrom(totalKillableResources, deltaResource); + } + } + } + + public synchronized Resource getTotalKillableResources() { + return totalKillableResources; + } + + public synchronized Map getKillableContainers() { + return killableContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java new file mode 100644 index 0000000..b09dca1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java @@ -0,0 +1,291 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.sun.xml.internal.xsom.impl.scd.Iterators; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; +import 
org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCapacitySchedulerPreemption { + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerPreemption.class); + + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + Clock clock; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class, + SchedulingEditPolicy.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + clock = mock(Clock.class); + when(clock.getTime()).thenReturn(0L); + } + + private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) { + RMActiveServices activeServices = rm.getRMActiveService(); + SchedulingMonitor mon = null; + for (Service service : activeServices.getServices()) { + if (service instanceof SchedulingMonitor) { + mon = (SchedulingMonitor) service; + break; + } + } + + if (mon != null) { + return mon.getSchedulingEditPolicy(); + } + return null; + } + + @Test + public void testSimplePreemption() throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
<pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+ * + * 1) Two nodes in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no + * more resource available. + * + * 3) app2 submit to queue-c, ask for one 1G container (for AM) + * + * Now the cluster is fulfilled. + * + * 4) app2 asks for another 1G container, system will preempt one container + * from app1, and app2 will receive the preempted container + */ + Configuration rmConf = TestUtils.getConfigurationWithMultipleQueues(conf); + + // Set preemption related configurations + rmConf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL, 0); + rmConf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, true); + rmConf.setFloat(ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f); + rmConf.setFloat(ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f); + + MockRM rm1 = new MockRM(rmConf); + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 7, new ArrayList()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getAvailableResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getAvailableResource().getMemory()); + + // AM asks for a 1 * GB container + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + // Sleep for a while make sure events handled (TODO, sleep should be fixed) + Thread.sleep(1000); + + PreemptionManager pm = cs.getPreemptionManager(); + Map killableContainers = pm.getKillableContainersMap("a", + RMNodeLabelsManager.NO_LABEL); + Assert.assertEquals(1, killableContainers.size()); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = + cs.getApplicationAttempt(am2.getApplicationAttemptId()); + + // App1 has 6 containers, and app2 
+
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // App1 should now have 6 containers, and app2 should have 2
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testPreemptionConsidersNodeLocalityDelay() throws Exception {
+    /**
+     * Test case: same as testSimplePreemption, steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with locality specified, so it has
+     * to wait for missed opportunities before it can be scheduled.
+     * Check that the scheduler waits out the locality delay before killing
+     * the killable container.
+     */
+    Configuration rmConf = TestUtils.getConfigurationWithMultipleQueues(conf);
+
+    // Set preemption related configurations
+    rmConf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
+        0);
+    rmConf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+    rmConf.setFloat(
+        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
+    rmConf.setFloat(
+        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
+
+    MockRM rm1 = new MockRM(rmConf);
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now (AM + 6), leaving only 1G of free
+    // resource in the cluster
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c; it asks for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 now have no available resource left
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // AM asks for a 1 * GB container on a specific (unknown) host, plus the
+    // matching rack and ANY requests, so the node-locality delay applies
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call editSchedule twice and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // Sleep for a while to make sure the events are handled
+    // (TODO: the sleep should be replaced by explicit synchronization)
+    Thread.sleep(1000);
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL);
+    Assert.assertEquals(1, killableContainers.size());
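+    // The single marked container must come from app1, the only application
+    // running in queue-a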
+    Assert.assertEquals(killableContainers.entrySet().iterator().next()
+        .getKey().getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if the container is preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // App1 still has 7 containers and app2 has only its AM: no container is
+    // preempted yet, because the locality delay has not passed
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again; now the container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 now has 6 containers, and app2 has 2
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+}
\ No newline at end of file
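Both tests above configure the same four preemption knobs before starting the RM. If more lazy-preemption tests are added, that block could be factored into a shared helper; the sketch below is illustrative only (the helper name withLazyPreemption is invented here, the constants are the ones the patch itself uses, and the test class's existing imports are assumed):

    // Hypothetical helper -- not part of this patch. Factors out the four
    // preemption settings shared by the tests above.
    private static Configuration withLazyPreemption(Configuration conf) {
      // Kill containers as soon as they are marked, no grace period
      conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
          0);
      // Mark containers killable instead of killing them eagerly
      conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
          true);
      // Let the policy reclaim the full deficit in a single round
      conf.setFloat(
          ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND,
          1.0f);
      conf.setFloat(
          ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR,
          1.0f);
      return conf;
    }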
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 489ef77..f54a768 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -349,4 +349,40 @@ public static Configuration getConfigurationWithDefaultQueueLabels(
     conf.setDefaultNodeLabelExpression(B, "y");
     return conf;
   }
+
+  /**
+   * Get a queue structure:
+   * <pre>
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *          10   20  70
+   * </pre>
+   */
+  public static Configuration
+      getConfigurationWithMultipleQueues(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b", "c" });
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 100);
+    conf.setUserLimitFactor(A, 100);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setMaximumCapacity(B, 100);
+    conf.setUserLimitFactor(B, 100);
+
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 100);
+    conf.setUserLimitFactor(C, 100);
+
+    return conf;
+  }
 }
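The helper mirrors the 10/20/70 split shown in the javadoc diagram. As a quick sanity check, the raw properties it writes can be read back; the snippet below is a sketch rather than part of the patch, and it assumes the standard yarn.scheduler.capacity.<queue-path>.capacity property naming that CapacitySchedulerConfiguration writes to:

    // Hypothetical sanity check -- not part of this patch
    Configuration checked =
        TestUtils.getConfigurationWithMultipleQueues(new Configuration());
    // setCapacity(root.a, 10) should surface as the raw float property
    // yarn.scheduler.capacity.root.a.capacity = 10.0 (assumed naming)
    Assert.assertEquals(10f,
        checked.getFloat("yarn.scheduler.capacity.root.a.capacity", -1f),
        1e-3f);
    Assert.assertEquals(20f,
        checked.getFloat("yarn.scheduler.capacity.root.b.capacity", -1f),
        1e-3f);
    Assert.assertEquals(70f,
        checked.getFloat("yarn.scheduler.capacity.root.c.capacity", -1f),
        1e-3f);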