diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a50d94..420a83907c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -105,6 +106,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; +import com.google.common.collect.Lists; @SuppressWarnings("unchecked") @Private @@ -728,12 +730,75 @@ private void completeOustandingUpdatesWhichAreReserved( } } + private void completeOustandingUpdatesWhichAreReserved( + Map containerToBeReleased, + RMContainerEventType event) { + + for (Map.Entry entry : containerToBeReleased + .entrySet()) { + completeOustandingUpdatesWhichAreReserved(entry.getKey(), + entry.getValue(), event); + recoverResourceRequestForContainer(entry.getKey()); + } + } + + @VisibleForTesting + @Private + // clean up a completed containers + public void completedContainers(List rmContainers, + String diagnosticMessage, RMContainerEventType event) { + + String message = (diagnosticMessage == null) + ? SchedulerUtils.RELEASED_CONTAINER : diagnosticMessage; + + Map containerToBeReleased = + new HashMap(); + for (RMContainer container : rmContainers) { + + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + containerToBeReleased.put(container, SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), message)); + } else { + ContainerId containerId = container.getContainerId(); + // Inform the container + container.handle( + new RMContainerFinishedEvent(containerId, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), + event)); + SchedulerApplicationAttempt schedulerAttempt = + getCurrentAttemptForContainer(containerId); + if (schedulerAttempt != null) { + schedulerAttempt.removeRMContainer(containerId); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Completed container: " + container.getContainerId() + + " in state: " + container.getState() + " event:" + event); + } + getSchedulerNode(container.getNodeId()) + .releaseContainer(container.getContainerId(), false); + } + recoverResourceRequestForContainer(container); + } + + if (containerToBeReleased.size() > 0) { + completedContainerInternal(containerToBeReleased, event); + completeOustandingUpdatesWhichAreReserved(containerToBeReleased, event); + } + } + // clean up a completed container protected abstract void completedContainerInternal(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); + // clean up a completed container + protected abstract void completedContainerInternal( + Map rmContainer, + RMContainerEventType event); + protected void releaseContainers(List containers, SchedulerApplicationAttempt attempt) { + List containersToBeReleased = new ArrayList<>(); for (ContainerId containerId : containers) { RMContainer rmContainer = getRMContainer(containerId); if (rmContainer == null) { @@ -750,10 +815,10 @@ protected void releaseContainers(List containers, attempt.getApplicationId(), containerId, null); } } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus(containerId, - SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + containersToBeReleased.add(rmContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.RELEASED_CONTAINER); } @Override @@ -1348,13 +1413,27 @@ protected void rollbackContainerUpdate( } /** - * To be used to release a container via a Scheduler Event rather than - * in the same thread. - * @param container Container. + * To be used to release containers in batches via a Scheduler Event rather + * than in the same thread. + * + * @param containers List of Container. */ - public void asyncContainerRelease(RMContainer container) { - this.rmContext.getDispatcher().getEventHandler().handle( - new ReleaseContainerEvent(container)); + public void asyncContainersRelease(List containers, + String diagnosticMessage) { + + // to do: pull batch size from configuration/xml files + List> rmContainersLists = Lists.partition(containers, 10); + + int i = 1; + for (List rmContainersList : rmContainersLists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing RMContainers in batch. Reason: " + + diagnosticMessage + ". Batch No: " + i++); + } + this.rmContext.getDispatcher().getEventHandler() + .handle( + new ReleaseContainerEvent(rmContainersList, diagnosticMessage)); + } } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 1225af1a2a4..e2546b74c4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -936,6 +936,9 @@ protected synchronized void addToNewlyAllocatedContainers( tempContainerToKill.add(tempRMContainer); i.remove(); } + + List containersToBeReleased = new ArrayList<>(); + // Release all temporary containers Iterator tempIter = tempContainerToKill.iterator(); while (tempIter.hasNext()) { @@ -943,15 +946,17 @@ protected synchronized void addToNewlyAllocatedContainers( // Mark container for release (set RRs to null, so RM does not think // it is a recoverable container) ((RMContainerImpl) c).setContainerRequest(null); - - // Release this container async-ly so as to prevent - // 'LeafQueue::completedContainer()' from trying to acquire a lock - // on the app and queue which can contended for in the reverse order - // by the Scheduler thread. - ((AbstractYarnScheduler)rmContext.getScheduler()) - .asyncContainerRelease(c); + containersToBeReleased.add(c); tempIter.remove(); } + // Release these containers async-ly so as to prevent + // 'LeafQueue::completedContainer()' from trying to acquire a lock + // on the app and queue which can contended for in the reverse order + // by the Scheduler thread. + ((AbstractYarnScheduler) rmContext.getScheduler()) + .asyncContainersRelease(containersToBeReleased, + SchedulerUtils.RELEASED_CONTAINER); + return updatedContainers; } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3963dc0a5b7..beae3b0d1c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -218,8 +218,25 @@ public void completedContainer(Resource clusterResource, RMContainerEventType event, CSQueue childQueue, boolean sortQueues); + /** + * Batch of Containers assigned to the queue has completed. + * + * @param clusterResource the resource of the cluster + * @param application application to which the container was assigned + * @param node node on which the container completed + * @param containersToBeReleased completed containers + * @param childQueue CSQueue to reinsert in childQueues + * @param event event to be sent to the container + * @param sortQueues indicates whether it should re-sort the queues + */ + public void completedContainer(Resource clusterResource, + FiCaSchedulerApp application, + Map containersToBeReleased, + RMContainerEventType event, CSQueue childQueue, boolean sortQueues); + /** * Get the number of applications in the queue. + * * @return number of applications */ public int getNumApplications(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 50ab70d03ec..5675b0b07ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1079,6 +1079,8 @@ private void doneApplicationAttempt( return; } + List containersToBeReleased = new ArrayList<>(); + // Release all the allocated, acquired, running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -1088,18 +1090,14 @@ private void doneApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(rmContainer); } // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - "Application Complete"), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(attempt.getReservedContainers()); + + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); @@ -1770,16 +1768,15 @@ public void handle(SchedulerEvent event) { } } break; - case RELEASE_CONTAINER: - { - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), + case RELEASE_CONTAINERS: { + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage, RMContainerEventType.RELEASED); } - break; + break; case KILL_RESERVED_CONTAINER: { ContainerPreemptEvent killReservedContainerEvent = @@ -1904,23 +1901,20 @@ private void removeNode(RMNode nodeInfo) { return; } + List containersToBeReleased = new ArrayList<>(); + // Remove running containers - List runningContainers = - node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, SchedulerUtils - .createAbnormalContainerStatus(container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(node.getCopiedListOfRunningContainers()); // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, SchedulerUtils - .createAbnormalContainerStatus(reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + containersToBeReleased.add(reservedContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.LOST_CONTAINER); + nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, @@ -1977,6 +1971,42 @@ protected void completedContainerInternal( } } + @Override + protected void completedContainerInternal( + Map containersToBeReleased, + RMContainerEventType event) { + + // Assuming all containers to be released belongs to same application, + // hence using first container to get application info + RMContainer firstRMContainer = + (RMContainer) containersToBeReleased.values().toArray()[0]; + Container container = firstRMContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Get the application for the finished container + FiCaSchedulerApp application = + getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + if (application == null) { + LOG.info("Container " + container + " of" + " finished application " + + appId + " completed with event " + event); + return; + } + + // Inform the queue + LeafQueue queue = (LeafQueue) application.getQueue(); + queue.completedContainer(getClusterResource(), application, + containersToBeReleased, event, null, true); + + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + if (ContainerExitStatus.PREEMPTED == entry.getValue().getExitStatus()) { + updateQueuePreemptionMetrics(queue, entry.getKey()); + } + } + } + private void updateQueuePreemptionMetrics( CSQueue queue, RMContainer rmc) { QueueMetrics qMetrics = queue.getMetrics(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 366bad0a4f2..a8818c50a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -1620,6 +1621,26 @@ private void updateSchedulerHealthForCompletedContainer( } } + private void updateSchedulerHealthForCompletedContainer( + Map containersToBeReleased, + RMContainerEventType event) { + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + + NodeId nodeId = entry.getKey().getContainer().getNodeId(); + + // Get the node on which the container was allocated + FiCaSchedulerNode node = scheduler.getNode(nodeId); + if (null == node) { + LOG.info("Container " + entry.getKey().getContainer() + " of" + + " removed node " + nodeId + " completed with event " + event); + return; + } + updateSchedulerHealthForCompletedContainer(entry.getKey(), + entry.getValue()); + } + } + /** * Recalculate QueueUsage Ratio. * @@ -1662,36 +1683,13 @@ public void completedContainer(Resource clusterResource, // Careful! Locking order is important! try { writeLock.lock(); - Container container = rmContainer.getContainer(); - - // Inform the application & the node - // Note: It's safe to assume that all state changes to RMContainer - // happen under scheduler's lock... - // So, this is, in effect, a transaction across application & node - if (rmContainer.getState() == RMContainerState.RESERVED) { - removed = application.unreserve(rmContainer.getReservedSchedulerKey(), - node, rmContainer); - } else{ - removed = application.containerCompleted(rmContainer, containerStatus, - event, node.getPartition()); - - node.releaseContainer(rmContainer.getContainerId(), false); - } - - // Book-keeping - if (removed) { - - // Inform the ordering policy - orderingPolicy.containerReleased(application, rmContainer); - - releaseResource(clusterResource, application, container.getResource(), - node.getPartition(), rmContainer); - } + removed = completedContainer(clusterResource, application, node, + rmContainer, + containerStatus, event, childQueue); } finally { writeLock.unlock(); } - if (removed) { // Inform the parent queue _outside_ of the leaf-queue lock getParent().completedContainer(clusterResource, application, node, @@ -1704,6 +1702,89 @@ public void completedContainer(Resource clusterResource, new KillableContainer(rmContainer, node.getPartition(), queueName)); } + public boolean completedContainer(Resource clusterResource, + FiCaSchedulerApp application, FiCaSchedulerNode node, + RMContainer rmContainer, ContainerStatus containerStatus, + RMContainerEventType event, CSQueue childQueue) { + boolean removed = false; + Container container = rmContainer.getContainer(); + + // Inform the application & the node + // Note: It's safe to assume that all state changes to RMContainer + // happen under scheduler's lock... + // So, this is, in effect, a transaction across application & node + if (rmContainer.getState() == RMContainerState.RESERVED) { + removed = application.unreserve(rmContainer.getReservedSchedulerKey(), + node, rmContainer); + } else { + removed = application.containerCompleted(rmContainer, containerStatus, + event, node.getPartition()); + + node.releaseContainer(rmContainer.getContainerId(), false); + } + + // Book-keeping + if (removed) { + + // Inform the ordering policy + orderingPolicy.containerReleased(application, rmContainer); + + releaseResource(clusterResource, application, container.getResource(), + node.getPartition(), rmContainer); + } + return removed; + } + + @Override + public void completedContainer(Resource clusterResource, + FiCaSchedulerApp application, + Map containersToBeReleased, + RMContainerEventType event, CSQueue childQueue, boolean sortQueues) { + + // Update SchedulerHealth for released / preempted container + updateSchedulerHealthForCompletedContainer(containersToBeReleased, event); + + if (application != null) { + + boolean removed = false; + + // Careful! Locking order is important! + try { + writeLock.lock(); + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + RMContainer rmContainer = entry.getKey(); + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); + removed = completedContainer(clusterResource, application, node, + rmContainer, entry.getValue(), event, childQueue); + // Don't Inform the parent queue _outside_ of the leaf-queue lock + // for containers if above step is not successful + if (!removed) { + containersToBeReleased.remove(entry.getKey()); + } + } + } finally { + writeLock.unlock(); + } + + // Inform the parent queue _outside_ of the leaf-queue lock + getParent().completedContainer(clusterResource, application, + containersToBeReleased, + event, this, sortQueues); + } + + // Notify PreemptionManager + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + FiCaSchedulerNode node = + scheduler.getNode(entry.getKey().getContainer().getNodeId()); + csContext.getPreemptionManager() + .removeKillableContainer(new KillableContainer(entry.getKey(), + node.getPartition(), queueName)); + } + } + void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, String nodePartition, RMContainer rmContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index bb4823e1dd2..526f542ac49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -837,18 +837,11 @@ private void printChildQueues() { private void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource) { - try { - writeLock.lock(); - super.releaseResource(clusterResource, releasedResource, - node.getPartition()); + super.releaseResource(clusterResource, releasedResource, + node.getPartition()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "completedContainer " + this + ", cluster=" + clusterResource); - } - - } finally { - writeLock.unlock(); + if (LOG.isDebugEnabled()) { + LOG.debug("completedContainer " + this + ", cluster=" + clusterResource); } } @@ -871,6 +864,22 @@ public void completedContainer(Resource clusterResource, } } + @Override + public void completedContainer(Resource clusterResource, + FiCaSchedulerApp application, + Map rmContainers, + RMContainerEventType event, CSQueue completedChildQueue, + boolean sortQueues) { + for (Map.Entry entry : rmContainers + .entrySet()) { + RMContainer rmContainer = entry.getKey(); + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); + completedContainer(clusterResource, application, node, entry.getKey(), + null, event, this, sortQueues); + } + } + @Override public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java index 4f3168471ef..0cdddc170f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -25,22 +27,36 @@ */ public class ReleaseContainerEvent extends SchedulerEvent { - private final RMContainer container; + private final List containers; + private final String diagnosticMessage; /** * Create Event. - * @param rmContainer RMContainer. + * + * @param rmContainers List of RMContainer. */ - public ReleaseContainerEvent(RMContainer rmContainer) { - super(SchedulerEventType.RELEASE_CONTAINER); - this.container = rmContainer; + public ReleaseContainerEvent(List rmContainers, + String diagnosticMessage) { + super(SchedulerEventType.RELEASE_CONTAINERS); + this.containers = rmContainers; + this.diagnosticMessage = diagnosticMessage; } /** * Get RMContainer. - * @return RMContainer. + * + * @return List of RMContainer. + */ + public List getContainers() { + return containers; + } + + /** + * Get diagnostic message for containers to be released. + * + * @return diagnosticMessage */ - public RMContainer getContainer() { - return container; + public String getDiagnosticMessage() { + return diagnosticMessage; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee61..e84b500a79e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,9 +38,6 @@ // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer. - RELEASE_CONTAINER, - /* Source: SchedulingEditPolicy */ KILL_RESERVED_CONTAINER, @@ -54,5 +51,10 @@ MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer, + // Scheduler#doneApplicationAttempt, Scheduler#removeNode, + // Scheduler#releaseContainers + RELEASE_CONTAINERS } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index eb9f6af7101..d5fc2acf7d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -672,6 +673,8 @@ private void removeApplicationAttempt( return; } + List containersToBeReleased = new ArrayList<>(); + // Release all the running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -681,18 +684,14 @@ private void removeApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(rmContainer); } // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - "Application Complete"), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(attempt.getReservedContainers()); + + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); @@ -766,6 +765,18 @@ protected void completedContainerInternal( } } + + @Override + // clean up a completed containers + public void completedContainerInternal( + Map containersToBeReleased, + RMContainerEventType event) { + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + completedContainerInternal(entry.getKey(), entry.getValue(), event); + } + } + private void addNode(List containerReports, RMNode node) { try { @@ -799,22 +810,18 @@ private void removeNode(RMNode rmNode) { return; } + List containersToBeReleased = new ArrayList<>(); + // Remove running containers - List runningContainers = - node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, SchedulerUtils - .createAbnormalContainerStatus(container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(node.getCopiedListOfRunningContainers()); // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, SchedulerUtils - .createAbnormalContainerStatus(reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + containersToBeReleased.add(reservedContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.LOST_CONTAINER); nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); @@ -1273,16 +1280,17 @@ public void handle(SchedulerEvent event) { appAttemptRemovedEvent.getFinalAttemptState(), appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); break; - case RELEASE_CONTAINER: + case RELEASE_CONTAINERS: { if (!(event instanceof ReleaseContainerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage, RMContainerEventType.RELEASED); + } break; case CONTAINER_EXPIRED: if (!(event instanceof ContainerExpiredSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8396db54ad8..fea47959f97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -467,6 +467,8 @@ private synchronized void doneApplicationAttempt( " has completed!"); } + List containersToBeReleased = new ArrayList<>(); + // Kill all 'live' containers for (RMContainer container : attempt.getLiveContainers()) { if (keepContainers @@ -476,12 +478,12 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + container.getContainerId()); continue; } - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(container); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); + // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); } @@ -817,18 +819,18 @@ public void handle(SchedulerEvent event) { RMContainerEventType.EXPIRE); } break; - case RELEASE_CONTAINER: { + case RELEASE_CONTAINERS: { if (!(event instanceof ReleaseContainerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage, RMContainerEventType.RELEASED); } - break; + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -875,6 +877,17 @@ protected synchronized void completedContainerInternal( } + @Override + // clean up a completed containers + public void completedContainerInternal( + Map containersToBeReleased, + RMContainerEventType event) { + for (Map.Entry entry : containersToBeReleased + .entrySet()) { + completedContainerInternal(entry.getKey(), entry.getValue(), event); + } + } + private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { @@ -883,13 +896,8 @@ private synchronized void removeNode(RMNode nodeInfo) { return; } // Kill running containers - for(RMContainer container : node.getCopiedListOfRunningContainers()) { - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } + asyncContainersRelease(node.getCopiedListOfRunningContainers(), + SchedulerUtils.LOST_CONTAINER); nodeTracker.removeNode(nodeInfo.getNodeID()); }