diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a50d94..eaf5404f9fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -105,6 +105,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; +import com.google.common.collect.Lists; @SuppressWarnings("unchecked") @Private @@ -728,12 +729,30 @@ private void completeOustandingUpdatesWhichAreReserved( } } + @VisibleForTesting + @Private + // clean up a completed containers + public void completedContainers(List rmContainers, + String diagnosticMessage) { + + String message = (diagnosticMessage == null) + ? SchedulerUtils.RELEASED_CONTAINER : diagnosticMessage; + + for (RMContainer container : rmContainers) { + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), message), + RMContainerEventType.RELEASED); + } + } + // clean up a completed container protected abstract void completedContainerInternal(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); protected void releaseContainers(List containers, SchedulerApplicationAttempt attempt) { + List containersToBeReleased = new ArrayList<>(); for (ContainerId containerId : containers) { RMContainer rmContainer = getRMContainer(containerId); if (rmContainer == null) { @@ -750,10 +769,10 @@ protected void releaseContainers(List containers, attempt.getApplicationId(), containerId, null); } } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus(containerId, - SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + containersToBeReleased.add(rmContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.RELEASED_CONTAINER); } @Override @@ -1348,13 +1367,27 @@ protected void rollbackContainerUpdate( } /** - * To be used to release a container via a Scheduler Event rather than - * in the same thread. - * @param container Container. + * To be used to release containers in batches via a Scheduler Event rather + * than in the same thread. + * + * @param containers List of Container. */ - public void asyncContainerRelease(RMContainer container) { - this.rmContext.getDispatcher().getEventHandler().handle( - new ReleaseContainerEvent(container)); + public void asyncContainersRelease(List containers, + String diagnosticMessage) { + + // to do: pull batch size from configuration/xml files + List> rmContainersLists = Lists.partition(containers, 10); + + int i = 1; + for (List rmContainersList : rmContainersLists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing RMContainers in batch. Reason: " + + diagnosticMessage + ". Batch No: " + i++); + } + this.rmContext.getDispatcher().getEventHandler() + .handle( + new ReleaseContainerEvent(rmContainersList, diagnosticMessage)); + } } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 1225af1a2a4..e2546b74c4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -936,6 +936,9 @@ protected synchronized void addToNewlyAllocatedContainers( tempContainerToKill.add(tempRMContainer); i.remove(); } + + List containersToBeReleased = new ArrayList<>(); + // Release all temporary containers Iterator tempIter = tempContainerToKill.iterator(); while (tempIter.hasNext()) { @@ -943,15 +946,17 @@ protected synchronized void addToNewlyAllocatedContainers( // Mark container for release (set RRs to null, so RM does not think // it is a recoverable container) ((RMContainerImpl) c).setContainerRequest(null); - - // Release this container async-ly so as to prevent - // 'LeafQueue::completedContainer()' from trying to acquire a lock - // on the app and queue which can contended for in the reverse order - // by the Scheduler thread. - ((AbstractYarnScheduler)rmContext.getScheduler()) - .asyncContainerRelease(c); + containersToBeReleased.add(c); tempIter.remove(); } + // Release these containers async-ly so as to prevent + // 'LeafQueue::completedContainer()' from trying to acquire a lock + // on the app and queue which can contended for in the reverse order + // by the Scheduler thread. + ((AbstractYarnScheduler) rmContext.getScheduler()) + .asyncContainersRelease(containersToBeReleased, + SchedulerUtils.RELEASED_CONTAINER); + return updatedContainers; } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 50ab70d03ec..1951e3af733 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1079,6 +1079,8 @@ private void doneApplicationAttempt( return; } + List containersToBeReleased = new ArrayList<>(); + // Release all the allocated, acquired, running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -1088,18 +1090,14 @@ private void doneApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(rmContainer); } // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - "Application Complete"), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(attempt.getReservedContainers()); + + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); @@ -1770,16 +1768,14 @@ public void handle(SchedulerEvent event) { } } break; - case RELEASE_CONTAINER: - { - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } - break; + case RELEASE_CONTAINERS: { + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage); + } + break; case KILL_RESERVED_CONTAINER: { ContainerPreemptEvent killReservedContainerEvent = @@ -1904,23 +1900,20 @@ private void removeNode(RMNode nodeInfo) { return; } + List containersToBeReleased = new ArrayList<>(); + // Remove running containers - List runningContainers = - node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, SchedulerUtils - .createAbnormalContainerStatus(container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(node.getCopiedListOfRunningContainers()); // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, SchedulerUtils - .createAbnormalContainerStatus(reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + containersToBeReleased.add(reservedContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.LOST_CONTAINER); + nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java index 4f3168471ef..0cdddc170f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -25,22 +27,36 @@ */ public class ReleaseContainerEvent extends SchedulerEvent { - private final RMContainer container; + private final List containers; + private final String diagnosticMessage; /** * Create Event. - * @param rmContainer RMContainer. + * + * @param rmContainers List of RMContainer. */ - public ReleaseContainerEvent(RMContainer rmContainer) { - super(SchedulerEventType.RELEASE_CONTAINER); - this.container = rmContainer; + public ReleaseContainerEvent(List rmContainers, + String diagnosticMessage) { + super(SchedulerEventType.RELEASE_CONTAINERS); + this.containers = rmContainers; + this.diagnosticMessage = diagnosticMessage; } /** * Get RMContainer. - * @return RMContainer. + * + * @return List of RMContainer. + */ + public List getContainers() { + return containers; + } + + /** + * Get diagnostic message for containers to be released. + * + * @return diagnosticMessage */ - public RMContainer getContainer() { - return container; + public String getDiagnosticMessage() { + return diagnosticMessage; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee61..e84b500a79e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,9 +38,6 @@ // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer. - RELEASE_CONTAINER, - /* Source: SchedulingEditPolicy */ KILL_RESERVED_CONTAINER, @@ -54,5 +51,10 @@ MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer, + // Scheduler#doneApplicationAttempt, Scheduler#removeNode, + // Scheduler#releaseContainers + RELEASE_CONTAINERS } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index eb9f6af7101..e9173d915d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -672,6 +672,8 @@ private void removeApplicationAttempt( return; } + List containersToBeReleased = new ArrayList<>(); + // Release all the running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -681,18 +683,14 @@ private void removeApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(rmContainer); } // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, SchedulerUtils - .createAbnormalContainerStatus(rmContainer.getContainerId(), - "Application Complete"), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(attempt.getReservedContainers()); + + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); @@ -799,22 +797,18 @@ private void removeNode(RMNode rmNode) { return; } + List containersToBeReleased = new ArrayList<>(); + // Remove running containers - List runningContainers = - node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, SchedulerUtils - .createAbnormalContainerStatus(container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); - } + containersToBeReleased.addAll(node.getCopiedListOfRunningContainers()); // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, SchedulerUtils - .createAbnormalContainerStatus(reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + containersToBeReleased.add(reservedContainer); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.LOST_CONTAINER); nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); @@ -1273,16 +1267,16 @@ public void handle(SchedulerEvent event) { appAttemptRemovedEvent.getFinalAttemptState(), appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); break; - case RELEASE_CONTAINER: + case RELEASE_CONTAINERS: { if (!(event instanceof ReleaseContainerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage); + } break; case CONTAINER_EXPIRED: if (!(event instanceof ContainerExpiredSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8396db54ad8..706c87fe2de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -467,6 +467,8 @@ private synchronized void doneApplicationAttempt( " has completed!"); } + List containersToBeReleased = new ArrayList<>(); + // Kill all 'live' containers for (RMContainer container : attempt.getLiveContainers()) { if (keepContainers @@ -476,12 +478,12 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + container.getContainerId()); continue; } - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + containersToBeReleased.add(container); } + asyncContainersRelease(containersToBeReleased, + SchedulerUtils.COMPLETED_APPLICATION); + // Clean up pending requests, metrics etc. attempt.stop(rmAppAttemptFinalState); } @@ -817,18 +819,17 @@ public void handle(SchedulerEvent event) { RMContainerEventType.EXPIRE); } break; - case RELEASE_CONTAINER: { + case RELEASE_CONTAINERS: { if (!(event instanceof ReleaseContainerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - RMContainer container = ((ReleaseContainerEvent) event).getContainer(); - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); + ReleaseContainerEvent releaseContainerEvent = + (ReleaseContainerEvent) event; + List containers = releaseContainerEvent.getContainers(); + String diagnosticMessage = releaseContainerEvent.getDiagnosticMessage(); + completedContainers(containers, diagnosticMessage); } - break; + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -883,13 +884,8 @@ private synchronized void removeNode(RMNode nodeInfo) { return; } // Kill running containers - for(RMContainer container : node.getCopiedListOfRunningContainers()) { - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } + asyncContainersRelease(node.getCopiedListOfRunningContainers(), + SchedulerUtils.LOST_CONTAINER); nodeTracker.removeNode(nodeInfo.getNodeID()); }