diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 3a87edb..fe3349c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -35,7 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableEntity; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ 
-125,8 +126,8 @@ private long maxWaitTime; private CapacityScheduler scheduler; private long monitoringInterval; - private final Map preempted = - new HashMap(); + private final Map preempted = new HashMap<>(); + private ResourceCalculator rc; private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; @@ -135,6 +136,10 @@ new HashMap<>(); private RMNodeLabelsManager nlm; + // Preemptable entities, synced from the scheduler on every run + private Map preemptableEntities = null; + private Set killableContainers; + public ProportionalCapacityPreemptionPolicy() { clock = SystemClock.getInstance(); } @@ -184,6 +189,64 @@ public void editSchedule() { Resource clusterResources = Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); } + + @SuppressWarnings("unchecked") + private void cleanupStaledKillableContainers(Resource cluster, + Set leafQueueNames) { + for (String q : leafQueueNames) { + for (TempQueuePerPartition tq : getQueuePartitions(q)) { + // When the queue's (used - killable) <= ideal-assigned and killable > 0, + // we need to check whether any killable containers should be reverted + if (Resources.lessThanOrEqual(rc, cluster, + Resources.subtract(tq.current, tq.killable), tq.idealAssigned) + && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) { + // How many killable resources need to be reverted + // need-to-revert = already-marked-killable - (current - ideal) + Resource toBeRevertedFromKillable = Resources.subtract(tq.killable, + Resources.subtract(tq.current, tq.idealAssigned)); + + Resource alreadyReverted = Resources.createResource(0); + + for (RMContainer c : preemptableEntities.get(q).getKillableContainers( + tq.partition).values()) { + if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted, + toBeRevertedFromKillable)) { + break; + } + + if (Resources.greaterThan(rc, cluster, + Resources.add(alreadyReverted, c.getAllocatedResource()),
toBeRevertedFromKillable)) { + continue; + } else { + // This container needs to be marked as unkillable + Resources.addTo(alreadyReverted, c.getAllocatedResource()); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(c.getApplicationAttemptId(), c, + SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE)); + } + } + + } + } + } + } + + private void syncKillableContainersFromScheduler() { + // Sync preemptable entities from the scheduler + preemptableEntities = + scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities(); + + killableContainers = new HashSet<>(); + for (Map.Entry entry : preemptableEntities + .entrySet()) { + PreemptableEntity entity = entry.getValue(); + for (Map map : entity.getKillableContainers() + .values()) { + killableContainers.addAll(map.keySet()); + } + } + } /** * This method selects and tracks containers to be preempted. If a container @@ -201,6 +264,8 @@ private void containerBasedPreemptOrKill(CSQueue root, .getNodeLabelManager().getClusterNodeLabelNames()); allPartitions.add(RMNodeLabelsManager.NO_LABEL); + syncKillableContainersFromScheduler(); + // extract a summary of the queues from scheduler synchronized (scheduler) { queueToPartitions.clear(); @@ -228,13 +293,17 @@ private void containerBasedPreemptOrKill(CSQueue root, recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); } + // Remove containers from the killable list when we want to preempt fewer + // resources from the queue.
+ cleanupStaledKillableContainers(clusterResources, leafQueueNames); + // based on ideal allocation select containers to be preempted from each // queue and each application Map> toPreempt = getContainersToPreempt(leafQueueNames, clusterResources); if (LOG.isDebugEnabled()) { - logToCSV(new ArrayList(leafQueueNames)); + logToCSV(new ArrayList<>(leafQueueNames)); } // if we are in observeOnly mode return before any action is taken @@ -254,10 +323,10 @@ private void containerBasedPreemptOrKill(CSQueue root, // if we tried to preempt this for more than maxWaitTime if (preempted.get(container) != null && preempted.get(container) + maxWaitTime < clock.getTime()) { - // kill it + // mark container killable rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); preempted.remove(container); } else { if (preempted.get(container) != null) { @@ -333,14 +402,14 @@ private void computeIdealResourceDistribution(ResourceCalculator rc, // qAlloc tracks currently active queues (will decrease progressively as // demand is met) - List qAlloc = new ArrayList(queues); + List qAlloc = new ArrayList<>(queues); // unassigned tracks how much resources are still to assign, initialized // with the total capacity for this set of queues Resource unassigned = Resources.clone(tot_guarant); // group queues based on whether they have non-zero guaranteed capacity - Set nonZeroGuarQueues = new HashSet(); - Set zeroGuarQueues = new HashSet(); + Set nonZeroGuarQueues = new HashSet<>(); + Set zeroGuarQueues = new HashSet<>(); for (TempQueuePerPartition q : qAlloc) { if (Resources @@ -415,8 +484,8 @@ private void computeFixpointAllocation(ResourceCalculator rc, // idealAssigned >= current + pending), remove it from consideration. // Sort queues from most under-guaranteed to most over-guaranteed. 
TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = - new PriorityQueue(10, tqComparator); + PriorityQueue orderedByNeed = new PriorityQueue<>(10, + tqComparator); for (Iterator i = qAlloc.iterator(); i.hasNext();) { TempQueuePerPartition q = i.next(); if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { @@ -474,7 +543,7 @@ private void computeFixpointAllocation(ResourceCalculator rc, // percentage of guaranteed. protected Collection getMostUnderservedQueues( PriorityQueue orderedByNeed, TQComparator tqComparator) { - ArrayList underserved = new ArrayList(); + ArrayList underserved = new ArrayList<>(); while (!orderedByNeed.isEmpty()) { TempQueuePerPartition q1 = orderedByNeed.remove(); underserved.add(q1); @@ -502,7 +571,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, if (ignoreGuar) { for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = (float) 1.0f / ((float) queues.size()); + q.normalizedGuarantee = 1.0f / queues.size(); } } else { for (TempQueuePerPartition q : queues) { @@ -515,8 +584,9 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, } } - private String getPartitionByNodeId(NodeId nodeId) { - return scheduler.getSchedulerNode(nodeId).getPartition(); + private String getPartitionByRMContainer(RMContainer rmContainer) { + return scheduler.getSchedulerNode(rmContainer.getAllocatedNode()) + .getPartition(); } /** @@ -534,7 +604,7 @@ private boolean tryPreemptContainerAndDeductResToObtain( return false; } - String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); + String nodePartition = getPartitionByRMContainer(rmContainer); Resource toObtainByPartition = resourceToObtainByPartitions.get(nodePartition); @@ -575,7 +645,7 @@ private void addToPreemptMap( ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { Set set; if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet(); + 
set = new HashSet<>(); preemptMap.put(appAttemptId, set); } set.add(containerToPreempt); @@ -587,7 +657,7 @@ private void addToPreemptMap( * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to * account for containers that will naturally complete. * - * @param queues set of leaf queues to preempt from + * @param leafQueueNames set of leaf queues to preempt from * @param clusterResource total amount of cluster resources * @return a map of applciationID to set of containers to preempt */ @@ -595,8 +665,8 @@ private void addToPreemptMap( Set leafQueueNames, Resource clusterResource) { Map> preemptMap = - new HashMap>(); - List skippedAMContainerlist = new ArrayList(); + new HashMap<>(); + List skippedAMContainerlist = new ArrayList<>(); // Loop all leaf queues for (String queueName : leafQueueNames) { @@ -614,7 +684,7 @@ private void addToPreemptMap( LeafQueue leafQueue = null; Map resToObtainByPartition = - new HashMap(); + new HashMap<>(); for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { leafQueue = qT.leafQueue; // we act only if we are violating balance by more than @@ -703,7 +773,6 @@ private void addToPreemptMap( * @param clusterResource * @param preemptMap * @param skippedAMContainerlist - * @param resToObtain * @param skippedAMSize * @param maxAMCapacityForThisQueue */ @@ -751,7 +820,7 @@ private void preemptFrom(FiCaSchedulerApp app, // first drop reserved containers towards rsrcPreempt List reservedContainers = - new ArrayList(app.getReservedContainers()); + new ArrayList<>(app.getReservedContainers()); for (RMContainer c : reservedContainers) { if (resToObtainByPartition.isEmpty()) { return; @@ -771,8 +840,7 @@ private void preemptFrom(FiCaSchedulerApp app, // if more resources are to be freed go through all live containers in // reverse priority and reverse allocation order and mark them for // preemption - List liveContainers = - new ArrayList(app.getLiveContainers()); + List liveContainers = new 
ArrayList<>(app.getLiveContainers()); sortContainers(liveContainers); @@ -788,6 +856,11 @@ private void preemptFrom(FiCaSchedulerApp app, continue; } + // Skip already marked to killable containers + if (killableContainers.contains(c.getContainerId())) { + continue; + } + // Try to preempt this container tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, clusterResource, preemptMap); @@ -826,6 +899,10 @@ public String getPolicyName() { return "ProportionalCapacityPreemptionPolicy"; } + @VisibleForTesting + public Map getToPreemptContainers() { + return preempted; + } /** * This method walks a tree of CSQueue and clones the portion of the state @@ -851,6 +928,11 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, partitionToLookAt); Resource guaranteed = Resources.multiply(partitionResource, absCap); Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); + Resource killable = Resources.none(); + if (null != preemptableEntities.get(queueName)) { + killable = preemptableEntities.get(queueName) + .getKillableResource(partitionToLookAt); + } // when partition is a non-exclusive partition, the actual maxCapacity // could more than specified maxCapacity @@ -875,7 +957,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, l.getTotalPendingResourcesConsideringUserLimit( partitionResource, partitionToLookAt); ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, - maxCapacity, preemptionDisabled, partitionToLookAt); + maxCapacity, preemptionDisabled, partitionToLookAt, killable); if (preemptionDisabled) { ret.untouchableExtra = extra; } else { @@ -886,7 +968,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, Resource pending = Resource.newInstance(0, 0); ret = new TempQueuePerPartition(curQueue.getQueueName(), current, pending, - guaranteed, maxCapacity, false, partitionToLookAt); + guaranteed, maxCapacity, false, partitionToLookAt, killable); Resource childrensPreemptable = 
Resource.newInstance(0, 0); for (CSQueue c : curQueue.getChildQueues()) { TempQueuePerPartition subq = @@ -932,7 +1014,7 @@ private void addTempQueuePartition(TempQueuePerPartition queuePartition) { Map queuePartitions; if (null == (queuePartitions = queueToPartitions.get(queueName))) { - queuePartitions = new HashMap(); + queuePartitions = new HashMap<>(); queueToPartitions.put(queueName, queuePartitions); } queuePartitions.put(queuePartition.partition, queuePartition); @@ -971,8 +1053,10 @@ private TempQueuePerPartition getQueueByPartition(String queueName, final Resource guaranteed; final Resource maxCapacity; final String partition; + final Resource killable; Resource idealAssigned; Resource toBePreempted; + // For logging purpose Resource actuallyPreempted; Resource untouchableExtra; @@ -986,7 +1070,7 @@ private TempQueuePerPartition getQueueByPartition(String queueName, TempQueuePerPartition(String queueName, Resource current, Resource pending, Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, - String partition) { + String partition, Resource killableResource) { this.queueName = queueName; this.current = current; this.pending = pending; @@ -996,11 +1080,12 @@ private TempQueuePerPartition getQueueByPartition(String queueName, this.actuallyPreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList(); + this.children = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; this.partition = partition; + this.killable = killableResource; } public void setLeafQueue(LeafQueue l){ @@ -1018,12 +1103,6 @@ public void addChild(TempQueuePerPartition q) { Resources.addTo(pending, q.pending); } - public void addChildren(ArrayList queues) { - assert leafQueue == null; - children.addAll(queues); - } - - public ArrayList getChildren(){ return 
children; } @@ -1064,18 +1143,13 @@ public String toString() { return sb.toString(); } - public void printAll() { - LOG.info(this.toString()); - for (TempQueuePerPartition sub : this.getChildren()) { - sub.printAll(); - } - } - public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) { - toBePreempted = Resources.multiply( - Resources.subtract(current, idealAssigned), scalingFactor); + if (Resources.greaterThan(rc, clusterResource, + Resources.subtract(current, killable), idealAssigned)) { + toBePreempted = Resources.multiply(Resources.subtract( + Resources.subtract(current, killable), idealAssigned), + scalingFactor); } else { toBePreempted = Resource.newInstance(0, 0); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java index 0a3638b..0c20fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java @@ -66,6 +66,8 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId, } public void updatePreemptionInfo(Resource resource, RMContainer container) { + // FIXME: leftover debug logging below; remove before commit + LOG.info("############ 1111"); try { writeLock.lock(); resourcePreempted = Resources.addTo(resourcePreempted, resource); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 5d26931..6e32d6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; /** * Represents the ResourceManager's view of an application container. 
See @@ -88,4 +89,12 @@ boolean hasIncreaseReservation(); void cancelIncreaseReservation(); + + String getQueueName(); + + void setLeafQueue(String leafQueue); + + boolean isAlive(); + + void setIsAlive(boolean alive); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 16ab55d..ac3a332 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -182,6 +182,9 @@ // resource to rollback to should container resource increase token expires. 
private Resource lastConfirmedResource; + private volatile String leafQueue = null; + private volatile boolean isAlive = false; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -663,6 +666,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + container.setIsAlive(false); RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; container.finishTime = System.currentTimeMillis(); @@ -814,4 +818,23 @@ public boolean hasIncreaseReservation() { public void cancelIncreaseReservation() { hasIncreaseReservation = false; } + + @Override + public String getQueueName() { + return leafQueue; + } + + @Override + public void setLeafQueue(String leafQueue) { + this.leafQueue = leafQueue; + } + + @Override + public void setIsAlive(boolean alive) { + this.isAlive = alive; + } + + @Override public boolean isAlive() { + return isAlive; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index ee7e101..b73c538 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -45,6 +45,6 @@ * Ask the scheduler to forcibly interrupt the container given as input * @param container */ - void 
killPreemptedContainer(RMContainer container); + void markContainerForKillable(RMContainer container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index c545e9e..721eb36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -38,6 +38,8 @@ // containers. private volatile Resource headroom; + private boolean allowPreempt = false; + public ResourceLimits(Resource limit) { this(limit, Resources.none()); } @@ -72,4 +74,11 @@ public void setAmountNeededUnreserve(Resource amountNeededUnreserve) { this.amountNeededUnreserve = amountNeededUnreserve; } + public boolean isAllowPreemption() { + return allowPreempt; + } + + public void setIsAllowPreemption(boolean allowPreempt) { + this.allowPreempt = allowPreempt; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 2542009..601e75b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -402,6 +402,7 @@ public synchronized RMContainer reserve(SchedulerNode node, rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + rmContainer.setLeafQueue(this.getQueueName()); attemptResourceUsage.incReserved(node.getPartition(), container.getResource()); @@ -750,12 +751,14 @@ public synchronized void move(Queue newQueue) { QueueMetrics newMetrics = newQueue.getMetrics(); String user = getUser(); for (RMContainer liveContainer : liveContainers.values()) { + liveContainer.setLeafQueue(newQueue.getQueueName()); Resource resource = liveContainer.getContainer().getResource(); oldMetrics.releaseResources(user, 1, resource); newMetrics.allocateResources(user, 1, resource, false); } for (Map map : reservedContainers.values()) { for (RMContainer reservedContainer : map.values()) { + reservedContainer.setLeafQueue(newQueue.getQueueName()); Resource resource = reservedContainer.getReservedResource(); oldMetrics.unreserveResource(user, resource); newMetrics.reserveResource(user, resource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 33ab2f1..6c4f300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -64,9 +64,8 @@ private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - - /** Set of containers that are allocated containers. */ - private final Map launchedContainers = + /** Set of containers that are allocated containers. */ + protected final Map launchedContainers = new HashMap<>(); private final RMNode rmNode; @@ -168,7 +167,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) { * @param deltaResource Change in the resource allocation. * @param increase True if the change is an increase of allocation. */ - private synchronized void changeContainerResource(ContainerId containerId, + protected synchronized void changeContainerResource(ContainerId containerId, Resource deltaResource, boolean increase) { if (increase) { deductUnallocatedResource(deltaResource); @@ -242,7 +241,7 @@ public synchronized boolean isValidContainer(ContainerId containerId) { * Update the resources of the node when allocating a new container. * @param container Container to allocate. 
*/ - private synchronized void updateResource(Container container) { + protected synchronized void updateResource(Container container) { addUnallocatedResource(container.getResource()); --numContainers; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 39ca29b..5a388b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -474,12 +476,19 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - // Set headroom for currentResourceLimits - 
currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource, - nowTotalUsed)); + // Set headroom for currentResourceLimits: + // When queue is a parent queue: Headroom = limit - used + killable + // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) + Resource usedConsideredKillable = nowTotalUsed; + if (null != getChildQueues() && !getChildQueues().isEmpty()) { + usedConsideredKillable = Resources.subtract(nowTotalUsed, + getTotalKillableResource(nodePartition)); + } + currentResourceLimits.setHeadroom( + Resources.subtract(currentLimitResource, usedConsideredKillable)); if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - nowTotalUsed, currentLimitResource)) { + usedConsideredKillable, currentLimitResource)) { // if reservation continous looking enabled, check to see if could we // potentially use this node instead of a reserved node if the application @@ -491,7 +500,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, resourceCouldBeUnreserved, Resources.none())) { // resource-without-reserved = used - reserved Resource newTotalWithoutReservedResource = - Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved); + Resources.subtract(usedConsideredKillable, resourceCouldBeUnreserved); // when total-used-without-reserved-resource < currentLimit, we still // have chance to allocate on this node by unreserving some containers @@ -620,11 +629,10 @@ public Priority getDefaultApplicationPriority() { // considering all labels in cluster, only those labels which are // use some resource of this queue can be considered. 
Set nodeLabels = new HashSet(); - if (this.getAccessibleNodeLabels() != null - && this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - nodeLabels.addAll(Sets.union(this.getQueueCapacities() - .getNodePartitionsSet(), this.getQueueResourceUsage() - .getNodePartitionsSet())); + if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels() + .contains(RMNodeLabelsManager.ANY)) { + nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(), + this.getQueueResourceUsage().getNodePartitionsSet())); } else { nodeLabels.addAll(this.getAccessibleNodeLabels()); } @@ -636,4 +644,14 @@ public Priority getDefaultApplicationPriority() { } return nodeLabels; } + + public Resource getTotalKillableResource(String partition) { + return csContext.getPreemptionManager().getKillableResource(queueName, + partition); + } + + public Iterator getKillableContainers(String partition) { + return csContext.getPreemptionManager().getKillableContainers(queueName, + partition); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 68f6f12..6406efe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.List; + 
@Private @Unstable public class CSAssignment { @@ -42,6 +44,7 @@ private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; private boolean increaseAllocation; + private List containersToKill; public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); @@ -147,4 +150,12 @@ public boolean isIncreasedAllocation() { public void setIncreasedAllocation(boolean flag) { increaseAllocation = flag; } + + public void setContainersToKill(List containersToKill) { + this.containersToKill = containersToKill; + } + + public List getContainersToKill() { + return containersToKill; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6a1091d..75e52d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -109,6 +109,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -149,6 +151,14 @@ // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + private PreemptionManager preemptionManager = new PreemptionManager(); + + // All live containers in the system, this will be used by preemption manager + // to check if a killable container is already completed. + private Map liveContainers = new ConcurrentHashMap<>(); + + private volatile boolean isLazyPreemptionEnabled = false; + static final Comparator nonPartitionedQueueComparator = new Comparator() { @Override @@ -301,12 +311,11 @@ private synchronized void initScheduler(Configuration configuration) throws initMaximumResourceCapability(this.conf.getMaximumAllocation()); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap>(); + this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); initializeQueues(this.conf); + this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); scheduleAsynchronously = this.conf.getScheduleAynschronously(); asyncScheduleInterval = @@ -372,6 +381,9 @@ public void serviceStop() throws Exception { refreshMaximumAllocation(this.conf.getMaximumAllocation()); throw new IOException("Failed to re-init queues", t); } + + // update lazy preemption + this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); } long getAsyncScheduleInterval() { @@ -506,6 +518,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) LOG.info("Initialized root queue " + root); updatePlacementRules(); setQueueAcls(authorizer, queues); + + // 
Notify Preemption Manager + preemptionManager.refreshQueues(null, root); } @Lock(CapacityScheduler.class) @@ -533,6 +548,9 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) labelManager.reinitializeQueueLabels(getQueueToLabels()); setQueueAcls(authorizer, queues); + + // Notify Preemption Manager + preemptionManager.refreshQueues(null, root); } @VisibleForTesting @@ -1253,8 +1271,10 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(node.getUnallocatedResource(), - minimumAllocation) > 0) { + if (calculator.computeAvailableContainers(Resources + .add(node.getUnallocatedResource(), node.getTotalKillableResources()), + minimumAllocation) > 0) { + if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getUnallocatedResource()); @@ -1436,11 +1456,20 @@ public void handle(SchedulerEvent event) { markContainerForPreemption(aid, containerToBePreempted); } break; - case KILL_PREEMPTED_CONTAINER: + case MARK_CONTAINER_FOR_KILLABLE: + { + ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent)event; + RMContainer killableContainer = containerKillableEvent.getContainer(); + markContainerForKillable(killableContainer); + } + break; + case MARK_CONTAINER_FOR_NONKILLABLE: { - ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; - RMContainer containerToBeKilled = killContainerEvent.getContainer(); - killPreemptedContainer(containerToBeKilled); + if (isLazyPreemptionEnabled) { + ContainerPreemptEvent cancelKillContainerEvent = + (ContainerPreemptEvent) event; + markContainerForNonKillable(cancelKillContainerEvent.getContainer()); + } } break; default: @@ -1549,15 +1578,16 @@ private void rollbackContainerResource( protected void completedContainerInternal( RMContainer rmContainer, 
ContainerStatus containerStatus, RMContainerEventType event) { - Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); // Get the application for the finished container FiCaSchedulerApp application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = - container.getId().getApplicationAttemptId().getApplicationId(); + containerId.getApplicationAttemptId().getApplicationId(); if (application == null) { + liveContainers.remove(containerId); LOG.info("Container " + container + " of" + " finished application " + appId + " completed with event " + event); return; @@ -1570,15 +1600,6 @@ protected void completedContainerInternal( LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(clusterResource, application, node, rmContainer, containerStatus, event, null, true); - - if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { - schedulerHealth.updatePreemption(Time.now(), container.getNodeId(), - container.getId(), queue.getQueuePath()); - schedulerHealth.updateSchedulerPreemptionCounts(1); - } else { - schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(), - container.getId(), queue.getQueuePath()); - } } @Override @@ -1614,7 +1635,7 @@ public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { return super.getApplicationAttempt(applicationAttemptId); } - + @Lock(Lock.NoLock.class) public FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); @@ -1660,15 +1681,66 @@ public void markContainerForPreemption(ApplicationAttemptId aid, } } - @Override - public void killPreemptedContainer(RMContainer cont) { + private void markContainerForKillableInternal(RMContainer killableContainer, + FiCaSchedulerNode node, FiCaSchedulerApp app) { + node.markContainerToKillable(killableContainer.getContainerId()); + + // notify PreemptionManager + // Get the application for the finished container + if (null != app) { + String 
leafQueueName = app.getCSLeafQueue().getQueueName(); + getPreemptionManager().addKillableContainer( + new KillableContainer(killableContainer, node.getPartition(), + leafQueueName)); + } + } + + public synchronized void markContainerForKillable( + RMContainer killableContainer) { if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" - + cont.toString()); + LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container" + + killableContainer.toString()); + } + + if (!isLazyPreemptionEnabled) { + super.completedContainer(killableContainer, SchedulerUtils + .createPreemptedContainerStatus(killableContainer.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); + } else { + FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( + killableContainer.getAllocatedNode()); + + FiCaSchedulerApp application = getCurrentAttemptForContainer( + killableContainer.getContainerId()); + + markContainerForKillableInternal(killableContainer, node, application); + } + } + + private synchronized void markContainerForNonKillable( + RMContainer nonKillableContainer) { + if (LOG.isDebugEnabled()) { + LOG.debug( + SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container" + + nonKillableContainer.toString()); + } + + FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( + nonKillableContainer.getAllocatedNode()); + + FiCaSchedulerApp application = getCurrentAttemptForContainer( + nonKillableContainer.getContainerId()); + + node.markContainerToNonKillable(nonKillableContainer.getContainerId()); + + // notify PreemptionManager + // Get the application for the finished container + if (null != application) { + String leafQueueName = application.getCSLeafQueue().getQueueName(); + getPreemptionManager().removeKillableContainer( + new KillableContainer(nonKillableContainer, node.getPartition(), + leafQueueName)); } - super.completedContainer(cont, SchedulerUtils - 
.createPreemptedContainerStatus(cont.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); } @Override @@ -1951,6 +2023,7 @@ private String handleMoveToPlanQueue(String targetQueueName) { return ret; } + @Override public SchedulerHealth getSchedulerHealth() { return this.schedulerHealth; } @@ -1960,6 +2033,11 @@ private void setLastNodeUpdateTime(long time) { } @Override + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + + @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { @@ -2060,4 +2138,9 @@ public void updateApplicationPriority(Priority newPriority, + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); } + + @Override + public PreemptionManager getPreemptionManager() { + return preemptionManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3756d9e..3729264 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -257,6 +257,12 @@ public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; + @Private + public static final String LAZY_PREEMPTION_ENALBED = PREFIX + 
"lazy-preemption-enabled"; + + @Private + public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -1007,7 +1013,11 @@ public void setOrderingPolicy(String queue, String policy) { @VisibleForTesting public void setOrderingPolicyParameter(String queue, String parameterKey, String parameterValue) { - set(getQueuePrefix(queue) + ORDERING_POLICY + "." - + parameterKey, parameterValue); + set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey, + parameterValue); + } + + public boolean getLazyPreemptionEnabled() { + return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 2a0dd0da..1203272 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,17 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.Comparator; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import java.util.Comparator; + /** * Read-only interface to {@link CapacityScheduler} context. */ @@ -61,4 +64,12 @@ PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); + + FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId); + + PreemptionManager getPreemptionManager(); + + SchedulerHealth getSchedulerHealth(); + + long getLastNodeUpdateTime(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index c625fae..3dc2090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -37,9 +37,11 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.Time; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -63,7 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; @@ -823,6 +827,40 @@ private void handleExcessReservedContainer(Resource clusterResource, assignment.setExcessReservation(null); } } + + private void killToPreemptContainers(Resource clusterResource, + FiCaSchedulerNode node, + CSAssignment assignment) { + if (assignment.getContainersToKill() != null) { + StringBuilder sb = new StringBuilder("Killing containers: ["); + + for (RMContainer c : assignment.getContainersToKill()) { + FiCaSchedulerApp application = csContext.getApplicationAttempt( + c.getApplicationAttemptId()); + LeafQueue q = application.getCSLeafQueue(); + q.completedContainer(clusterResource, application, node, c, SchedulerUtils + .createPreemptedContainerStatus(c.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), 
RMContainerEventType.KILL, + null, false); + sb.append("(container=" + c.getContainerId() + " resource=" + c + .getAllocatedResource() + ")"); + } + + sb.append("] for container=" + assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment + .getResource()); + LOG.info(sb.toString()); + + } + } + + private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { + // Set preemption-allowed: + // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues + float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition); + float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); + limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); + } @Override public synchronized CSAssignment assignContainers(Resource clusterResource, @@ -835,6 +873,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + " #applications=" + orderingPolicy.getNumSchedulableEntities()); } + setPreemptionAllowed(currentResourceLimits, node.getPartition()); + // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -846,6 +886,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment, node, application); + killToPreemptContainers(clusterResource, node, assignment); return assignment; } } @@ -907,6 +948,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, handleExcessReservedContainer(clusterResource, assignment, node, application); + killToPreemptContainers(clusterResource, node, assignment); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1210,11 +1252,34 @@ public void unreserveIncreasedContainer(Resource clusterResource, } } + private void 
updateSchedulerHealthForCompletedContainer( + RMContainer rmContainer, ContainerStatus containerStatus) { + // Update SchedulerHealth for released / preempted container + SchedulerHealth schedulerHealth = csContext.getSchedulerHealth(); + if (null == schedulerHealth) { + // Only do update if we have schedulerHealth + return; + } + + if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { + schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(), + rmContainer.getContainerId(), getQueuePath()); + schedulerHealth.updateSchedulerPreemptionCounts(1); + } else { + schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(), + rmContainer.getAllocatedNode(), rmContainer.getContainerId(), + getQueuePath()); + } + } + @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue, boolean sortQueues) { + // Update SchedulerHealth for released / preempted container + updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus); + if (application != null) { // unreserve container increase request if it previously reserved. 
if (rmContainer.hasIncreaseReservation()) { @@ -1265,6 +1330,10 @@ public void completedContainer(Resource clusterResource, rmContainer, null, event, this, sortQueues); } } + + // Notify PreemptionManager + csContext.getPreemptionManager().removeKillableContainer( + new KillableContainer(rmContainer, node.getPartition(), queueName)); } synchronized void allocateResource(Resource clusterResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7cf5565..241ed47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -18,18 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +37,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import 
org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -57,12 +46,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -431,7 +433,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue - super.allocateResource(clusterResource, assignedToChild.getResource(), + allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); // Track resource utilization in this pass of the scheduler @@ -494,19 +496,27 @@ public synchronized CSAssignment 
assignContainers(Resource clusterResource, } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - return (node.getReservedContainer() == null) && - Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - node.getUnallocatedResource(), minimumAllocation); + // Two conditions need to meet when trying to allocate: + // 1) Node doesn't have reserved container + // 2) Node's available-resource + killable-resource should > 0 + return node.getReservedContainer() == null && Resources.greaterThanOrEqual( + resourceCalculator, clusterResource, Resources + .add(node.getUnallocatedResource(), node.getTotalKillableResources()), + minimumAllocation); } - + private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits parentLimits) { + Resource clusterResource, ResourceLimits parentLimits, + String nodePartition) { // Set resource-limit of a given child, child.limit = // min(my.limit - my.used + child.used, child.max) // Parent available resource = parent-limit - parent-used-resource Resource parentMaxAvailableResource = Resources.subtract(parentLimits.getLimit(), getUsedResources()); + // Deduct killable from used + Resources.addTo(parentMaxAvailableResource, + getTotalKillableResource(nodePartition)); // Child's limit = parent-available-resource + child-used Resource childLimit = @@ -568,7 +578,7 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits); + getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); @@ -714,8 +724,8 @@ public synchronized void updateClusterResource(Resource clusterResource, // Update all children for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers - 
ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); + ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, + clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL); childQueue.updateClusterResource(clusterResource, childLimits); } @@ -738,8 +748,8 @@ public void recoverContainer(Resource clusterResource, synchronized (this) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); - super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition(), false); + allocateResource(clusterResource, + rmContainer.getContainer().getResource(), node.getPartition(), false); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -766,7 +776,7 @@ public void attachContainer(Resource clusterResource, if (application != null) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); - super.allocateResource(clusterResource, rmContainer.getContainer() + allocateResource(clusterResource, rmContainer.getContainer() .getResource(), node.getPartition(), false); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" @@ -802,4 +812,54 @@ public void detachContainer(Resource clusterResource, public synchronized int getNumApplications() { return numApplications; } + + synchronized void allocateResource(Resource clusterResource, + Resource resource, String nodePartition, boolean changeContainerResource) { + super.allocateResource(clusterResource, resource, nodePartition, + changeContainerResource); + + // check if we need to kill (killable) containers if maximum resource violated. 
+ if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) + < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { + killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + } + } + + private void killContainersToEnforceMaxQueueCapacity(String partition, + Resource clusterResource) { + Iterator killableContainerIter = getKillableContainers( + partition); + if (!killableContainerIter.hasNext()) { + return; + } + + Resource partitionResource = labelManager.getResourceByLabel(partition, + null); + Resource maxResource = Resources.multiply(partitionResource, + getQueueCapacities().getAbsoluteMaximumCapacity(partition)); + + while (Resources.greaterThan(resourceCalculator, partitionResource, + queueUsage.getUsed(partition), maxResource)) { + RMContainer toKillContainer = killableContainerIter.next(); + FiCaSchedulerApp attempt = csContext.getApplicationAttempt( + toKillContainer.getContainerId().getApplicationAttemptId()); + FiCaSchedulerNode node = csContext.getNode( + toKillContainer.getAllocatedNode()); + if (null != attempt && null != node) { + LeafQueue lq = attempt.getCSLeafQueue(); + lq.completedContainer(clusterResource, attempt, node, toKillContainer, + SchedulerUtils.createPreemptedContainerStatus( + toKillContainer.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, + null, false); + LOG.info("Killed container=" + toKillContainer.getContainerId() + + " from queue=" + lq.getQueueName() + " to make queue=" + this + .getQueueName() + "'s max-capacity enforced"); + } + + if (!killableContainerIter.hasNext()) { + break; + } + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index ee01bd1..afac235 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -108,6 +108,8 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.setFulfilledReservation(true); } } + + assignment.setContainersToKill(result.getToKillContainers()); } return assignment; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 1df9410..8f749f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; 
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.List; + public class ContainerAllocation { /** * Skip the locality (e.g. node-local, rack-local, any), and look at other @@ -56,6 +59,7 @@ NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestNodeType = NodeType.NODE_LOCAL; Container updatedContainer; + private List toKillContainers; public ContainerAllocation(RMContainer containerToBeUnreserved, Resource resourceToBeAllocated, AllocationState state) { @@ -86,4 +90,12 @@ public NodeType getContainerNodeType() { public Container getUpdatedContainer() { return updatedContainer; } + + public void setToKillContainers(List toKillContainers) { + this.toKillContainers = toKillContainers; + } + + public List getToKillContainers() { + return toKillContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index e168edf..a86d5a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; +import org.apache.commons.collections.ArrayStack; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; @@ -42,6 +43,10 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** * Allocate normal (new) containers, considers locality/label, etc. Using * delayed scheduling mechanism to get better locality allocation. @@ -435,9 +440,6 @@ private ContainerAllocation assignContainer(Resource clusterResource, return ContainerAllocation.LOCALITY_SKIPPED; } - assert Resources.greaterThan( - rc, clusterResource, available, Resources.none()); - boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( priority, capability); @@ -460,6 +462,29 @@ private ContainerAllocation assignContainer(Resource clusterResource, boolean reservationsContinueLooking = application.getCSLeafQueue().getReservationContinueLooking(); + // Check if we need to kill some containers to allocate this one + List toKillContainers = null; + if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) { + Resource availableConsidersKillable = Resources.clone(available); + for (RMContainer killableContainer : node + .getKillableContainers().values()) { + if (null == toKillContainers) { + toKillContainers = new ArrayList<>(); + } + toKillContainers.add(killableContainer); + Resources.addTo(availableConsidersKillable, + killableContainer.getAllocatedResource()); + if (Resources.fitsIn(rc, + clusterResource, + capability, + availableConsidersKillable)) { + // Stop if we find enough spaces + availableContainers = 1; + break; + } + } + } + if (availableContainers > 0) { // Allocate... 
// We will only do continuous reservation when this is not allocated from @@ -499,12 +524,26 @@ private ContainerAllocation assignContainer(Resource clusterResource, new ContainerAllocation(unreservedContainer, request.getCapability(), AllocationState.ALLOCATED); result.containerNodeType = type; + result.setToKillContainers(toKillContainers); return result; } else { + // Container should only be reserved when + // 1) Node available-resource > 0 OR + // 2) killable-resource > 0 + if (Resources.lessThanOrEqual(rc, clusterResource, + node.getUnallocatedResource(), Resources.none()) && + null == toKillContainers) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot reserve container on the node=" + node.getNodeID() + + " for application=" + application.getApplicationAttemptId() + + " because node's available resource is 0" + + " and cannot find containers to kill on the node"); + } + } + + // if we are allowed to allocate but this node doesn't have space, reserve // it or if this was an already a reserved container, reserve it again if (shouldAllocOrReserveNewContainer || rmContainer != null) { - if (reservationsContinueLooking && rmContainer == null) { // we could possibly ignoring queue capacity or user limits when // reservationsContinueLooking is set. 
Make sure we didn't need to @@ -522,6 +561,7 @@ private ContainerAllocation assignContainer(Resource clusterResource, new ContainerAllocation(null, request.getCapability(), AllocationState.RESERVED); result.containerNodeType = type; + result.setToKillContainers(null); return result; } // Skip the locality request @@ -613,8 +653,7 @@ private ContainerAllocation handleNewContainerAllocation( } ContainerAllocation doAllocation(ContainerAllocation allocationResult, - Resource clusterResource, FiCaSchedulerNode node, - SchedulingMode schedulingMode, Priority priority, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer) { // Create the container if necessary Container container = @@ -678,9 +717,7 @@ private ContainerAllocation allocate(Resource clusterResource, if (AllocationState.ALLOCATED == result.state || AllocationState.RESERVED == result.state) { - result = - doAllocation(result, clusterResource, node, schedulingMode, priority, - reservedContainer); + result = doAllocation(result, node, priority, reservedContainer); } return result; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java new file mode 100644 index 0000000..675b0b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/KillableContainer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class KillableContainer { + RMContainer container; + String partition; + String leafQueueName; + + public KillableContainer(RMContainer container, String partition, String leafQueueName) { + this.container = container; + this.partition = partition; + this.leafQueueName = leafQueueName; + } + + public RMContainer getRMContainer() { + return this.container; + } + + public String getNodePartition() { + return this.partition; + } + + public String getLeafQueueName() { + return this.leafQueueName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableEntity.java new file mode 100644 index 0000000..88fc394 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableEntity.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; + +public class PreemptableEntity { + // Partition -> killable resources and containers + private Map totalKillableResources = new HashMap<>(); + private Map> killableContainers = + new HashMap<>(); + private PreemptableEntity parent; + + public PreemptableEntity(PreemptableEntity parent) { + this.parent = parent; + } + + public PreemptableEntity(Map totalKillableResources, + Map> killableContainers) { + this.totalKillableResources = totalKillableResources; + this.killableContainers = killableContainers; + } + + void addKillableContainer(KillableContainer container) { + String partition = container.getNodePartition(); + if (!totalKillableResources.containsKey(partition)) { + totalKillableResources.put(partition, Resources.createResource(0)); + killableContainers.put(partition, + new ConcurrentSkipListMap()); + } + + RMContainer c = container.getRMContainer(); + Resources.addTo(totalKillableResources.get(partition), + c.getAllocatedResource()); + killableContainers.get(partition).put(c.getContainerId(), c); + + if (null != parent) { + parent.addKillableContainer(container); + } + } + + void removeKillableContainer(KillableContainer container) { + String partition = 
container.getNodePartition(); + Map partitionKillableContainers = + killableContainers.get(partition); + if (partitionKillableContainers != null) { + RMContainer rmContainer = partitionKillableContainers.remove( + container.getRMContainer().getContainerId()); + if (null != rmContainer) { + Resources.subtractFrom(totalKillableResources.get(partition), + rmContainer.getAllocatedResource()); + } + } + + if (null != parent) { + parent.removeKillableContainer(container); + } + } + + public Resource getKillableResource(String partition) { + Resource res = totalKillableResources.get(partition); + return res == null ? Resources.none() : res; + } + + public Map getKillableContainers(String partition) { + Map map = killableContainers.get(partition); + return map == null ? Collections.emptyMap() : map; + } + + public Map> getKillableContainers() { + return killableContainers; + } + + Map getTotalKillableResources() { + return totalKillableResources; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java new file mode 100644 index 0000000..e672de3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class PreemptionManager { + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + private Map entities = new HashMap<>(); + + public PreemptionManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + public void refreshQueues(CSQueue parent, CSQueue current) { + try { + writeLock.lock(); + PreemptableEntity parentEntity = null; + if (parent != null) { + parentEntity = entities.get(parent.getQueueName()); + } + + if (!entities.containsKey(current.getQueueName())) { + entities.put(current.getQueueName(), + new PreemptableEntity(parentEntity)); + } + + if (current.getChildQueues() != null) { + for (CSQueue child : current.getChildQueues()) { + refreshQueues(current, child); + } + } + } + finally { + writeLock.unlock(); + } + } + + public 
void addKillableContainer(KillableContainer container) { + try { + writeLock.lock(); + PreemptableEntity entity = entities.get(container.getLeafQueueName()); + if (null != entity) { + entity.addKillableContainer(container); + } + } + finally { + writeLock.unlock(); + } + } + + public void removeKillableContainer(KillableContainer container) { + try { + writeLock.lock(); + PreemptableEntity entity = entities.get(container.getLeafQueueName()); + if (null != entity) { + entity.removeKillableContainer(container); + } + } + finally { + writeLock.unlock(); + } + } + + public void moveKillableContainer(KillableContainer oldContainer, + KillableContainer newContainer) { + // TODO, will be called when partition of the node changed OR + // container moved to different queue + } + + public void updateKillableContainerResource(KillableContainer container, + Resource oldResource, Resource newResource) { + // TODO, will be called when container's resource changed + } + + @VisibleForTesting + public Map getKillableContainersMap( + String queueName, String partition) { + try { + readLock.lock(); + PreemptableEntity entity = entities.get(queueName); + if (entity != null) { + Map containers = + entity.getKillableContainers().get(partition); + if (containers != null) { + return containers; + } + } + return Collections.emptyMap(); + } + finally { + readLock.unlock(); + } + } + + public Iterator getKillableContainers(String queueName, + String partition) { + return getKillableContainersMap(queueName, partition).values().iterator(); + } + + public Resource getKillableResource(String queueName, String partition) { + try { + readLock.lock(); + PreemptableEntity entity = entities.get(queueName); + if (entity != null) { + Resource res = entity.getTotalKillableResources().get(partition); + if (res == null || res.equals(Resources.none())) { + return Resources.none(); + } + return Resources.clone(res); + } + return Resources.none(); + } + finally { + readLock.unlock(); + } + } + + public Map 
getShallowCopyOfPreemptableEntities() { + try { + readLock.lock(); + Map map = new HashMap<>(); + for (Map.Entry entry : entities.entrySet()) { + String key = entry.getKey(); + PreemptableEntity entity = entry.getValue(); + map.put(key, new PreemptableEntity( + new HashMap<>(entity.getTotalKillableResources()), + new HashMap<>(entity.getKillableContainers()))); + } + return map; + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index 5158255..aad3bc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -120,9 +120,9 @@ public void addReservationDetails(ContainerId containerId, String queue) { } private ContainerId getFirstContainerIdFromOperation(Operation op) { - if (null != operationDetails.get(Operation.ALLOCATION)) { + if (null != operationDetails.get(op)) { List assignDetails = - operationDetails.get(Operation.ALLOCATION); + operationDetails.get(op); if (!assignDetails.isEmpty()) { return assignDetails.get(0).containerId; } @@ -131,7 +131,7 @@ private ContainerId getFirstContainerIdFromOperation(Operation op) { } public ContainerId getFirstAllocatedOrReservedContainerId() { - ContainerId containerId = null; + ContainerId containerId; containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION); if (null != containerId) { 
return containerId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4d563cd..f896b3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -94,6 +95,7 @@ * to hold the message if its app doesn't not get container from a node */ private String appSkipNodeDiagnostics; + private CapacitySchedulerContext capacitySchedulerContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -138,28 +140,31 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, } containerAllocator = new ContainerAllocator(this, rc, rmContext); + + if (scheduler instanceof 
CapacityScheduler) { + capacitySchedulerContext = (CapacitySchedulerContext) scheduler; + } } - synchronized public boolean containerCompleted(RMContainer rmContainer, + public synchronized boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { + ContainerId containerId = rmContainer.getContainerId(); // Remove from the list of containers - if (null == liveContainers.remove(rmContainer.getContainerId())) { + if (null == liveContainers.remove(containerId)) { return false; } + rmContainer.setIsAlive(false); // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); - Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); - // Inform the container rmContainer.handle( new RMContainerFinishedEvent(containerId, containerStatus, event)); - containersToPreempt.remove(rmContainer.getContainerId()); + containersToPreempt.remove(containerId); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, "SchedulerApp", @@ -176,7 +181,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, return true; } - synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, + public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, Container container) { @@ -195,12 +200,16 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, new RMContainerImpl(container, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, request.getNodeLabelExpression()); + rmContainer.setLeafQueue(getQueueName()); updateAMContainerDiagnostics(AMState.ASSIGNED, null); // Add it to allContainers list. 
newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); + + ContainerId containerId = container.getId(); + liveContainers.put(containerId, rmContainer); + rmContainer.setIsAlive(true); // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( @@ -213,17 +222,17 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Inform the container rmContainer.handle( - new RMContainerEvent(container.getId(), RMContainerEventType.START)); + new RMContainerEvent(containerId, RMContainerEventType.START)); if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationAttemptId=" - + container.getId().getApplicationAttemptId() - + " container=" + container.getId() + " host=" + + containerId.getApplicationAttemptId() + + " container=" + containerId + " host=" + container.getNodeId().getHost() + " type=" + type); } RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), container.getId()); + getApplicationId(), containerId); return rmContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index fe6db47..1d0e78a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,22 +18,29 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; - -import java.util.Set; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); + private Map killableContainers = new HashMap<>(); + private Resource totalKillableResources = Resource.newInstance(0, 0); public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, Set nodeLabels) { @@ -92,7 +99,6 @@ public synchronized void reserveResource( @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { - // adding NP checks as this can now be called for preemption if (getReservedContainer() != null && getReservedContainer().getContainer() != null @@ -115,4 +121,55 @@ public synchronized void unreserveResource( } setReservedContainer(null); } + + // According to decisions from preemption policy, mark the container to killable + public synchronized void markContainerToKillable(ContainerId containerId) { + RMContainer c = launchedContainers.get(containerId); + if (c != null && !killableContainers.containsKey(containerId)) { + 
killableContainers.put(containerId, c); + Resources.addTo(totalKillableResources, c.getAllocatedResource()); + } + } + + // According to decisions from preemption policy, mark the container to + // non-killable + public synchronized void markContainerToNonKillable(ContainerId containerId) { + RMContainer c = launchedContainers.get(containerId); + if (c != null && killableContainers.containsKey(containerId)) { + killableContainers.remove(containerId); + Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); + } + } + + @Override + protected synchronized void updateResource( + Container container) { + super.updateResource(container); + if (killableContainers.containsKey(container.getId())) { + Resources.subtractFrom(totalKillableResources, container.getResource()); + killableContainers.remove(container.getId()); + } + } + + @Override + protected synchronized void changeContainerResource(ContainerId containerId, + Resource deltaResource, boolean increase) { + super.changeContainerResource(containerId, deltaResource, increase); + + if (killableContainers.containsKey(containerId)) { + if (increase) { + Resources.addTo(totalKillableResources, deltaResource); + } else { + Resources.subtractFrom(totalKillableResources, deltaResource); + } + } + } + + public synchronized Resource getTotalKillableResources() { + return totalKillableResources; + } + + public synchronized Map getKillableContainers() { + return killableContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 9cf09e9..35b7c14 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,10 +38,15 @@ // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - // Source: SchedulingEditPolicy + /* Source: SchedulingEditPolicy */ KILL_RESERVED_CONTAINER, - MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption - // in the near future - KILL_PREEMPTED_CONTAINER // Kill a container previously marked for - // preemption + + // Mark a container for preemption + MARK_CONTAINER_FOR_PREEMPTION, + + // Mark a for-preemption container killable + MARK_CONTAINER_FOR_KILLABLE, + + // Cancel a killable container + MARK_CONTAINER_FOR_NONKILLABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index f1cefad..86ad012 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -379,6 +379,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && RMContainer rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + rmContainer.setLeafQueue(getQueueName()); // Add it to 
allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index d9306dd..c944752 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -59,7 +59,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { rmDispatcher.getEventHandler().handle(event1); ContainerPreemptEvent event2 = new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER); + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE); rmDispatcher.getEventHandler().handle(event2); ContainerPreemptEvent event3 = new ContainerPreemptEvent(appAttemptId, container, @@ -70,7 +70,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { verify(sched, times(3)).handle(any(SchedulerEvent.class)); verify(sched).killReservedContainer(container); verify(sched).markContainerForPreemption(appAttemptId, container); - verify(sched).killPreemptedContainer(container); + verify(sched).markContainerForKillable(container); } catch (InterruptedException e) { Assert.fail(); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java 
index 028afb1..3057615 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2352,7 +2352,7 @@ public void testRMRestartAfterPreemption() throws Exception { FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() .get(app0.getApplicationId()).getCurrentAppAttempt(); // kill app0-attempt - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer( + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer( app0.getCurrentAppAttempt().getMasterContainer().getId())); am0.waitForState(RMAppAttemptState.FAILED); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 5035afe..16f3f60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -566,7 +565,7 @@ public void 
testShouldNotCountFailureToMaxAttemptRetry() throws Exception { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -582,7 +581,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -677,7 +676,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! 
attempt1.shouldCountTowardsMaxAttemptRetry()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 13f267d..e9129de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -167,6 +168,7 @@ public void setup() { when(mCS.getConfiguration()).thenReturn(schedConf); rmContext = mock(RMContext.class); when(mCS.getRMContext()).thenReturn(rmContext); + when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager()); when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); Dispatcher disp = mock(Dispatcher.class); @@ -289,7 +291,7 @@ public void testExpireKill() { List events = evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); + assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 512f37c..21ea495 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -123,6 +124,7 @@ public void setup() { mClock = mock(Clock.class); cs = mock(CapacityScheduler.class); when(cs.getResourceCalculator()).thenReturn(rc); + when(cs.getPreemptionManager()).thenReturn(new PreemptionManager()); nlm = mock(RMNodeLabelsManager.class); mDisp = mock(EventHandler.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0b32676..171196f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ 
-52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -264,6 +265,7 @@ public void testLimitsComputation() throws Exception { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); // Say cluster has 100 nodes of 16G each Resource clusterResource = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 1569a12..d8161f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception { if (++counter > 2) { break; } 
- cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -512,7 +512,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority() if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -542,7 +542,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority() if (++counter > 1) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b6c005b..16ba607 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1188,7 +1188,7 @@ public void testPreemptionInfo() throws Exception { // kill the 3 containers for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1197,7 +1197,7 @@ public void 
testPreemptionInfo() throws Exception { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1220,7 +1220,7 @@ public void testPreemptionInfo() throws Exception { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1269,7 +1269,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { } // Call killContainer to preempt the container - cs.killPreemptedContainer(rmContainer); + cs.markContainerForKillable(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java new file mode 100644 index 0000000..ea39b4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java @@ -0,0 +1,579 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCapacitySchedulerPreemption { + private static final Log LOG = LogFactory.getLog( + TestCapacitySchedulerPreemption.class); + + private final int GB = 1024; + + private Configuration conf; + + RMNodeLabelsManager mgr; + + Clock clock; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); + conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); + + // Set preemption related configurations + conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL, + 0); + conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + true); + conf.setFloat( + ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f); + conf.setFloat( + ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f); + mgr = new NullRMNodeLabelsManager(); + mgr.init(this.conf); 
+ clock = mock(Clock.class); + when(clock.getTime()).thenReturn(0L); + } + + private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) { + RMActiveServices activeServices = rm.getRMActiveService(); + SchedulingMonitor mon = null; + for (Service service : activeServices.getServices()) { + if (service instanceof SchedulingMonitor) { + mon = (SchedulingMonitor) service; + break; + } + } + + if (mon != null) { + return mon.getSchedulingEditPolicy(); + } + return null; + } + + @Test (timeout = 60000) + public void testSimplePreemption() throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *

+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
+ * + * 1) Two nodes in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no + * more resource available. + * + * 3) app2 submit to queue-c, ask for one 1G container (for AM) + * + * Now the cluster is fulfilled. + * + * 4) app2 asks for another 1G container, system will preempt one container + * from app1, and app2 will receive the preempted container + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 7, new ArrayList()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container + 
am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(1, killableContainers.size()); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 6 containers, and app2 has 2 containers + Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionConsidersNodeLocalityDelay() + throws Exception { + /** + * Test case: same as testSimplePreemption steps 1-3. + * + * Step 4: app2 asks for 1G container with locality specified, so it needs + * to wait for missed-opportunity before get scheduled. 
+ * Check if system waits missed-opportunity before finish killable container + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container with unknown host and unknown rack + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1), ResourceRequest + .newInstance(Priority.newInstance(1), "unknownhost", + Resources.createResource(1 * GB), 1), ResourceRequest + .newInstance(Priority.newInstance(1), 
"/default-rack", + Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 7 containers, and app2 has 1 containers (no container preempted) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Do allocation again, one container will be preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // App1 has 6 containers, and app2 has 2 containers (new container allocated) + Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionConsidersHardNodeLocality() + throws Exception { + /** + * Test case: same as testSimplePreemption steps 1-3. + * + * Step 4: app2 asks for 1G container with hard locality specified, and + * asked host is not existed + * Confirm system doesn't preempt any container. 
+ */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container for h3 with hard locality, + // h3 doesn't exist in the cluster + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1, true), ResourceRequest + .newInstance(Priority.newInstance(1), "h3", + Resources.createResource(1 * GB), 1, false), ResourceRequest + .newInstance(Priority.newInstance(1), "/default-rack", + 
Resources.createResource(1 * GB), 1, false)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 7 containers, and app2 has 1 containers (no container preempted) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Do allocation again, nothing will be preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // App1 has 7 containers, and app2 has 1 containers (no container allocated) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() + throws Exception { + /** + * Test case: + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
+ * Submit applications to two queues, one uses more than the other, so + * preemption will happen. + * + * Check: + * 1) Killable containers resources will be excluded from PCPP (no duplicated + * container added to killable list) + * 2) When more resources need to be preempted, new containers will be selected + * and killable containers will be considered + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList()); + + // Do allocation 6 times for node1 + for (int i = 0; i < 6; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // NM1 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + am2.allocate("*", 1 * GB, 1, new ArrayList()); + + // Get edit policy and do one update + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = 
cs.getPreemptionManager(); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + + // Check killable containers and to-be-preempted containers in edit policy + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Run edit schedule again, confirm status doesn't changed + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Save current to kill containers + Set previousKillableContainers = new HashSet<>( + pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL) + .keySet()); + + // Update request resource of c from 1 to 2, so we need to preempt + // one more container + am2.allocate("*", 1 * GB, 2, new ArrayList()); + + // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map + // and 1 container in killable map + editPolicy.editSchedule(); + Assert.assertEquals(1, editPolicy.getToPreemptContainers().size()); + + // Call editPolicy.editSchedule() once more, we should have 2 containers killable map + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Check if previous killable containers included by new killable containers + Map killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + Assert.assertTrue( + Sets.difference(previousKillableContainers, killableContainers.keySet()) + .isEmpty()); + } + + @Test (timeout = 60000) + public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded() + throws Exception { + /** + * Test case: + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
+ * Submit applications to two queues, one uses more than the other, so + * preemption will happen. + * + * Check: + * 1) Containers will be marked to killable + * 2) Cancel resource request + * 3) Killable containers will be cancelled from policy and scheduler + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList()); + + // Do allocation 6 times for node1 + for (int i = 0; i < 6; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // NM1 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + am2.allocate("*", 3 * GB, 1, new ArrayList()); + + // Get edit policy and do one update + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if 3 container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3); + + 
// Change reqeust from 3G to 2G, now we can preempt one less container. (3->2) + am2.allocate("*", 2 * GB, 1, new ArrayList()); + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + + // Call editSchedule once more to make sure still nothing happens + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + } + + private Map waitKillableContainersSize( + PreemptionManager pm, String queueName, String partition, + int expectedSize) throws InterruptedException { + Map killableContainers = + pm.getKillableContainersMap(queueName, partition); + + int wait = 0; + // Wait for at most 5 sec (it should be super fast actually) + while (expectedSize != killableContainers.size() && wait < 500) { + killableContainers = pm.getKillableContainersMap(queueName, partition); + Thread.sleep(10); + wait++; + } + + Assert.assertEquals(expectedSize, killableContainers.size()); + return killableContainers; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 5169337..1612201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -51,6 +51,7 @@ 
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -99,6 +100,7 @@ public void setUp() throws Exception { when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); } private FiCaSchedulerApp getMockApplication(int appId, String user) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 69b0813..87a3d51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -150,6 +151,7 @@ public void setUp() throws Exception { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -3092,6 +3094,7 @@ private CapacitySchedulerContext mockCSContext( Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(2 * GB, 2)); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); return csContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index f73baa4..23dc860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -47,6 +47,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -92,6 +93,7 @@ public void setUp() throws Exception { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 9047138..29c7dea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -126,6 +127,7 @@ private void setup(CapacitySchedulerConfiguration csConf, when(csContext.getNonPartitionedQueueComparator()).thenReturn( CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 2694957..4441c6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -356,4 +356,40 @@ public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); return 
cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); } + + /** + * Get a queue structure: + *
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *          10   20  70
+   * 
+ */ + public static Configuration + getConfigurationWithMultipleQueues(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b", "c" }); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 100); + conf.setUserLimitFactor(A, 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setMaximumCapacity(B, 100); + conf.setUserLimitFactor(B, 100); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 100); + conf.setUserLimitFactor(C, 100); + + return conf; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 8d7c22e..5c0aab5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -102,9 +103,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -3955,7 +3958,92 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured() } } } - + + @Test(timeout = 5000) + public void testRecoverRequestAfterPreemption() throws Exception { + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + Priority priority = Priority.newInstance(20); + String host = "127.0.0.1"; + int GB = 1024; + + // Create Node and raised Node Added event + RMNode node = MockNodes.newNodeInfo(1, + Resources.createResource(16 * 1024, 4), 0, host); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + // Create 3 container requests and place it in ask + 
List ask = new ArrayList(); + ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host, + priority.getPriority(), 1, true); + ResourceRequest rackLocalRequest = createResourceRequest(GB, 1, + node.getRackName(), priority.getPriority(), 1, true); + ResourceRequest offRackRequest = createResourceRequest(GB, 1, + ResourceRequest.ANY, priority.getPriority(), 1, true); + ask.add(nodeLocalRequest); + ask.add(rackLocalRequest); + ask.add(offRackRequest); + + // Create Request and update + ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA", + "user1", ask); + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeUpdate); + + assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() + .size()); + SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); + + // ResourceRequest will be empty once NodeUpdate is completed + Assert.assertNull(app.getResourceRequest(priority, host)); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + RMContainer rmContainer = app.getRMContainer(containerId1); + + // Create a preempt event and register for preemption + scheduler.warnOrKillContainer(rmContainer); + + // Wait for few clock ticks + clock.tickSec(5); + + // preempt now + scheduler.warnOrKillContainer(rmContainer); + + // Trigger container rescheduled event + scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + + List requests = rmContainer.getResourceRequests(); + // Once recovered, resource request will be present again in app + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + Assert.assertEquals(1, + app.getResourceRequest(priority, request.getResourceName()) + .getNumContainers()); + } + + // Send node heartbeat + scheduler.update(); + 
scheduler.handle(nodeUpdate); + + List containers = scheduler.allocate(appAttemptId, + Collections. emptyList(), + Collections. emptyList(), null, null, null, null).getContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. + Assert.assertTrue(containers.size() == 1); + } + @Test public void testBlacklistNodes() throws Exception { scheduler.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 5bdcc08..2456594 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -1451,7 +1451,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { // Trigger container rescheduled event scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); List requests = rmContainer.getResourceRequests(); // Once recovered, resource request will be present again in app